Skip to content

Commit

Permalink
Improving tracing in renew lock path (Azure#99)
Browse files Browse the repository at this point in the history
* Fixing javadoc generation errors

* Adding dependency to use SLF4J logging instead of java util logging.

* More SLF4J migration

* Changed all java util logging statements to slf4j.

* Checking in the POM that goes with the release to maven reposisory.

* Removed unused imports

* Instrumented all code with SLF4J logging.

* Fixing some log statements

* Moving request-response link creation to message factory so we don't create multiple links to the same entity.

* Updaing third party notice to include SLF4J license.

* Updating distributed POM

* Renaming setContent method to setBody.

* Fixing a typo in log statement.

* Handling the special case of 0 timeout for receives. If there are no prefetched messages, receive call returns immediately with already prefetched messages.

* Correcting a log statement

* Added javadoc comments for rules package.

* Adding javadoc comments. Work in progress.

* More javadoc comments added.

* Fixing a minor bug in sender and receiver creation. Closes the messaging factory if sender or receiver creation fails.

* Fixing the issue of not renewing CBS tokens. Also a concurrency fix.

* Fixing a bug in session receiver to not repeatedly attempt accepting the same session on lock lost.

* Added some java docs.. And changed default prefetch count, based on receive mode.

* Fixing a thread unending wait bug in request-response link.

* Fixing AuthorizationFailed exceptions that pop when the conncetion is recreated.

* Minor tweaks

* Another minor fix

* More minor bug fixes and tweaks

* Changing version to 1.0.0-RC-1

* Fixing some concurrency issues, performance improvements and refactoring.

* Minor fix in receiver

* Changing return messages loop interval to 100 milliseconds

* Changing zero timeout approximation to 200 milliseconds

* Changing zero timeout approximation to 200 milliseconds. Also improving tracing to better identify stress issues.
  • Loading branch information
yvgopal authored Aug 10, 2017
1 parent d9d2dd3 commit f7dccf5
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,16 @@ private void receiveAndPumpMessage() {
}
} else {
// Abandon message
TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber());
dispositionPhase = ExceptionPhase.ABANDON;
updateDispositionFuture = this.innerReceiver.abandonAsync(message.getLockToken());
if(this.messageHandlerOptions.isAutoComplete())
{
TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber());
updateDispositionFuture = this.innerReceiver.abandonAsync(message.getLockToken());
}
else
{
updateDispositionFuture = CompletableFuture.completedFuture(null);
}
}

updateDispositionFuture.handleAsync((u, updateDispositionEx) -> {
Expand Down Expand Up @@ -292,9 +299,16 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) {
}
} else {
// Abandon message
TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber());
dispositionPhase = ExceptionPhase.ABANDON;
updateDispositionFuture = session.abandonAsync(message.getLockToken());
if (this.sessionHandlerOptions.isAutoComplete())
{
TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber());
updateDispositionFuture = session.abandonAsync(message.getLockToken());
}
else
{
updateDispositionFuture = CompletableFuture.completedFuture(null);
}
}

updateDispositionFuture.handleAsync((u, updateDispositionEx) -> {
Expand Down Expand Up @@ -452,17 +466,16 @@ public void cancelLoop() {
}
}

protected static Duration getNextRenewInterval(Instant lockedUntilUtc) {
protected static Duration getNextRenewInterval(Instant lockedUntilUtc, String identifier) {
Duration remainingTime = Duration.between(Instant.now(), lockedUntilUtc);
if (remainingTime.isNegative()) {

// Lock likely expired. May be there is clock skew. Assume some minimum time
remainingTime = MessageAndSessionPump.MINIMUM_MESSAGE_LOCK_VALIDITY;
TRACE_LOGGER.warn("Lock already expired. May be there is clock skew. Still trying to renew lock");
TRACE_LOGGER.warn("Lock of '{}' already expired. May be there is clock skew. Still trying to renew lock", identifier);
}

Duration buffer = remainingTime.dividedBy(2).compareTo(MAXIMUM_RENEW_LOCK_BUFFER) > 0 ? MAXIMUM_RENEW_LOCK_BUFFER : remainingTime.dividedBy(2);
TRACE_LOGGER.debug("Lock is valid for '{}'. It will be renewed '{}' before it expires.", remainingTime, buffer);
TRACE_LOGGER.debug("Lock of '{}' is valid for '{}'. It will be renewed '{}' before it expires.", identifier, remainingTime, buffer);
return remainingTime.minus(buffer);
}
}
Expand All @@ -472,6 +485,7 @@ private static class MessgeRenewLockLoop extends RenewLockLoop {
private MessageAndSessionPump messageAndSessionPump;
private IMessage message;
private Instant stopRenewalAt;
private String messageIdentifier;
ScheduledFuture<?> timerFuture;

MessgeRenewLockLoop(IMessageReceiver innerReceiver, MessageAndSessionPump messageAndSessionPump, IMessage message, Instant stopRenewalAt) {
Expand All @@ -480,6 +494,7 @@ private static class MessgeRenewLockLoop extends RenewLockLoop {
this.messageAndSessionPump = messageAndSessionPump;
this.message = message;
this.stopRenewalAt = stopRenewalAt;
this.messageIdentifier = String.format("message with locktoken : %s, sequence number : %s", this.message.getLockToken(), this.message.getSequenceNumber());
}

@Override
Expand All @@ -493,22 +508,21 @@ protected void loop() {
Duration renewInterval = this.getNextRenewInterval();
if (renewInterval != null && !renewInterval.isNegative()) {
this.timerFuture = Timer.schedule(() -> {
TRACE_LOGGER.debug("Renewing lock on message with sequence number '{}'", this.message.getSequenceNumber());
TRACE_LOGGER.debug("Renewing lock on '{}'", this.messageIdentifier);
this.innerReceiver.renewMessageLockAsync(message).handleAsync((v, renewLockEx) ->
{
if (renewLockEx != null) {
renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx);
TRACE_LOGGER.error("Renewing lock on message with sequence number '{}' failed", this.message.getSequenceNumber(), renewLockEx);
TRACE_LOGGER.error("Renewing lock on '{}' failed", this.messageIdentifier, renewLockEx);
this.messageAndSessionPump.notifyExceptionToMessageHandler(renewLockEx, ExceptionPhase.RENEWMESSAGELOCK);
if (!(renewLockEx instanceof MessageLockLostException || renewLockEx instanceof OperationCancelledException)) {
this.loop();
}
} else {
TRACE_LOGGER.debug("Renewed lock on message with sequence number '{}'", this.message.getSequenceNumber());
TRACE_LOGGER.debug("Renewed lock on '{}'", this.messageIdentifier);
this.loop();
}


return null;
});
}, renewInterval, TimerType.OneTimeRun);
Expand All @@ -518,7 +532,7 @@ protected void loop() {

private Duration getNextRenewInterval() {
if (this.message.getLockedUntilUtc().isBefore(stopRenewalAt)) {
return RenewLockLoop.getNextRenewInterval(this.message.getLockedUntilUtc());
return RenewLockLoop.getNextRenewInterval(this.message.getLockedUntilUtc(), this.messageIdentifier);
} else {
return null;
}
Expand All @@ -528,12 +542,14 @@ private Duration getNextRenewInterval() {
private static class SessionRenewLockLoop extends RenewLockLoop {
private IMessageSession session;
private MessageAndSessionPump messageAndSessionPump;
private String sessionIdentifier;
ScheduledFuture<?> timerFuture;

SessionRenewLockLoop(IMessageSession session, MessageAndSessionPump messageAndSessionPump) {
super();
this.session = session;
this.messageAndSessionPump = messageAndSessionPump;
this.sessionIdentifier = String.format("session with id:%s", this.session.getSessionId());
}

@Override
Expand All @@ -544,25 +560,24 @@ protected ScheduledFuture<?> getTimerFuture() {
@Override
protected void loop() {
if (!this.isCancelled()) {
Duration renewInterval = RenewLockLoop.getNextRenewInterval(this.session.getLockedUntilUtc());
Duration renewInterval = RenewLockLoop.getNextRenewInterval(this.session.getLockedUntilUtc(), this.sessionIdentifier);
if (renewInterval != null && !renewInterval.isNegative()) {
this.timerFuture = Timer.schedule(() -> {
TRACE_LOGGER.debug("Renewing lock on session '{}'", this.session.getSessionId());
TRACE_LOGGER.debug("Renewing lock on '{}'", this.sessionIdentifier);
this.session.renewSessionLockAsync().handleAsync((v, renewLockEx) ->
{
if (renewLockEx != null) {
renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx);
TRACE_LOGGER.error("Renewing lock on session '{}' failed", this.session.getSessionId(), renewLockEx);
TRACE_LOGGER.error("Renewing lock on '{}' failed", this.sessionIdentifier, renewLockEx);
this.messageAndSessionPump.notifyExceptionToSessionHandler(renewLockEx, ExceptionPhase.RENEWSESSIONLOCK);
if (!(renewLockEx instanceof SessionLockLostException || renewLockEx instanceof OperationCancelledException)) {
this.loop();
}
} else {
TRACE_LOGGER.debug("Renewed lock on session '{}'", this.session.getSessionId());
TRACE_LOGGER.debug("Renewed lock on '{}'", this.sessionIdentifier);
this.loop();
}


return null;
});
}, renewInterval, TimerType.OneTimeRun);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public static Message convertAmqpMessageToBrokeredMessage(org.apache.qpid.proton
case ClientConstants.SEQUENCENUBMERNAME:
brokeredMessage.setSequenceNumber((long)entry.getValue());
break;
case ClientConstants.LOCKEDUNTILNAME:
case ClientConstants.LOCKEDUNTILNAME:
brokeredMessage.setLockedUntilUtc(((Date)entry.getValue()).toInstant());
break;
case ClientConstants.PARTITIONKEYNAME:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1245,7 +1245,7 @@ public CompletableFuture<Collection<Instant>> renewMessageLocksAsync(UUID[] lock
{
// error response
Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
TRACE_LOGGER.error("Renewing message locks on entity '{}' failed", this.receivePath, failureException);
TRACE_LOGGER.error("Renewing message locks for lock tokens '{}' on entity '{}' failed", Arrays.toString(lockTokens), this.receivePath, failureException);
returningFuture.completeExceptionally(failureException);
}
return returningFuture;
Expand Down Expand Up @@ -1311,7 +1311,7 @@ public CompletableFuture<Collection<MessageWithLockToken>> receiveDeferredMessag
{
// error response
Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
TRACE_LOGGER.error("Receiving messages by sequence number from entity '{}' failed", this.receivePath, failureException);
TRACE_LOGGER.error("Receiving messages by sequence numbers '{}' from entity '{}' failed", Arrays.toString(sequenceNumbers), this.receivePath, failureException);
returningFuture.completeExceptionally(failureException);
}
return returningFuture;
Expand Down

0 comments on commit f7dccf5

Please sign in to comment.