Skip to content

Commit

Permalink
Change to throw an exception if client attempt to perform an operatio…
Browse files Browse the repository at this point in the history
…n on a session object whose lock is lost on service side. (Azure#72)

* Fixing javadoc generation errors

* Adding dependency to use SLF4J logging instead of java util logging.

* More SLF4J migration

* Changed all java util logging statements to slf4j.

* Checking in the POM that goes with the release to maven reposisory.

* Removed unused imports

* Instrumented all code with SLF4J logging.

* Fixing some log statements

* Moving request-response link creation to message factory so we don't create multiple links to the same entity.

* Updaing third party notice to include SLF4J license.

* Updating distributed POM

* Renaming setContent method to setBody.

* Fixing a typo in log statement.

* Handling the special case of 0 timeout for receives. If there are no prefetched messages, receive call returns immediately with already prefetched messages.

* Correcting a log statement

* Added javadoc comments for rules package.

* Adding javadoc comments. Work in progress.

* More javadoc comments added.

* Fixing a minor bug in sender and receiver creation. Closes the messaging factory if sender or receiver creation fails.

* Fixing the issue of not renewing CBS tokens. Also a concurrency fix.

* Fixing a bug in session receiver to not repeatedly attempt accepting the same session on lock lost.
  • Loading branch information
yvgopal authored Jul 7, 2017
1 parent d9979d5 commit d6f9b92
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver,
private boolean isSessionReceiver;
private boolean isBrowsableSession;
private Instant sessionLockedUntilUtc;
private boolean isSessionLockLost;

private ConcurrentLinkedQueue<MessageWithDeliveryTag> prefetchedMessages;
private Receiver receiveLink;
Expand Down Expand Up @@ -329,6 +330,16 @@ CompletableFuture<Void> sendSASTokenAndSetRenewTimer()
return sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f;TRACE_LOGGER.debug("Sent SAS Token and set renew timer");});
}
}

private void throwIfInUnusableState()
{
if(this.isSessionReceiver && this.isSessionLockLost)
{
throw new IllegalStateException("Session lock lost and cannot be used. Close this session and accept another session.");
}

this.throwIfClosed(this.lastKnownLinkError);
}

private void cancelSASTokenRenewTimer()
{
Expand Down Expand Up @@ -391,7 +402,8 @@ public Instant getSessionLockedUntilUtc()
}

public void setPrefetchCount(final int value) throws ServiceBusException
{
{
this.throwIfInUnusableState();
final int deltaPrefetchCount;
synchronized (this.prefetchCountSync)
{
Expand Down Expand Up @@ -424,7 +436,7 @@ public CompletableFuture<Collection<MessageWithDeliveryTag>> receiveAsync(final

public CompletableFuture<Collection<MessageWithDeliveryTag>> receiveAsync(final int maxMessageCount, Duration timeout)
{
this.throwIfClosed(this.lastKnownLinkError);
this.throwIfInUnusableState();

if (maxMessageCount <= 0 || maxMessageCount > this.prefetchCount)
{
Expand Down Expand Up @@ -535,6 +547,7 @@ public void onOpenComplete(Exception exception)
}
else
{
TRACE_LOGGER.warn("Accepted a session with id '{}', from '{}' which didn't set '{}' property on the receive link.", this.sessionId, this.receivePath, ClientConstants.LOCKED_UNTIL_UTC);
this.sessionLockedUntilUtc = Instant.ofEpochMilli(0);
}

Expand Down Expand Up @@ -739,48 +752,61 @@ public void onError(Exception exception)
{
TRACE_LOGGER.error("Receive link to '{}' closed with error.", this.receivePath, exception);
this.lastKnownLinkError = exception;
this.onOpenComplete(exception);

if (exception != null &&
(!(exception instanceof ServiceBusException) || !((ServiceBusException) exception).getIsTransient()))
if (this.linkOpen != null && !this.linkOpen.getWork().isDone())
{
this.clearAllPendingWorkItems(exception);
this.onOpenComplete(exception);
}
else
{
// TODO change it. Why recreating link needs to wait for retry interval of pending receive?
ReceiveWorkItem workItem = null;
if(!this.pendingReceives.isEmpty())
{
workItem = this.pendingReceives.get(0);
}

if (workItem != null && workItem.getTimeoutTracker() != null)
{
Duration nextRetryInterval = this.underlyingFactory.getRetryPolicy()
.getNextRetryInterval(this.getClientId(), exception, workItem.getTimeoutTracker().remaining());
if (nextRetryInterval != null)
{
TRACE_LOGGER.error("Receive link to '{}' will be reopened after '{}'", this.receivePath, nextRetryInterval);
try
{
this.underlyingFactory.scheduleOnReactorThread((int) nextRetryInterval.toMillis(), new DispatchHandler()
{
@Override
public void onEvent()
{
if (receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)
{
createReceiveLink();
}
}
});
}
catch (IOException ignore)
{
}
}
}
{
if (exception != null &&
(!(exception instanceof ServiceBusException) || !((ServiceBusException) exception).getIsTransient()))
{
this.clearAllPendingWorkItems(exception);

if(this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException))
{
// No point in retrying to establish a link.. SessionLock is lost
TRACE_LOGGER.warn("Session '{}' lock lost. Closing receiver.", this.sessionId);
this.isSessionLockLost = true;
this.closeAsync();
}
}
else
{
// TODO change it. Why recreating link needs to wait for retry interval of pending receive?
ReceiveWorkItem workItem = null;
if(!this.pendingReceives.isEmpty())
{
workItem = this.pendingReceives.get(0);
}

if (workItem != null && workItem.getTimeoutTracker() != null)
{
Duration nextRetryInterval = this.underlyingFactory.getRetryPolicy()
.getNextRetryInterval(this.getClientId(), exception, workItem.getTimeoutTracker().remaining());
if (nextRetryInterval != null)
{
TRACE_LOGGER.error("Receive link to '{}' will be reopened after '{}'", this.receivePath, nextRetryInterval);
try
{
this.underlyingFactory.scheduleOnReactorThread((int) nextRetryInterval.toMillis(), new DispatchHandler()
{
@Override
public void onEvent()
{
if (receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)
{
createReceiveLink();
}
}
});
}
catch (IOException ignore)
{
}
}
}
}
}
}
}
Expand Down Expand Up @@ -1001,7 +1027,7 @@ public CompletableFuture<Void> deadLetterMessageAsync(UUID lockToken, String dea

private CompletableFuture<Void> updateMessageStateAsync(byte[] deliveryTag, Outcome outcome)
{
this.throwIfClosed(this.lastKnownLinkError);
this.throwIfInUnusableState();
CompletableFuture<Void> completeMessageFuture = new CompletableFuture<Void>();

try
Expand Down Expand Up @@ -1039,7 +1065,7 @@ public void onEvent()
}

private void ensureLinkIsOpen()
{
{
if (this.receiveLink.getLocalState() == EndpointState.CLOSED || this.receiveLink.getRemoteState() == EndpointState.CLOSED)
{
this.createReceiveLink();
Expand Down Expand Up @@ -1108,6 +1134,7 @@ private static ServiceBusException generateDispatacherSchedulingFailedException(

public CompletableFuture<Collection<Instant>> renewMessageLocksAsync(UUID[] lockTokens)
{
this.throwIfInUnusableState();
if(TRACE_LOGGER.isDebugEnabled())
{
TRACE_LOGGER.debug("Renewing message locks for lock tokens '{}' of entity '{}', sesion '{}'", Arrays.toString(lockTokens), this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
Expand Down Expand Up @@ -1149,6 +1176,7 @@ public CompletableFuture<Collection<Instant>> renewMessageLocksAsync(UUID[] lock

public CompletableFuture<Collection<MessageWithLockToken>> receiveBySequenceNumbersAsync(Long[] sequenceNumbers)
{
this.throwIfInUnusableState();
if(TRACE_LOGGER.isDebugEnabled())
{
TRACE_LOGGER.debug("Receiving messges for sequence numbers '{}' from entity '{}', sesion '{}'", Arrays.toString(sequenceNumbers), this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
Expand Down Expand Up @@ -1214,6 +1242,7 @@ public CompletableFuture<Collection<MessageWithLockToken>> receiveBySequenceNumb

public CompletableFuture<Void> updateDispositionAsync(UUID[] lockTokens, String dispositionStatus, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify)
{
this.throwIfInUnusableState();
if(TRACE_LOGGER.isDebugEnabled())
{
TRACE_LOGGER.debug("Update disposition of deliveries '{}' to '{}' on entity '{}', sesion '{}'", Arrays.toString(lockTokens), dispositionStatus, this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
Expand Down Expand Up @@ -1270,6 +1299,7 @@ public CompletableFuture<Void> updateDispositionAsync(UUID[] lockTokens, String

public CompletableFuture<Void> renewSessionLocksAsync()
{
this.throwIfInUnusableState();
TRACE_LOGGER.debug("Renewing session lock on entity '{}' of sesion '{}'", this.receivePath, this.getSessionId());
return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
HashMap requestBodyMap = new HashMap();
Expand Down Expand Up @@ -1301,6 +1331,7 @@ public CompletableFuture<Void> renewSessionLocksAsync()

public CompletableFuture<byte[]> getSessionStateAsync()
{
this.throwIfInUnusableState();
TRACE_LOGGER.debug("Getting session state of sesion '{}' from entity '{}'", this.getSessionId(), this.receivePath);
return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
HashMap requestBodyMap = new HashMap();
Expand Down Expand Up @@ -1342,6 +1373,7 @@ public CompletableFuture<byte[]> getSessionStateAsync()
// NULL session state is allowed
public CompletableFuture<Void> setSessionStateAsync(byte[] sessionState)
{
this.throwIfInUnusableState();
TRACE_LOGGER.debug("Setting session state of sesion '{}' on entity '{}'", this.getSessionId(), this.receivePath);
return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
HashMap requestBodyMap = new HashMap();
Expand Down Expand Up @@ -1373,6 +1405,7 @@ public CompletableFuture<Void> setSessionStateAsync(byte[] sessionState)
// A receiver can be used to peek messages from any session-id, useful for browsable sessions
public CompletableFuture<Collection<Message>> peekMessagesAsync(long fromSequenceNumber, int messageCount, String sessionId)
{
this.throwIfInUnusableState();
return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
return CommonRequestResponseOperations.peekMessagesAsync(this.requestResponseLink, this.operationTimeout, fromSequenceNumber, messageCount, sessionId);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ static int sizeof(Object obj)

throw new IllegalArgumentException(String.format(Locale.US, "Encoding Type: %s is not supported", obj.getClass()));
}

// Unused now.. ServiceBus service serializes DateTime types as java time as per AMQP spec

// .Net ticks are measured from 01/01/0001, java instants are measured from 01/01/1970
public static Instant convertDotNetTicksToInstant(long dotNetTicks)
{
Expand Down

0 comments on commit d6f9b92

Please sign in to comment.