diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java index 83b3c3bfadd73..1d01aa765ce7e 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java @@ -160,9 +160,16 @@ private void receiveAndPumpMessage() { } } else { // Abandon message - TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber()); dispositionPhase = ExceptionPhase.ABANDON; - updateDispositionFuture = this.innerReceiver.abandonAsync(message.getLockToken()); + if(this.messageHandlerOptions.isAutoComplete()) + { + TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber()); + updateDispositionFuture = this.innerReceiver.abandonAsync(message.getLockToken()); + } + else + { + updateDispositionFuture = CompletableFuture.completedFuture(null); + } } updateDispositionFuture.handleAsync((u, updateDispositionEx) -> { @@ -292,9 +299,16 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) { } } else { // Abandon message - TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber()); dispositionPhase = ExceptionPhase.ABANDON; - updateDispositionFuture = session.abandonAsync(message.getLockToken()); + if (this.sessionHandlerOptions.isAutoComplete()) + { + TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber()); + updateDispositionFuture = session.abandonAsync(message.getLockToken()); + } + else + { + updateDispositionFuture = CompletableFuture.completedFuture(null); + } } updateDispositionFuture.handleAsync((u, updateDispositionEx) -> { @@ -452,17 +466,16 @@ public void cancelLoop() { } } - protected static Duration getNextRenewInterval(Instant lockedUntilUtc) { + protected static Duration getNextRenewInterval(Instant lockedUntilUtc, String identifier) { Duration remainingTime = Duration.between(Instant.now(), lockedUntilUtc); if (remainingTime.isNegative()) { - // Lock likely expired. May be there is clock skew. Assume some minimum time remainingTime = MessageAndSessionPump.MINIMUM_MESSAGE_LOCK_VALIDITY; - TRACE_LOGGER.warn("Lock already expired. May be there is clock skew. Still trying to renew lock"); + TRACE_LOGGER.warn("Lock of '{}' already expired. May be there is clock skew. Still trying to renew lock", identifier); } Duration buffer = remainingTime.dividedBy(2).compareTo(MAXIMUM_RENEW_LOCK_BUFFER) > 0 ? MAXIMUM_RENEW_LOCK_BUFFER : remainingTime.dividedBy(2); - TRACE_LOGGER.debug("Lock is valid for '{}'. It will be renewed '{}' before it expires.", remainingTime, buffer); + TRACE_LOGGER.debug("Lock of '{}' is valid for '{}'. It will be renewed '{}' before it expires.", identifier, remainingTime, buffer); return remainingTime.minus(buffer); } } @@ -472,6 +485,7 @@ private static class MessgeRenewLockLoop extends RenewLockLoop { private MessageAndSessionPump messageAndSessionPump; private IMessage message; private Instant stopRenewalAt; + private String messageIdentifier; ScheduledFuture timerFuture; MessgeRenewLockLoop(IMessageReceiver innerReceiver, MessageAndSessionPump messageAndSessionPump, IMessage message, Instant stopRenewalAt) { @@ -480,6 +494,7 @@ private static class MessgeRenewLockLoop extends RenewLockLoop { this.messageAndSessionPump = messageAndSessionPump; this.message = message; this.stopRenewalAt = stopRenewalAt; + this.messageIdentifier = String.format("message with locktoken : %s, sequence number : %s", this.message.getLockToken(), this.message.getSequenceNumber()); } @Override @@ -493,22 +508,21 @@ protected void loop() { Duration renewInterval = this.getNextRenewInterval(); if (renewInterval != null && !renewInterval.isNegative()) { this.timerFuture = Timer.schedule(() -> { - TRACE_LOGGER.debug("Renewing lock on message with sequence number '{}'", this.message.getSequenceNumber()); + TRACE_LOGGER.debug("Renewing lock on '{}'", this.messageIdentifier); this.innerReceiver.renewMessageLockAsync(message).handleAsync((v, renewLockEx) -> { if (renewLockEx != null) { renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx); - TRACE_LOGGER.error("Renewing lock on message with sequence number '{}' failed", this.message.getSequenceNumber(), renewLockEx); + TRACE_LOGGER.error("Renewing lock on '{}' failed", this.messageIdentifier, renewLockEx); this.messageAndSessionPump.notifyExceptionToMessageHandler(renewLockEx, ExceptionPhase.RENEWMESSAGELOCK); if (!(renewLockEx instanceof MessageLockLostException || renewLockEx instanceof OperationCancelledException)) { this.loop(); } } else { - TRACE_LOGGER.debug("Renewed lock on message with sequence number '{}'", this.message.getSequenceNumber()); + TRACE_LOGGER.debug("Renewed lock on '{}'", this.messageIdentifier); this.loop(); } - return null; }); }, renewInterval, TimerType.OneTimeRun); @@ -518,7 +532,7 @@ protected void loop() { private Duration getNextRenewInterval() { if (this.message.getLockedUntilUtc().isBefore(stopRenewalAt)) { - return RenewLockLoop.getNextRenewInterval(this.message.getLockedUntilUtc()); + return RenewLockLoop.getNextRenewInterval(this.message.getLockedUntilUtc(), this.messageIdentifier); } else { return null; } @@ -528,12 +542,14 @@ private Duration getNextRenewInterval() { private static class SessionRenewLockLoop extends RenewLockLoop { private IMessageSession session; private MessageAndSessionPump messageAndSessionPump; + private String sessionIdentifier; ScheduledFuture timerFuture; SessionRenewLockLoop(IMessageSession session, MessageAndSessionPump messageAndSessionPump) { super(); this.session = session; this.messageAndSessionPump = messageAndSessionPump; + this.sessionIdentifier = String.format("session with id:%s", this.session.getSessionId()); } @Override @@ -544,25 +560,24 @@ protected ScheduledFuture getTimerFuture() { @Override protected void loop() { if (!this.isCancelled()) { - Duration renewInterval = RenewLockLoop.getNextRenewInterval(this.session.getLockedUntilUtc()); + Duration renewInterval = RenewLockLoop.getNextRenewInterval(this.session.getLockedUntilUtc(), this.sessionIdentifier); if (renewInterval != null && !renewInterval.isNegative()) { this.timerFuture = Timer.schedule(() -> { - TRACE_LOGGER.debug("Renewing lock on session '{}'", this.session.getSessionId()); + TRACE_LOGGER.debug("Renewing lock on '{}'", this.sessionIdentifier); this.session.renewSessionLockAsync().handleAsync((v, renewLockEx) -> { if (renewLockEx != null) { renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx); - TRACE_LOGGER.error("Renewing lock on session '{}' failed", this.session.getSessionId(), renewLockEx); + TRACE_LOGGER.error("Renewing lock on '{}' failed", this.sessionIdentifier, renewLockEx); this.messageAndSessionPump.notifyExceptionToSessionHandler(renewLockEx, ExceptionPhase.RENEWSESSIONLOCK); if (!(renewLockEx instanceof SessionLockLostException || renewLockEx instanceof OperationCancelledException)) { this.loop(); } } else { - TRACE_LOGGER.debug("Renewed lock on session '{}'", this.session.getSessionId()); + TRACE_LOGGER.debug("Renewed lock on '{}'", this.sessionIdentifier); this.loop(); } - return null; }); }, renewInterval, TimerType.OneTimeRun); diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java index b240429c0cf79..40883041ce5d5 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java @@ -155,7 +155,7 @@ public static Message convertAmqpMessageToBrokeredMessage(org.apache.qpid.proton case ClientConstants.SEQUENCENUBMERNAME: brokeredMessage.setSequenceNumber((long)entry.getValue()); break; - case ClientConstants.LOCKEDUNTILNAME: + case ClientConstants.LOCKEDUNTILNAME: brokeredMessage.setLockedUntilUtc(((Date)entry.getValue()).toInstant()); break; case ClientConstants.PARTITIONKEYNAME: diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java index 0403da2dbc024..ea6d601842f32 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java @@ -1245,7 +1245,7 @@ public CompletableFuture> renewMessageLocksAsync(UUID[] lock { // error response Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); - TRACE_LOGGER.error("Renewing message locks on entity '{}' failed", this.receivePath, failureException); + TRACE_LOGGER.error("Renewing message locks for lock tokens '{}' on entity '{}' failed", Arrays.toString(lockTokens), this.receivePath, failureException); returningFuture.completeExceptionally(failureException); } return returningFuture; @@ -1311,7 +1311,7 @@ public CompletableFuture> receiveDeferredMessag { // error response Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); - TRACE_LOGGER.error("Receiving messages by sequence number from entity '{}' failed", this.receivePath, failureException); + TRACE_LOGGER.error("Receiving messages by sequence numbers '{}' from entity '{}' failed", Arrays.toString(sequenceNumbers), this.receivePath, failureException); returningFuture.completeExceptionally(failureException); } return returningFuture;