diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java index f75770ea8..85ef8a5d8 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java @@ -132,7 +132,7 @@ public static CompletableFuture createFromConnectionString(final final EventHubClient eventHubClient = new EventHubClient(connStr); return MessagingFactory.createFromConnectionString(connectionString.toString(), retryPolicy) - .thenApplyAsync(new Function() { + .thenApply(new Function() { @Override public EventHubClient apply(MessagingFactory factory) { eventHubClient.underlyingFactory = factory; @@ -205,7 +205,7 @@ public final CompletableFuture send(final EventData data) { throw new IllegalArgumentException("EventData cannot be empty."); } - return this.createInternalSender().thenComposeAsync(new Function>() { + return this.createInternalSender().thenCompose(new Function>() { @Override public CompletableFuture apply(Void voidArg) { return EventHubClient.this.sender.send(data.toAmqpMessage()); @@ -295,7 +295,7 @@ public final CompletableFuture send(final Iterable eventDatas) throw new IllegalArgumentException("Empty batch of EventData cannot be sent."); } - return this.createInternalSender().thenComposeAsync(new Function>() { + return this.createInternalSender().thenCompose(new Function>() { @Override public CompletableFuture apply(Void voidArg) { return EventHubClient.this.sender.send(EventDataUtil.toAmqpMessages(eventDatas)); @@ -371,7 +371,7 @@ public final CompletableFuture send(final EventData eventData, final Strin throw new IllegalArgumentException("partitionKey cannot be null"); } - return this.createInternalSender().thenComposeAsync(new Function>() { + return this.createInternalSender().thenCompose(new Function>() { @Override public CompletableFuture apply(Void voidArg) { return EventHubClient.this.sender.send(eventData.toAmqpMessage(partitionKey)); @@ -445,7 +445,7 @@ public final CompletableFuture send(final Iterable eventDatas, String.format(Locale.US, "PartitionKey exceeds the maximum allowed length of partitionKey: {0}", ClientConstants.MAX_PARTITION_KEY_LENGTH)); } - return this.createInternalSender().thenComposeAsync(new Function>() { + return this.createInternalSender().thenCompose(new Function>() { @Override public CompletableFuture apply(Void voidArg) { return EventHubClient.this.sender.send(EventDataUtil.toAmqpMessages(eventDatas, partitionKey)); @@ -1222,7 +1222,7 @@ public CompletableFuture onClose() { if (this.underlyingFactory != null) { synchronized (this.senderCreateSync) { final CompletableFuture internalSenderClose = this.sender != null - ? this.sender.close().thenComposeAsync(new Function>() { + ? this.sender.close().thenCompose(new Function>() { @Override public CompletableFuture apply(Void voidArg) { return EventHubClient.this.underlyingFactory.close(); @@ -1242,7 +1242,7 @@ private CompletableFuture createInternalSender() { synchronized (this.senderCreateSync) { if (!this.isSenderCreateStarted) { this.createSender = MessageSender.create(this.underlyingFactory, StringUtil.getRandomString(), this.eventHubName) - .thenAcceptAsync(new Consumer() { + .thenAccept(new Consumer() { public void accept(MessageSender a) { EventHubClient.this.sender = a; } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java index 420d6ecc0..c36826ec4 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java @@ -124,7 +124,7 @@ static CompletableFuture create(MessagingFactory factory, } final PartitionReceiver receiver = new PartitionReceiver(factory, eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, dateTime, epoch, isEpochReceiver, receiverOptions); - return receiver.createInternalReceiver().thenApplyAsync(new Function() { + return receiver.createInternalReceiver().thenApply(new Function() { public PartitionReceiver apply(Void a) { return receiver; } @@ -136,7 +136,7 @@ private CompletableFuture createInternalReceiver() throws ServiceBusExcept StringUtil.getRandomString(), String.format("%s/ConsumerGroups/%s/Partitions/%s", this.eventHubName, this.consumerGroupName, this.partitionId), PartitionReceiver.DEFAULT_PREFETCH_COUNT, this) - .thenAcceptAsync(new Consumer() { + .thenAccept(new Consumer() { public void accept(MessageReceiver r) { PartitionReceiver.this.internalReceiver = r; } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionSender.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionSender.java index 7abd5ece3..5159c3c4e 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionSender.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionSender.java @@ -37,7 +37,7 @@ private PartitionSender(MessagingFactory factory, String eventHubName, String pa static CompletableFuture Create(MessagingFactory factory, String eventHubName, String partitionId) throws ServiceBusException { final PartitionSender sender = new PartitionSender(factory, eventHubName, partitionId); return sender.createInternalSender() - .thenApplyAsync(new Function() { + .thenApply(new Function() { public PartitionSender apply(Void a) { return sender; } @@ -47,7 +47,7 @@ public PartitionSender apply(Void a) { private CompletableFuture createInternalSender() throws ServiceBusException { return MessageSender.create(this.factory, StringUtil.getRandomString(), String.format("%s/Partitions/%s", this.eventHubName, this.partitionId)) - .thenAcceptAsync(new Consumer() { + .thenAccept(new Consumer() { public void accept(MessageSender a) { PartitionSender.this.internalSender = a; } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/CBSChannel.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/CBSChannel.java index dc7f23796..f9115e7e8 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/CBSChannel.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/CBSChannel.java @@ -10,6 +10,7 @@ import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -100,21 +101,26 @@ private class OpenRequestResponseChannel implements IOperation operationCallback) { + Session session = CBSChannel.this.sessionProvider.getSession( + "cbs-session", + null, + new BiConsumer() { + @Override + public void accept(ErrorCondition error, Exception exception) { + if (error != null) + operationCallback.onError(new AmqpException(error)); + else if (exception != null) + operationCallback.onError(exception); + } + }); + + if (session == null) + return; + final RequestResponseChannel requestResponseChannel = new RequestResponseChannel( "cbs", ClientConstants.CBS_ADDRESS, - CBSChannel.this.sessionProvider.getSession( - "cbs-session", - null, - new BiConsumer() { - @Override - public void accept(ErrorCondition error, Exception exception) { - if (error != null) - operationCallback.onError(new AmqpException(error)); - else if (exception != null) - operationCallback.onError(exception); - } - })); + session); requestResponseChannel.open( new IOperationResult() { diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientEntity.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientEntity.java index 2dfdac07d..6c193f96e 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientEntity.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientEntity.java @@ -102,9 +102,13 @@ public final void closeSync() throws ServiceBusException { } } - protected final void throwIfClosed(Throwable cause) { + protected final void throwIfClosed() { if (this.getIsClosingOrClosed()) { - throw new IllegalStateException(String.format(Locale.US, "Operation not allowed after the %s instance is Closed.", this.getClass().getName()), cause); + throw new IllegalStateException(String.format(Locale.US, "Operation not allowed after the %s instance is Closed.", this.getClass().getName()), this.getLastKnownError()); } } + + protected Exception getLastKnownError() { + return null; + } } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java index 7500d4d05..f6c80e692 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java @@ -64,6 +64,7 @@ public final class MessageReceiver extends ClientEntity implements IAmqpReceiver private final ConcurrentLinkedQueue prefetchedMessages; private final ReceiveWork receiveWork; private final CreateAndReceive createAndReceive; + private final Object errorCasesLock; private int prefetchCount; private Receiver receiveLink; @@ -95,6 +96,7 @@ private MessageReceiver(final MessagingFactory factory, this.linkOpen = new WorkItem<>(new CompletableFuture<>(), factory.getOperationTimeout()); this.pendingReceives = new ConcurrentLinkedQueue<>(); + this.errorCasesLock = new Object(); // onOperationTimeout delegate - per receive call this.onOperationTimedout = new Runnable() { @@ -250,7 +252,7 @@ public void setReceiveTimeout(final Duration value) { } public CompletableFuture> receive(final int maxMessageCount) { - this.throwIfClosed(this.lastKnownLinkError); + this.throwIfClosed(); if (maxMessageCount <= 0 || maxMessageCount > this.prefetchCount) { throw new IllegalArgumentException(String.format(Locale.US, "parameter 'maxMessageCount' should be a positive number and should be less than prefetchCount(%s)", this.prefetchCount)); @@ -289,7 +291,9 @@ public void onOpenComplete(Exception exception) { this.openTimer.cancel(false); } - this.lastKnownLinkError = null; + synchronized (this.errorCasesLock) { + this.lastKnownLinkError = null; + } this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId()); @@ -330,11 +334,6 @@ public void onReceiveComplete(Delivery delivery) { this.receiveWork.onEvent(); } - public void onError(final ErrorCondition error) { - final Exception completionException = ExceptionUtil.toException(error); - this.onError(completionException); - } - @Override public void onError(final Exception exception) { this.prefetchedMessages.clear(); @@ -358,7 +357,9 @@ public void onError(final Exception exception) { this.linkClose.complete(null); } else { - this.lastKnownLinkError = exception == null ? this.lastKnownLinkError : exception; + synchronized (this.errorCasesLock) { + this.lastKnownLinkError = exception == null ? this.lastKnownLinkError : exception; + } final Exception completionException = exception == null ? new ServiceBusException(true, "Client encountered transient error for unknown reasons, please retry the operation.") : exception; @@ -452,7 +453,9 @@ public void accept(Session session) { receiver.open(); - MessageReceiver.this.receiveLink = receiver; + synchronized (MessageReceiver.this.errorCasesLock) { + MessageReceiver.this.receiveLink = receiver; + } } }; @@ -460,7 +463,7 @@ public void accept(Session session) { @Override public void accept(ErrorCondition t, Exception u) { if (t != null) - onError(t); + onError((t != null && t.getCondition() != null) ? ExceptionUtil.toException(t) : null); else if (u != null) onError(u); } @@ -526,12 +529,19 @@ private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) { new Runnable() { public void run() { if (!linkOpen.getWork().isDone()) { - Exception operationTimedout = new TimeoutException( - String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", MessageReceiver.this.receiveLink.getName(), MessageReceiver.this.receivePath, ZonedDateTime.now()), - MessageReceiver.this.lastKnownLinkError); + final Receiver link; + final Exception lastReportedLinkError; + synchronized (errorCasesLock) { + link = MessageReceiver.this.receiveLink; + lastReportedLinkError = MessageReceiver.this.lastKnownLinkError; + } + + final Exception operationTimedout = new TimeoutException( + String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", link.getName(), MessageReceiver.this.receivePath, ZonedDateTime.now()), + lastReportedLinkError); if (TRACE_LOGGER.isLoggable(Level.WARNING)) { TRACE_LOGGER.log(Level.WARNING, - String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, MessageReceiver.this.receiveLink.getName(), "Open"), + String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, link.getName(), "Open"), operationTimedout); } @@ -549,10 +559,15 @@ private void scheduleLinkCloseTimeout(final TimeoutTracker timeout) { new Runnable() { public void run() { if (!linkClose.isDone()) { - Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", MessageReceiver.this.receiveLink.getName(), ZonedDateTime.now())); + final Receiver link; + synchronized (errorCasesLock) { + link = MessageReceiver.this.receiveLink; + } + + final Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", link.getName(), ZonedDateTime.now())); if (TRACE_LOGGER.isLoggable(Level.WARNING)) { TRACE_LOGGER.log(Level.WARNING, - String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, MessageReceiver.this.receiveLink.getName(), "Close"), + String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, link.getName(), "Close"), operationTimedout); } @@ -567,25 +582,27 @@ public void run() { @Override public void onClose(ErrorCondition condition) { - if (condition == null || condition.getCondition() == null) { - this.onError((Exception) null); - } else { - this.onError(condition); - } + final Exception completionException = (condition != null && condition.getCondition() != null) ? ExceptionUtil.toException(condition) : null; + this.onError(completionException); } @Override public ErrorContext getContext() { + final Receiver link; + synchronized (this.errorCasesLock) { + link = this.receiveLink; + } + final boolean isLinkOpened = this.linkOpen != null && this.linkOpen.getWork().isDone(); - final String referenceId = this.receiveLink != null && this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY) - ? this.receiveLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString() - : ((this.receiveLink != null) ? this.receiveLink.getName() : null); + final String referenceId = link != null && link.getRemoteProperties() != null && link.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY) + ? link.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString() + : ((link != null) ? link.getName() : null); - ReceiverContext errorContext = new ReceiverContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null, + final ReceiverContext errorContext = new ReceiverContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null, this.receivePath, referenceId, isLinkOpened ? this.prefetchCount : null, - isLinkOpened && this.receiveLink != null ? this.receiveLink.getCredit() : null, + isLinkOpened && link != null ? link.getCredit() : null, isLinkOpened && this.prefetchedMessages != null ? this.prefetchedMessages.size() : null); return errorContext; @@ -628,6 +645,13 @@ public void onEvent() { return this.linkClose; } + @Override + protected Exception getLastKnownError() { + synchronized (this.errorCasesLock) { + return this.lastKnownLinkError; + } + } + private final class ReceiveWork extends DispatchHandler { @Override diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java index a28778351..ef658ebb7 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java @@ -72,6 +72,7 @@ public class MessageSender extends ClientEntity implements IAmqpSender, IErrorCo private final DispatchHandler sendWork; private final ActiveClientTokenManager activeClientTokenManager; private final String tokenAudience; + private final Object errorConditionLock; private Sender sendLink; private CompletableFuture linkFirstOpen; @@ -117,6 +118,8 @@ private MessageSender(final MessagingFactory factory, final String sendLinkName, this.retryPolicy = factory.getRetryPolicy(); + this.errorConditionLock = new Object(); + this.pendingSendLock = new Object(); this.pendingSendsData = new ConcurrentHashMap<>(); this.pendingSends = new PriorityQueue<>(1000, new DeliveryTagComparator()); @@ -189,7 +192,7 @@ private CompletableFuture sendCore( final TimeoutTracker tracker, final Exception lastKnownError, final ScheduledFuture timeoutTask) { - this.throwIfClosed(this.lastKnownLinkError); + this.throwIfClosed(); final boolean isRetrySend = (onSend != null); @@ -381,8 +384,10 @@ public void onError(final Exception completionException) { return; } else { - this.lastKnownLinkError = completionException == null ? this.lastKnownLinkError : completionException; - this.lastKnownErrorReportedAt = Instant.now(); + synchronized (this.errorConditionLock) { + this.lastKnownLinkError = completionException == null ? this.lastKnownLinkError : completionException; + this.lastKnownErrorReportedAt = Instant.now(); + } final Exception finalCompletionException = completionException == null ? new ServiceBusException(true, "Client encountered transient error for unknown reasons, please retry the operation.") : completionException; @@ -534,7 +539,9 @@ public void accept(Session session) { MessageSender.this.underlyingFactory.registerForConnectionError(sender); sender.open(); - MessageSender.this.sendLink = sender; + synchronized (MessageSender.this.errorConditionLock) { + MessageSender.this.sendLink = sender; + } } }; @@ -542,7 +549,7 @@ public void accept(Session session) { @Override public void accept(ErrorCondition t, Exception u) { if (t != null) - MessageSender.this.onClose(t); + MessageSender.this.onError((t != null && t.getCondition() != null) ? ExceptionUtil.toException(t) : null); else if (u != null) MessageSender.this.onError(u); } @@ -584,9 +591,18 @@ private void initializeLinkOpen(TimeoutTracker timeout) { new Runnable() { public void run() { if (!MessageSender.this.linkFirstOpen.isDone()) { - Exception operationTimedout = new TimeoutException( - String.format(Locale.US, "Open operation on SendLink(%s) on Entity(%s) timed out at %s.", MessageSender.this.sendLink.getName(), MessageSender.this.getSendPath(), ZonedDateTime.now().toString()), - MessageSender.this.lastKnownErrorReportedAt.isAfter(Instant.now().minusSeconds(ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS)) ? MessageSender.this.lastKnownLinkError : null); + final Exception lastReportedError; + final Instant lastErrorReportedAt; + final Sender link; + synchronized (MessageSender.this.errorConditionLock) { + lastReportedError = MessageSender.this.lastKnownLinkError; + lastErrorReportedAt = MessageSender.this.lastKnownErrorReportedAt; + link = MessageSender.this.sendLink; + } + + final Exception operationTimedout = new TimeoutException( + String.format(Locale.US, "Open operation on SendLink(%s) on Entity(%s) timed out at %s.", link.getName(), MessageSender.this.getSendPath(), ZonedDateTime.now().toString()), + lastErrorReportedAt.isAfter(Instant.now().minusSeconds(ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS)) ? lastReportedError : null); if (TRACE_LOGGER.isLoggable(Level.WARNING)) { TRACE_LOGGER.log(Level.WARNING, @@ -604,16 +620,21 @@ public void run() { @Override public ErrorContext getContext() { + final Sender link; + synchronized (this.errorConditionLock) { + link = this.sendLink; + } + final boolean isLinkOpened = this.linkFirstOpen != null && this.linkFirstOpen.isDone(); - final String referenceId = this.sendLink != null && this.sendLink.getRemoteProperties() != null && this.sendLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY) - ? this.sendLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString() - : ((this.sendLink != null) ? this.sendLink.getName() : null); + final String referenceId = link != null && link.getRemoteProperties() != null && link.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY) + ? link.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString() + : ((link != null) ? link.getName() : null); - SenderContext errorContext = new SenderContext( + final SenderContext errorContext = new SenderContext( this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null, this.sendPath, referenceId, - isLinkOpened && this.sendLink != null ? this.sendLink.getCredit() : null); + isLinkOpened && link != null ? link.getCredit() : null); return errorContext; } @@ -722,18 +743,28 @@ private void processSendWork() { } } - private void throwSenderTimeout(CompletableFuture pendingSendWork, Exception lastKnownException) { + private void throwSenderTimeout(final CompletableFuture pendingSendWork, final Exception lastKnownException) { + Exception cause = lastKnownException; - if (lastKnownException == null && this.lastKnownLinkError != null) { - boolean isServerBusy = ((this.lastKnownLinkError instanceof ServerBusyException) - && (this.lastKnownErrorReportedAt.isAfter(Instant.now().minusSeconds(ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS)))); - cause = isServerBusy || (this.lastKnownErrorReportedAt.isAfter(Instant.now().minusMillis(this.operationTimeout.toMillis()))) - ? this.lastKnownLinkError - : null; + if (lastKnownException == null) { + final Exception lastReportedLinkLevelError; + final Instant lastLinkErrorReportedAt; + synchronized (this.errorConditionLock) { + lastReportedLinkLevelError = this.lastKnownLinkError; + lastLinkErrorReportedAt = this.lastKnownErrorReportedAt; + } + + if (lastReportedLinkLevelError != null) { + boolean isServerBusy = ((lastReportedLinkLevelError instanceof ServerBusyException) + && (lastLinkErrorReportedAt.isAfter(Instant.now().minusSeconds(ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS)))); + cause = isServerBusy || (lastLinkErrorReportedAt.isAfter(Instant.now().minusMillis(this.operationTimeout.toMillis()))) + ? lastReportedLinkLevelError + : null; + } } - boolean isClientSideTimeout = (cause == null || !(cause instanceof ServiceBusException)); - ServiceBusException exception = isClientSideTimeout + final boolean isClientSideTimeout = (cause == null || !(cause instanceof ServiceBusException)); + final ServiceBusException exception = isClientSideTimeout ? new TimeoutException(String.format(Locale.US, "%s %s %s.", MessageSender.SEND_TIMED_OUT, " at ", ZonedDateTime.now(), cause)) : (ServiceBusException) cause; @@ -746,10 +777,15 @@ private void scheduleLinkCloseTimeout(final TimeoutTracker timeout) { new Runnable() { public void run() { if (!linkClose.isDone()) { - Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", MessageSender.this.sendLink.getName(), ZonedDateTime.now())); + final Sender link; + synchronized (MessageSender.this.errorConditionLock) { + link = MessageSender.this.sendLink; + } + + final Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", link.getName(), ZonedDateTime.now())); if (TRACE_LOGGER.isLoggable(Level.WARNING)) { TRACE_LOGGER.log(Level.WARNING, - String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", MessageSender.this.sendLink.getName(), MessageSender.this.sendPath, "Close"), + String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", link.getName(), MessageSender.this.sendPath, "Close"), operationTimedout); } @@ -790,6 +826,13 @@ public void onEvent() { return this.linkClose; } + @Override + protected Exception getLastKnownError() { + synchronized (this.errorConditionLock) { + return this.lastKnownLinkError; + } + } + private static class WeightedDeliveryTag { private final String deliveryTag; private final int priority; diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessagingFactory.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessagingFactory.java index a4d1d7b43..d4b7c4c5a 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessagingFactory.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessagingFactory.java @@ -11,6 +11,7 @@ import java.util.List; import java.util.Locale; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.logging.Level; @@ -61,11 +62,9 @@ public class MessagingFactory extends ClientEntity implements IAmqpConnection, I private Duration operationTimeout; private RetryPolicy retryPolicy; private CompletableFuture open; - private CompletableFuture openConnection; + private ScheduledFuture openTimer; + private ScheduledFuture closeTimer; - /** - * @param reactor parameter reactor is purely for testing purposes and the SDK code should always set it to null - */ MessagingFactory(final ConnectionStringBuilder builder, final RetryPolicy retryPolicy) { super("MessagingFactory".concat(StringUtil.getRandomString()), null); @@ -77,7 +76,6 @@ public class MessagingFactory extends ClientEntity implements IAmqpConnection, I this.registeredLinks = new LinkedList<>(); this.reactorLock = new Object(); this.connectionHandler = new ConnectionHandler(this); - this.openConnection = new CompletableFuture<>(); this.cbsChannelCreateLock = new Object(); this.tokenProvider = builder.getSharedAccessSignature() == null ? new SharedAccessSignatureTokenProvider(builder.getSasKeyName(), builder.getSasKey()) @@ -152,6 +150,7 @@ public Session getSession(final String path, final Consumer onRemoteSes if (this.getIsClosingOrClosed()) { onRemoteSessionOpenError.accept(null, new OperationCancelledException("underlying messagingFactory instance is closed")); + return null; } if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) { @@ -180,7 +179,17 @@ public static CompletableFuture createFromConnectionString(fin public static CompletableFuture createFromConnectionString(final String connectionString, final RetryPolicy retryPolicy) throws IOException { final ConnectionStringBuilder builder = new ConnectionStringBuilder(connectionString); final MessagingFactory messagingFactory = new MessagingFactory(builder, (retryPolicy != null) ? retryPolicy : RetryPolicy.getDefault()); - + messagingFactory.openTimer = Timer.schedule(new Runnable() { + @Override + public void run() { + if (!messagingFactory.open.isDone()) { + messagingFactory.open.completeExceptionally(new TimeoutException("Opening MessagingFactory timed out.")); + messagingFactory.getReactor().stop(); + } + } + }, + messagingFactory.getOperationTimeout(), + TimerType.OneTimeRun); messagingFactory.createConnection(builder); return messagingFactory.open; } @@ -189,13 +198,16 @@ public static CompletableFuture createFromConnectionString(fin public void onOpenComplete(Exception exception) { if (exception == null) { this.open.complete(this); - this.openConnection.complete(this.connection); + + // if connection creation is in progress and then msgFactory.close call came thru if (this.getIsClosingOrClosed()) this.connection.close(); } else { this.open.completeExceptionally(exception); - this.openConnection.completeExceptionally(exception); } + + if (this.openTimer != null) + this.openTimer.cancel(false); } @Override @@ -204,21 +216,19 @@ public void onConnectionError(ErrorCondition error) { this.onOpenComplete(ExceptionUtil.toException(error)); } else { final Connection currentConnection = this.connection; - for (Link link : this.registeredLinks) { + final List registeredLinksCopy = new LinkedList<>(this.registeredLinks); + for (Link link : registeredLinksCopy) { if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED) { link.close(); } } - this.openConnection = new CompletableFuture<>(); - + // if proton-j detects transport error - onConnectionError is invoked, but, the connection state is not set to closed + // in connection recreation we depend on currentConnection state to evaluate need for recreation if (currentConnection.getLocalState() != EndpointState.CLOSED && currentConnection.getRemoteState() != EndpointState.CLOSED) { currentConnection.close(); } - // Clone of the registeredLinks is needed here - // onClose of link will lead to un-register - which will result into iteratorCollectionModified error - final List registeredLinksCopy = new LinkedList<>(this.registeredLinks); for (Link link : registeredLinksCopy) { final Handler handler = BaseHandler.getHandler(link); if (handler != null && handler instanceof BaseLinkHandler) { @@ -273,6 +283,16 @@ private void onReactorError(Exception cause) { protected CompletableFuture onClose() { if (!this.getIsClosed()) { try { + this.closeTimer = Timer.schedule(new Runnable() { + @Override + public void run() { + if (!closeTask.isDone()) { + closeTask.completeExceptionally(new TimeoutException("Closing MessagingFactory timed out.")); + getReactor().stop(); + } + } + }, + operationTimeout, TimerType.OneTimeRun); this.scheduleOnReactorThread(new CloseWork()); } catch (IOException ioException) { this.closeTask.completeExceptionally(new ServiceBusException(false, "Failed to Close MessagingFactory, see cause for more details.", ioException)); @@ -320,18 +340,6 @@ public void onError(Exception error) { connection.close(); } } - - if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) { - Timer.schedule(new Runnable() { - @Override - public void run() { - if (!closeTask.isDone()) { - closeTask.completeExceptionally(new TimeoutException("Closing MessagingFactory timed out.")); - } - } - }, - operationTimeout, TimerType.OneTimeRun); - } } } @@ -387,6 +395,9 @@ public void run() { if (getIsClosingOrClosed() && !closeTask.isDone()) { closeTask.complete(null); + + if (closeTimer != null) + closeTimer.cancel(false); } } }