From 0d14a390d38c8f7aee9c9565a5b7dcb8fb34d1a7 Mon Sep 17 00:00:00 2001 From: apraovjr Date: Wed, 12 Sep 2018 22:13:23 +0100 Subject: [PATCH] WebSocket implementation no proxy support (#275) * WebSocket implementation no proxy support (#275) --- azure-servicebus/pom.xml | 7 +- .../azure/servicebus/ClientFactory.java | 62 +- .../azure/servicebus/ClientSettings.java | 9 +- .../azure/servicebus/MessageReceiver.java | 3 +- .../azure/servicebus/MessageSender.java | 4 +- .../azure/servicebus/amqp/AmqpConstants.java | 1 + .../servicebus/amqp/ConnectionHandler.java | 18 +- .../servicebus/amqp/CustomIOHandler.java | 2 +- .../azure/servicebus/amqp/ProtonUtil.java | 8 +- .../amqp/WebSocketConnectionHandler.java | 53 + .../primitives/ClientConstants.java | 1 + .../primitives/ConnectionStringBuilder.java | 46 +- .../primitives/CoreMessageSender.java | 894 ++++++++-------- .../primitives/MessagingFactory.java | 92 +- .../primitives/RequestResponseLink.java | 994 +++++++++--------- .../primitives/RequestResponseLinkCache.java | 36 +- .../servicebus/primitives/TransportType.java | 46 + .../azure/servicebus/primitives/Util.java | 2 +- .../servicebus/ClientValidationTests.java | 88 +- .../azure/servicebus/SessionTests.java | 2 +- pom.xml | 16 +- 21 files changed, 1278 insertions(+), 1106 deletions(-) create mode 100644 azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/WebSocketConnectionHandler.java create mode 100644 azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/TransportType.java diff --git a/azure-servicebus/pom.xml b/azure-servicebus/pom.xml index 7327b47d9807f..f266086c15a0a 100644 --- a/azure-servicebus/pom.xml +++ b/azure-servicebus/pom.xml @@ -51,6 +51,11 @@ proton-j ${proton-j-version} + + com.microsoft.azure + qpid-proton-j-extensions + ${qpid-proton-j-extensions-version} + org.bouncycastle bcpkix-jdk15on @@ -92,5 +97,5 @@ async-http-client 2.5.2 - + diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java index 032f3defaf959..8419ce68abc0c 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java @@ -29,7 +29,7 @@ private ClientFactory() { * @return {@link IMessageSender} instance * @throws InterruptedException if the current thread was interrupted while waiting * @throws ServiceBusException if the sender cannot be created - */ + */ public static IMessageSender createMessageSenderFromConnectionString(String amqpConnectionString) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageSenderFromConnectionStringAsync(amqpConnectionString)); } @@ -48,11 +48,11 @@ public static IMessageSender createMessageSenderFromConnectionString(String amqp public static IMessageSender createMessageSenderFromConnectionStringBuilder(ConnectionStringBuilder amqpConnectionStringBuilder) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageSenderFromConnectionStringBuilderAsync(amqpConnectionStringBuilder)); } - + static IMessageSender createMessageSenderFromConnectionStringBuilder(ConnectionStringBuilder amqpConnectionStringBuilder, MessagingEntityType entityType) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageSenderFromConnectionStringBuilderAsync(amqpConnectionStringBuilder, entityType)); } - + /** * Creates a message sender to the entity using the client settings. * @param namespaceName namespace of entity @@ -65,7 +65,7 @@ static IMessageSender createMessageSenderFromConnectionStringBuilder(ConnectionS public static IMessageSender createMessageSenderFromEntityPath(String namespaceName, String entityPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageSenderFromEntityPathAsync(namespaceName, entityPath, clientSettings)); } - + /** * Creates a message sender to the entity using the client settings. * @param namespaceEndpointURI endpoint uri of entity namespace @@ -78,7 +78,7 @@ public static IMessageSender createMessageSenderFromEntityPath(String namespaceN public static IMessageSender createMessageSenderFromEntityPath(URI namespaceEndpointURI, String entityPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageSenderFromEntityPathAsync(namespaceEndpointURI, entityPath, clientSettings)); } - + static IMessageSender createMessageSenderFromEntityPath(URI namespaceEndpointURI, String entityPath, MessagingEntityType entityType, ClientSettings clientSettings) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageSenderFromEntityPathAsync(namespaceEndpointURI, entityPath, entityType, clientSettings)); } @@ -94,7 +94,7 @@ static IMessageSender createMessageSenderFromEntityPath(URI namespaceEndpointURI public static IMessageSender createMessageSenderFromEntityPath(MessagingFactory messagingFactory, String entityPath) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageSenderFromEntityPathAsync(messagingFactory, entityPath)); } - + static IMessageSender createMessageSenderFromEntityPath(MessagingFactory messagingFactory, String entityPath, MessagingEntityType entityType) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageSenderFromEntityPathAsync(messagingFactory, entityPath, entityType)); } @@ -139,12 +139,12 @@ public static CompletableFuture createMessageSenderFromConnectio Utils.assertNonNull("amqpConnectionStringBuilder", amqpConnectionStringBuilder); return createMessageSenderFromEntityPathAsync(amqpConnectionStringBuilder.getEndpoint(), amqpConnectionStringBuilder.getEntityPath(), Util.getClientSettingsFromConnectionStringBuilder(amqpConnectionStringBuilder)); } - + static CompletableFuture createMessageSenderFromConnectionStringBuilderAsync(ConnectionStringBuilder amqpConnectionStringBuilder, MessagingEntityType entityType) { Utils.assertNonNull("amqpConnectionStringBuilder", amqpConnectionStringBuilder); return createMessageSenderFromEntityPathAsync(amqpConnectionStringBuilder.getEndpoint(), amqpConnectionStringBuilder.getEntityPath(), entityType, Util.getClientSettingsFromConnectionStringBuilder(amqpConnectionStringBuilder)); } - + /** * Creates a message sender asynchronously to the entity using the client settings. * @param namespaceName namespace name of entity @@ -158,7 +158,7 @@ public static CompletableFuture createMessageSenderFromEntityPat Utils.assertNonNull("entityPath", entityPath); return createMessageSenderFromEntityPathAsync(Util.convertNamespaceToEndPointURI(namespaceName), entityPath, clientSettings); } - + /** * Creates a message sender asynchronously to the entity using the client settings. * @param namespaceEndpointURI endpoint uri of entity namespace @@ -170,7 +170,7 @@ public static CompletableFuture createMessageSenderFromEntityPat { return createMessageSenderFromEntityPathAsync(namespaceEndpointURI, entityPath, null, clientSettings); } - + static CompletableFuture createMessageSenderFromEntityPathAsync(URI namespaceEndpointURI, String entityPath, MessagingEntityType entityType, ClientSettings clientSettings) { Utils.assertNonNull("namespaceEndpointURI", namespaceEndpointURI); @@ -187,7 +187,7 @@ static CompletableFuture createMessageSenderFromEntityPathAsync( public static CompletableFuture createMessageSenderFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath) { return createMessageSenderFromEntityPathAsync(messagingFactory, entityPath, null); } - + static CompletableFuture createMessageSenderFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, MessagingEntityType entityType) { Utils.assertNonNull("messagingFactory", messagingFactory); MessageSender sender = new MessageSender(messagingFactory, entityPath, entityType); @@ -268,7 +268,7 @@ public static IMessageReceiver createMessageReceiverFromConnectionStringBuilder( public static IMessageReceiver createMessageReceiverFromConnectionStringBuilder(ConnectionStringBuilder amqpConnectionStringBuilder, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageReceiverFromConnectionStringBuilderAsync(amqpConnectionStringBuilder, receiveMode)); } - + /** * Creates a message receiver to the entity using the client settings in PeekLock mode * @param namespaceName namespace of entity @@ -281,7 +281,7 @@ public static IMessageReceiver createMessageReceiverFromConnectionStringBuilder( public static IMessageReceiver createMessageReceiverFromEntityPath(String namespaceName, String entityPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageReceiverFromEntityPathAsync(namespaceName, entityPath, clientSettings)); } - + /** * Creates a message receiver to the entity using the client settings. * @param namespaceName namespace of entity @@ -295,7 +295,7 @@ public static IMessageReceiver createMessageReceiverFromEntityPath(String namesp public static IMessageReceiver createMessageReceiverFromEntityPath(String namespaceName, String entityPath, ClientSettings clientSettings, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageReceiverFromEntityPathAsync(namespaceName, entityPath, clientSettings, receiveMode)); } - + /** * Creates a message receiver to the entity using the client settings in PeekLock mode * @param namespaceEndpointURI endpoint uri of entity namespace @@ -308,7 +308,7 @@ public static IMessageReceiver createMessageReceiverFromEntityPath(String namesp public static IMessageReceiver createMessageReceiverFromEntityPath(URI namespaceEndpointURI, String entityPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageReceiverFromEntityPathAsync(namespaceEndpointURI, entityPath, clientSettings)); } - + /** * Creates a message receiver to the entity using the client settings. * @param namespaceEndpointURI endpoint uri of entity namespace @@ -347,7 +347,7 @@ public static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFact public static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageReceiverFromEntityPathAsync(messagingFactory, entityPath, receiveMode)); } - + static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory messagingFactory, String entityPath, MessagingEntityType entityType, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { return Utils.completeFuture(createMessageReceiverFromEntityPathAsync(messagingFactory, entityPath, entityType, receiveMode)); } @@ -395,7 +395,7 @@ public static CompletableFuture createMessageReceiverFromConne Utils.assertNonNull("amqpConnectionStringBuilder", amqpConnectionStringBuilder); return createMessageReceiverFromEntityPathAsync(amqpConnectionStringBuilder.getEndpoint(), amqpConnectionStringBuilder.getEntityPath(), Util.getClientSettingsFromConnectionStringBuilder(amqpConnectionStringBuilder), receiveMode); } - + /** * Asynchronously creates a message receiver to the entity using the client settings in PeekLock mode * @param namespaceName namespace of entity @@ -406,7 +406,7 @@ public static CompletableFuture createMessageReceiverFromConne public static CompletableFuture createMessageReceiverFromEntityPathAsync(String namespaceName, String entityPath, ClientSettings clientSettings) { return createMessageReceiverFromEntityPathAsync(namespaceName, entityPath, clientSettings, DEFAULTRECEIVEMODE); } - + /** * Asynchronously creates a message receiver to the entity using the client settings * @param namespaceName namespace of entity @@ -419,7 +419,7 @@ public static CompletableFuture createMessageReceiverFromEntit Utils.assertNonNull("namespaceName", namespaceName); return createMessageReceiverFromEntityPathAsync(Util.convertNamespaceToEndPointURI(namespaceName),entityPath, clientSettings, receiveMode); } - + /** * Asynchronously creates a message receiver to the entity using the client settings in PeekLock mode * @param namespaceEndpointURI endpoint uri of entity namespace @@ -430,7 +430,7 @@ public static CompletableFuture createMessageReceiverFromEntit public static CompletableFuture createMessageReceiverFromEntityPathAsync(URI namespaceEndpointURI, String entityPath, ClientSettings clientSettings) { return createMessageReceiverFromEntityPathAsync(namespaceEndpointURI, entityPath, clientSettings, DEFAULTRECEIVEMODE); } - + /** * Asynchronously creates a message receiver to the entity using the client settings * @param namespaceEndpointURI endpoint uri of entity namespace @@ -466,7 +466,7 @@ public static CompletableFuture createMessageReceiverFromEntit public static CompletableFuture createMessageReceiverFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode) { return createMessageReceiverFromEntityPathAsync(messagingFactory, entityPath, null, receiveMode); } - + static CompletableFuture createMessageReceiverFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, MessagingEntityType entityType, ReceiveMode receiveMode) { Utils.assertNonNull("messagingFactory", messagingFactory); MessageReceiver receiver = new MessageReceiver(messagingFactory, entityPath, entityType, receiveMode); @@ -526,7 +526,7 @@ public static IMessageSession acceptSessionFromConnectionStringBuilder(Connectio public static IMessageSession acceptSessionFromConnectionStringBuilder(ConnectionStringBuilder amqpConnectionStringBuilder, String sessionId, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { return Utils.completeFuture(acceptSessionFromConnectionStringBuilderAsync(amqpConnectionStringBuilder, sessionId, receiveMode)); } - + /** * Accept a {@link IMessageSession} from service bus using the client settings with specified session id in PeekLock mode. Session Id can be null, if null, service will return the first available session. * @param namespaceName namespace of entity @@ -540,7 +540,7 @@ public static IMessageSession acceptSessionFromConnectionStringBuilder(Connectio public static IMessageSession acceptSessionFromEntityPath(String namespaceName, String entityPath, String sessionId, ClientSettings clientSettings) throws InterruptedException, ServiceBusException { return Utils.completeFuture(acceptSessionFromEntityPathAsync(namespaceName, entityPath, sessionId, clientSettings)); } - + /** * Accept a {@link IMessageSession} from service bus using the client settings with specified session id. Session Id can be null, if null, service will return the first available session. * @param namespaceName namespace of entity @@ -555,7 +555,7 @@ public static IMessageSession acceptSessionFromEntityPath(String namespaceName, public static IMessageSession acceptSessionFromEntityPath(String namespaceName, String entityPath, String sessionId, ClientSettings clientSettings, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { return Utils.completeFuture(acceptSessionFromEntityPathAsync(namespaceName, entityPath, sessionId, clientSettings, receiveMode)); } - + /** * Accept a {@link IMessageSession} from service bus using the client settings with specified session id in PeekLock mode. Session Id can be null, if null, service will return the first available session. * @param namespaceEndpointURI endpoint uri of entity namespace @@ -569,7 +569,7 @@ public static IMessageSession acceptSessionFromEntityPath(String namespaceName, public static IMessageSession acceptSessionFromEntityPath(URI namespaceEndpointURI, String entityPath, String sessionId, ClientSettings clientSettings) throws InterruptedException, ServiceBusException { return Utils.completeFuture(acceptSessionFromEntityPathAsync(namespaceEndpointURI, entityPath, sessionId, clientSettings)); } - + /** * Accept a {@link IMessageSession} from service bus using the client settings with specified session id. Session Id can be null, if null, service will return the first available session. * @param namespaceEndpointURI endpoint uri of entity namespace @@ -659,7 +659,7 @@ public static CompletableFuture acceptSessionFromConnectionStri Utils.assertNonNull("amqpConnectionStringBuilder", amqpConnectionStringBuilder); return acceptSessionFromEntityPathAsync(amqpConnectionStringBuilder.getEndpoint(), amqpConnectionStringBuilder.getEntityPath(), sessionId, Util.getClientSettingsFromConnectionStringBuilder(amqpConnectionStringBuilder), receiveMode); } - + /** * Asynchronously accepts a session in PeekLock mode from service bus using the client settings. Session Id can be null, if null, service will return the first available session. * @param namespaceName namespace of entity @@ -671,7 +671,7 @@ public static CompletableFuture acceptSessionFromConnectionStri public static CompletableFuture acceptSessionFromEntityPathAsync(String namespaceName, String entityPath, String sessionId, ClientSettings clientSettings) { return acceptSessionFromEntityPathAsync(namespaceName, entityPath, sessionId, clientSettings, DEFAULTRECEIVEMODE); } - + /** * Asynchronously accepts a session from service bus using the client settings. Session Id can be null, if null, service will return the first available session. * @param namespaceName namespace of entity @@ -685,7 +685,7 @@ public static CompletableFuture acceptSessionFromEntityPathAsyn Utils.assertNonNull("namespaceName", namespaceName); return acceptSessionFromEntityPathAsync(Util.convertNamespaceToEndPointURI(namespaceName),entityPath, sessionId, clientSettings, receiveMode); } - + /** * Asynchronously accepts a session in PeekLock mode from service bus using the client settings. Session Id can be null, if null, service will return the first available session. * @param namespaceEndpointURI endpoint uri of entity namespace @@ -697,7 +697,7 @@ public static CompletableFuture acceptSessionFromEntityPathAsyn public static CompletableFuture acceptSessionFromEntityPathAsync(URI namespaceEndpointURI, String entityPath, String sessionId, ClientSettings clientSettings) { return acceptSessionFromEntityPathAsync(namespaceEndpointURI, entityPath, sessionId, clientSettings, DEFAULTRECEIVEMODE); } - + /** * Asynchronously accepts a session from service bus using the client settings. Session Id can be null, if null, service will return the first available session. * @param namespaceEndpointURI endpoint uri of entity namespace @@ -736,10 +736,10 @@ public static CompletableFuture acceptSessionFromEntityPathAsyn public static CompletableFuture acceptSessionFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String sessionId, ReceiveMode receiveMode) { return acceptSessionFromEntityPathAsync(messagingFactory, entityPath, null, sessionId, receiveMode); } - + static CompletableFuture acceptSessionFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, MessagingEntityType entityType, String sessionId, ReceiveMode receiveMode) { Utils.assertNonNull("messagingFactory", messagingFactory); MessageSession session = new MessageSession(messagingFactory, entityPath, entityType, sessionId, receiveMode); return session.initializeAsync().thenApply((v) -> session); } -} +} \ No newline at end of file diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientSettings.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientSettings.java index f570b9a0024f7..5cc040acff275 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientSettings.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientSettings.java @@ -4,6 +4,7 @@ import com.microsoft.azure.servicebus.primitives.ClientConstants; import com.microsoft.azure.servicebus.primitives.RetryPolicy; +import com.microsoft.azure.servicebus.primitives.TransportType; import com.microsoft.azure.servicebus.security.TokenProvider; /** @@ -16,6 +17,7 @@ public class ClientSettings { private TokenProvider tokenProvider; private RetryPolicy retryPolicy; private Duration operationTimeout; + private TransportType transportType; /** * Creates a new instance with the given token provider, default retry policy and default operation timeout. @@ -25,7 +27,7 @@ public class ClientSettings { */ public ClientSettings(TokenProvider tokenProvider) { - this(tokenProvider, RetryPolicy.getDefault(), Duration.ofSeconds(ClientConstants.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS)); + this(tokenProvider, RetryPolicy.getDefault(), Duration.ofSeconds(ClientConstants.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS), TransportType.AMQP); } /** @@ -34,11 +36,12 @@ public ClientSettings(TokenProvider tokenProvider) * @param retryPolicy {@link RetryPolicy} instance * @param operationTimeout default operation timeout to be used for all client operations. Client can override this value by explicitly specifying a timeout in the operation. */ - public ClientSettings(TokenProvider tokenProvider, RetryPolicy retryPolicy, Duration operationTimeout) + public ClientSettings(TokenProvider tokenProvider, RetryPolicy retryPolicy, Duration operationTimeout, TransportType transportType) { this.tokenProvider = tokenProvider; this.retryPolicy = retryPolicy; this.operationTimeout = operationTimeout; + this.transportType = transportType; } /** @@ -67,4 +70,6 @@ public Duration getOperationTimeout() { return operationTimeout; } + + public TransportType getTransportType() { return transportType; } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java index 4511dd7cc4c2a..8be364c0c7db6 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java @@ -493,8 +493,7 @@ else if (c.isEmpty()) return null; else return convertAmqpMessagesWithDeliveryTagsToBrokeredMessages(c); - }); - + }); } @Override diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSender.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSender.java index 23af02a82e711..9f978f4af5a1a 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSender.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSender.java @@ -29,7 +29,7 @@ final class MessageSender extends InitializableEntity implements IMessageSender private CoreMessageSender internalSender = null; private boolean isInitialized = false; private URI namespaceEndpointURI; - private ClientSettings clientSettings; + private ClientSettings clientSettings; private MessageSender() { super(StringUtil.getShortRandomString()); @@ -229,4 +229,4 @@ public void cancelScheduledMessage(long sequenceNumber) throws InterruptedExcept MessagingFactory getMessagingFactory() { return this.messagingFactory; } -} +} \ No newline at end of file diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java index c0001a54070f5..3b3a40122405d 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java @@ -19,6 +19,7 @@ private AmqpConstants() { } public static final int AMQP_BATCH_MESSAGE_FORMAT = 0x80013700; // 2147563264L; public static final int MAX_FRAME_SIZE = 65536; + public static final int WEBSOCKET_MAX_FRAME_SIZE = 4096; public static final String MANAGEMENT_NODE_ADDRESS_SEGMENT = "$management"; public static final String CBS_NODE_ADDRESS_SEGMENT = "$cbs"; diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java index 212f51ff66de3..e3eaf9fb224d5 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java @@ -17,6 +17,7 @@ import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.impl.TransportInternal; import org.apache.qpid.proton.reactor.Handshaker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +27,7 @@ // ServiceBus <-> ProtonReactor interaction handles all // amqp_connection/transport related events from reactor -public final class ConnectionHandler extends BaseHandler +public class ConnectionHandler extends BaseHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class); private final IAmqpConnection messagingFactory; @@ -55,12 +56,27 @@ public void onConnectionInit(Event event) connection.open(); } + public void addTransportLayers(final Event event, final TransportInternal transport) + { + } + public int getPort() + { + return ClientConstants.AMQPS_PORT; + } + public int getMaxFrameSize() + { + + return AmqpConstants.MAX_FRAME_SIZE; + } + @Override public void onConnectionBound(Event event) { TRACE_LOGGER.debug("onConnectionBound: hostname:{}", event.getConnection().getHostname()); Transport transport = event.getTransport(); + this.addTransportLayers(event, (TransportInternal) transport); + SslDomain domain = makeDomain(SslDomain.Mode.CLIENT); transport.ssl(domain); diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/CustomIOHandler.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/CustomIOHandler.java index 7fcfa49648ef5..dc390c259106f 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/CustomIOHandler.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/CustomIOHandler.java @@ -19,7 +19,7 @@ public void onConnectionLocalOpen(Event event) } Transport transport = Proton.transport(); - transport.setMaxFrameSize(AmqpConstants.MAX_FRAME_SIZE); + transport.setMaxFrameSize(AmqpConstants.WEBSOCKET_MAX_FRAME_SIZE); transport.sasl(); transport.setEmitFlowEventOnSend(false); transport.bind(connection); diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProtonUtil.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProtonUtil.java index 547c660f7c8dd..a757062da6870 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProtonUtil.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProtonUtil.java @@ -7,6 +7,7 @@ import java.io.IOException; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.ReactorOptions; public final class ProtonUtil { @@ -14,9 +15,12 @@ private ProtonUtil() { } - public static Reactor reactor(ReactorHandler reactorHandler) throws IOException + public static Reactor reactor(ReactorHandler reactorHandler, final int maxFrameSize) throws IOException { - Reactor reactor = Proton.reactor(reactorHandler); + final ReactorOptions reactorOptions = new ReactorOptions(); + reactorOptions.setMaxFrameSize(maxFrameSize); + + Reactor reactor = Proton.reactor(reactorOptions, reactorHandler); reactor.setGlobalHandler(new CustomIOHandler()); reactor.getGlobalHandler().add(new LoggingHandler()); return reactor; diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/WebSocketConnectionHandler.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/WebSocketConnectionHandler.java new file mode 100644 index 0000000000000..85c42d62fd4e6 --- /dev/null +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/WebSocketConnectionHandler.java @@ -0,0 +1,53 @@ +package com.microsoft.azure.servicebus.amqp; + +import com.microsoft.azure.proton.transport.ws.impl.WebSocketImpl; +import com.microsoft.azure.servicebus.primitives.ClientConstants; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.impl.TransportInternal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebSocketConnectionHandler extends ConnectionHandler { + + private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class); + + public WebSocketConnectionHandler(IAmqpConnection messagingFactory) + { + super(messagingFactory); + } + + @Override + public void addTransportLayers(final Event event, final TransportInternal transport) + { + final WebSocketImpl webSocket = new WebSocketImpl(); + webSocket.configure( + event.getConnection().getHostname(), + "/$servicebus/websocket", + null, + 0, + "AMQPWSB10", + null, + null); + + transport.addTransportLayer(webSocket); + + if (TRACE_LOGGER.isInfoEnabled()) + { + TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + event.getConnection().getHostname() +"]"); + } + } + + @Override + public int getPort() + { + return ClientConstants.HTTPS_PORT; + } + + @Override + public int getMaxFrameSize() + { + // This is the current limitation of https://github.com/Azure/qpid-proton-j-extensions + // once, this library enables larger frames - this property can be removed. + return 4 * 1024; + } +} diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java index 52893410ec556..f1dac9d9dfa38 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java @@ -42,6 +42,7 @@ private ClientConstants() { } public static final UUID ZEROLOCKTOKEN = new UUID(0l, 0l); public final static int AMQPS_PORT = 5671; + public final static int HTTPS_PORT = 443; public final static int MAX_PARTITION_KEY_LENGTH = 128; public final static Symbol SERVER_BUSY_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":server-busy"); diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilder.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilder.java index d8326f86c4660..0570e11c4d99a 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilder.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilder.java @@ -48,6 +48,7 @@ public class ConnectionStringBuilder private final static String SHARED_ACCESS_KEY_CONFIG_NAME = "SharedAccessKey"; private final static String ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME = "SharedAccessSignature"; private final static String SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME = "SharedAccessSignatureToken"; + private final static String TRANSPORT_TYPE_CONFIG_NAME = "TransportType"; private final static String ENTITY_PATH_CONFIG_NAME = "EntityPath"; private final static String OPERATION_TIMEOUT_CONFIG_NAME = "OperationTimeout"; private final static String RETRY_POLICY_CONFIG_NAME = "RetryPolicy"; @@ -56,7 +57,7 @@ public class ConnectionStringBuilder private static final String ALL_KEY_ENUMERATE_REGEX = "(" + HOSTNAME_CONFIG_NAME + "|" + ENDPOINT_CONFIG_NAME + "|" + SHARED_ACCESS_KEY_NAME_CONFIG_NAME + "|" + SHARED_ACCESS_KEY_CONFIG_NAME + "|" + SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME + "|" + ENTITY_PATH_CONFIG_NAME + "|" + OPERATION_TIMEOUT_CONFIG_NAME - + "|" + RETRY_POLICY_CONFIG_NAME + "|" + ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME + ")"; + + "|" + RETRY_POLICY_CONFIG_NAME + "|" + ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME + "|" + TRANSPORT_TYPE_CONFIG_NAME + "|" +")"; private static final String KEYS_WITH_DELIMITERS_REGEX = KEY_VALUE_PAIR_DELIMITER + ALL_KEY_ENUMERATE_REGEX + KEY_VALUE_SEPARATOR; @@ -69,6 +70,7 @@ public class ConnectionStringBuilder private String entityPath; private Duration operationTimeout; private RetryPolicy retryPolicy; + private TransportType transportType; /** * Default operation timeout if timeout is not specified in the connection string. 30 seconds. @@ -304,6 +306,30 @@ public void setRetryPolicy(final RetryPolicy retryPolicy) this.retryPolicy = retryPolicy; } + + /** + * TransportType on which all the communication for the Service Bus created using this ConnectionString. + * Default value is {@link TransportType#AMQP}. + * + * @return transportType + */ + public TransportType getTransportType() + { + return (this.transportType == null ? TransportType.AMQP : transportType); + } + + /** + * Set the TransportType value in the Connection String. If no TransportType is set, this defaults to {@link TransportType#AMQP}. + * + * @param transportType Transport Type + * @return the {@link ConnectionStringBuilder} instance being set. + */ + public ConnectionStringBuilder setTransportType(final TransportType transportType) + { + this.transportType = transportType; + return this; + } + /** * Returns an inter-operable connection string that can be used to connect to ServiceBus Namespace * @return connection string @@ -356,6 +382,12 @@ public String toString() KEY_VALUE_SEPARATOR, this.retryPolicy.toString())); } + if (this.transportType != null) + { + connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", KEY_VALUE_PAIR_DELIMITER, TRANSPORT_TYPE_CONFIG_NAME, + KEY_VALUE_SEPARATOR, this.transportType.toString())); + } + this.connectionString = connectionStringBuilder.toString(); } @@ -485,6 +517,18 @@ else if (key.equalsIgnoreCase(RETRY_POLICY_CONFIG_NAME)) String.format(Locale.US, "Connection string parameter '%s'='%s' is not recognized", RETRY_POLICY_CONFIG_NAME, values[valueIndex])); } + else if (key.equalsIgnoreCase(TRANSPORT_TYPE_CONFIG_NAME)) + { + try + { + this.transportType = TransportType.fromString(values[valueIndex]); + } catch (IllegalArgumentException exception) + { + throw new IllegalConnectionStringFormatException( + String.format("Invalid value specified for property '%s' in the ConnectionString.", TRANSPORT_TYPE_CONFIG_NAME), + exception); + } + } else { throw new IllegalConnectionStringFormatException( diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java index 2900d887ee472..6fa6c5228173e 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java @@ -76,7 +76,7 @@ public class CoreMessageSender extends ClientEntity implements IAmqpSender, IErr private Sender sendLink; private RequestResponseLink requestResponseLink; - private CompletableFuture linkFirstOpen; + private CompletableFuture linkFirstOpen; private int linkCredit; private Exception lastKnownLinkError; private Instant lastKnownErrorReportedAt; @@ -98,10 +98,10 @@ public static CompletableFuture create( { return CoreMessageSender.create(factory, clientId, senderPath, transferDestinationPath, null); } - + public static CompletableFuture create( final MessagingFactory factory, - final String clientId, + final String clientId, final String senderPath, final String transferDestinationPath, final MessagingEntityType entityType) @@ -115,7 +115,7 @@ static CompletableFuture create( final MessagingEntityType entityType, final SenderLinkSettings linkSettings) { - TRACE_LOGGER.info("Creating core message sender to '{}'", linkSettings.linkPath); + TRACE_LOGGER.info("Creating core message sender to '{}'", linkSettings.linkPath); final Connection connection = factory.getConnection(); final String sendLinkNamePrefix = "Sender".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString()); @@ -123,7 +123,7 @@ static CompletableFuture create( sendLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer()) : sendLinkNamePrefix; - final CoreMessageSender msgSender = new CoreMessageSender(factory, clientId, entityType, linkSettings); + final CoreMessageSender msgSender = new CoreMessageSender(factory, clientId, entityType, linkSettings); TimeoutTracker openLinkTracker = TimeoutTracker.create(factory.getOperationTimeout()); msgSender.initializeLinkOpen(openLinkTracker); @@ -155,55 +155,55 @@ public void onEvent() { return null; }); - + return msgSender.linkFirstOpen; } - + private CompletableFuture createRequestResponseLink() - { + { synchronized (this.requestResonseLinkCreationLock) { - if(this.requestResponseLinkCreationFuture == null) - { - this.requestResponseLinkCreationFuture = new CompletableFuture(); - this.underlyingFactory.obtainRequestResponseLinkAsync(this.sendPath, this.transferDestinationPath, this.entityType).handleAsync((rrlink, ex) -> - { - if(ex == null) - { - this.requestResponseLink = rrlink; - this.requestResponseLinkCreationFuture.complete(null); - } - else - { - Throwable cause = ExceptionUtil.extractAsyncCompletionCause(ex); - this.requestResponseLinkCreationFuture.completeExceptionally(cause); - // Set it to null so next call will retry rr link creation - synchronized (this.requestResonseLinkCreationLock) - { - this.requestResponseLinkCreationFuture = null; - } - } - return null; - }); - } - - return this.requestResponseLinkCreationFuture; - } + if(this.requestResponseLinkCreationFuture == null) + { + this.requestResponseLinkCreationFuture = new CompletableFuture(); + this.underlyingFactory.obtainRequestResponseLinkAsync(this.sendPath, this.transferDestinationPath, this.entityType).handleAsync((rrlink, ex) -> + { + if(ex == null) + { + this.requestResponseLink = rrlink; + this.requestResponseLinkCreationFuture.complete(null); + } + else + { + Throwable cause = ExceptionUtil.extractAsyncCompletionCause(ex); + this.requestResponseLinkCreationFuture.completeExceptionally(cause); + // Set it to null so next call will retry rr link creation + synchronized (this.requestResonseLinkCreationLock) + { + this.requestResponseLinkCreationFuture = null; + } + } + return null; + }); + } + + return this.requestResponseLinkCreationFuture; + } } - + private void closeRequestResponseLink() - { - synchronized (this.requestResonseLinkCreationLock) - { - if(this.requestResponseLinkCreationFuture != null) - { - this.requestResponseLinkCreationFuture.thenRun(() -> { - this.underlyingFactory.releaseRequestResponseLink(this.sendPath, this.transferDestinationPath); - this.requestResponseLink = null; - }); - this.requestResponseLinkCreationFuture = null; - } - } - } + { + synchronized (this.requestResonseLinkCreationLock) + { + if(this.requestResponseLinkCreationFuture != null) + { + this.requestResponseLinkCreationFuture.thenRun(() -> { + this.underlyingFactory.releaseRequestResponseLink(this.sendPath, this.transferDestinationPath); + this.requestResponseLink = null; + }); + this.requestResponseLinkCreationFuture = null; + } + } + } private CoreMessageSender(final MessagingFactory factory, final String sendLinkName, final MessagingEntityType entityType, final SenderLinkSettings linkSettings) { @@ -221,20 +221,20 @@ private CoreMessageSender(final MessagingFactory factory, final String sendLinkN this.transferSasTokenAudienceURI = String.format(ClientConstants.SAS_TOKEN_AUDIENCE_FORMAT, factory.getHostName(), transferDestinationPath); } else - { - // Ensure it is null. - this.transferDestinationPath = null; - } + { + // Ensure it is null. + this.transferDestinationPath = null; + } } this.sasTokenAudienceURI = String.format(ClientConstants.SAS_TOKEN_AUDIENCE_FORMAT, factory.getHostName(), linkSettings.linkPath); this.underlyingFactory = factory; this.operationTimeout = factory.getOperationTimeout(); this.linkSettings = linkSettings; - + this.lastKnownLinkError = null; this.lastKnownErrorReportedAt = Instant.EPOCH; - + this.retryPolicy = factory.getRetryPolicy(); this.pendingSendLock = new Object(); @@ -246,9 +246,9 @@ private CoreMessageSender(final MessagingFactory factory, final String sendLinkN this.sendLinkReopenFuture = null; this.isSendLoopRunning = false; this.sendWork = new DispatchHandler() - { + { @Override - public void onEvent() + public void onEvent() { CoreMessageSender.this.processSendWork(); } @@ -259,19 +259,19 @@ public String getSendPath() { return this.sendPath; } - + private static String generateRandomDeliveryTag() { - return UUID.randomUUID().toString().replace("-", StringUtil.EMPTY); + return UUID.randomUUID().toString().replace("-", StringUtil.EMPTY); } - + CompletableFuture sendCoreAsync( final byte[] bytes, final int arrayOffset, final int messageFormat, final TransactionContext transaction) { - this.throwIfClosed(this.lastKnownLinkError); + this.throwIfClosed(this.lastKnownLinkError); TRACE_LOGGER.debug("Sending message to '{}'", this.sendPath); String deliveryTag = CoreMessageSender.generateRandomDeliveryTag(); CompletableFuture onSendFuture = new CompletableFuture(); @@ -280,68 +280,68 @@ CompletableFuture sendCoreAsync( this.scheduleSendTimeout(sendWorkItem); return onSendFuture; } - + private void scheduleSendTimeout(SendWorkItem sendWorkItem) { - // Timer to timeout the request - ScheduledFuture timeoutTask = Timer.schedule(new Runnable() - { - @Override - public void run() - { - if (!sendWorkItem.getWork().isDone()) - { - TRACE_LOGGER.warn("Delivery '{}' to '{}' did not receive ack from service. Throwing timeout.", sendWorkItem.getDeliveryTag(), CoreMessageSender.this.sendPath); - CoreMessageSender.this.pendingSendsData.remove(sendWorkItem.getDeliveryTag()); - CoreMessageSender.this.throwSenderTimeout(sendWorkItem.getWork(), sendWorkItem.getLastKnownException()); - // Weighted delivery tag not removed from the pending sends queue, but send loop will ignore it anyway if it is present - } - } - }, - sendWorkItem.getTimeoutTracker().remaining(), - TimerType.OneTimeRun); - sendWorkItem.setTimeoutTask(timeoutTask); + // Timer to timeout the request + ScheduledFuture timeoutTask = Timer.schedule(new Runnable() + { + @Override + public void run() + { + if (!sendWorkItem.getWork().isDone()) + { + TRACE_LOGGER.warn("Delivery '{}' to '{}' did not receive ack from service. Throwing timeout.", sendWorkItem.getDeliveryTag(), CoreMessageSender.this.sendPath); + CoreMessageSender.this.pendingSendsData.remove(sendWorkItem.getDeliveryTag()); + CoreMessageSender.this.throwSenderTimeout(sendWorkItem.getWork(), sendWorkItem.getLastKnownException()); + // Weighted delivery tag not removed from the pending sends queue, but send loop will ignore it anyway if it is present + } + } + }, + sendWorkItem.getTimeoutTracker().remaining(), + TimerType.OneTimeRun); + sendWorkItem.setTimeoutTask(timeoutTask); } - + private void enlistSendRequest(String deliveryTag, SendWorkItem sendWorkItem, boolean isRetrySend) { - synchronized (this.pendingSendLock) - { - this.pendingSendsData.put(deliveryTag, sendWorkItem); - this.pendingSends.offer(new WeightedDeliveryTag(deliveryTag, isRetrySend ? 1 : 0)); - - if(!this.isSendLoopRunning) - { - try - { - this.underlyingFactory.scheduleOnReactorThread(this.sendWork); - } - catch (IOException ioException) - { - AsyncUtil.completeFutureExceptionally(sendWorkItem.getWork(), new ServiceBusException(false, "Send failed while dispatching to Reactor, see cause for more details.", ioException)); - } - } - } + synchronized (this.pendingSendLock) + { + this.pendingSendsData.put(deliveryTag, sendWorkItem); + this.pendingSends.offer(new WeightedDeliveryTag(deliveryTag, isRetrySend ? 1 : 0)); + + if(!this.isSendLoopRunning) + { + try + { + this.underlyingFactory.scheduleOnReactorThread(this.sendWork); + } + catch (IOException ioException) + { + AsyncUtil.completeFutureExceptionally(sendWorkItem.getWork(), new ServiceBusException(false, "Send failed while dispatching to Reactor, see cause for more details.", ioException)); + } + } + } } - + private void reSendAsync(String deliveryTag, SendWorkItem retryingSendWorkItem, boolean reuseDeliveryTag) - { - if(!retryingSendWorkItem.getWork().isDone() && retryingSendWorkItem.cancelTimeoutTask(false)) - { - Duration remainingTime = retryingSendWorkItem.getTimeoutTracker().remaining(); - if(!remainingTime.isNegative() && !remainingTime.isZero()) - { - if(!reuseDeliveryTag) - { - deliveryTag = CoreMessageSender.generateRandomDeliveryTag(); - retryingSendWorkItem.setDeliveryTag(deliveryTag); - } - - this.enlistSendRequest(deliveryTag, retryingSendWorkItem, true); - this.scheduleSendTimeout(retryingSendWorkItem); - } - } - } + { + if(!retryingSendWorkItem.getWork().isDone() && retryingSendWorkItem.cancelTimeoutTask(false)) + { + Duration remainingTime = retryingSendWorkItem.getTimeoutTracker().remaining(); + if(!remainingTime.isNegative() && !remainingTime.isZero()) + { + if(!reuseDeliveryTag) + { + deliveryTag = CoreMessageSender.generateRandomDeliveryTag(); + retryingSendWorkItem.setDeliveryTag(deliveryTag); + } + + this.enlistSendRequest(deliveryTag, retryingSendWorkItem, true); + this.scheduleSendTimeout(retryingSendWorkItem); + } + } + } public CompletableFuture sendAsync(final Iterable messages, TransactionContext transaction) { @@ -349,7 +349,7 @@ public CompletableFuture sendAsync(final Iterable messages, Trans { throw new IllegalArgumentException("Sending Empty batch of messages is not allowed."); } - + TRACE_LOGGER.debug("Sending a batch of messages to '{}'", this.sendPath); Message firstMessage = messages.iterator().next(); @@ -370,10 +370,10 @@ public CompletableFuture sendAsync(final Iterable messages, Trans Pair encodedPair = Util.encodeMessageToMaxSizeArray(batchMessage, this.maxMessageSize); bytes = encodedPair.getFirstItem(); byteArrayOffset = encodedPair.getSecondItem(); - + for(Message amqpMessage: messages) { - Message messageWrappedByData = Proton.message(); + Message messageWrappedByData = Proton.message(); encodedPair = Util.encodeMessageToOptimalSizeArray(amqpMessage, this.maxMessageSize); messageWrappedByData.setBody(new Data(new Binary(encodedPair.getFirstItem(), 0, encodedPair.getSecondItem()))); @@ -383,7 +383,7 @@ public CompletableFuture sendAsync(final Iterable messages, Trans } catch(PayloadSizeExceededException ex) { - TRACE_LOGGER.error("Payload size of batch of messages exceeded limit", ex); + TRACE_LOGGER.error("Payload size of batch of messages exceeded limit", ex); final CompletableFuture sendTask = new CompletableFuture(); sendTask.completeExceptionally(ex); return sendTask; @@ -391,7 +391,7 @@ public CompletableFuture sendAsync(final Iterable messages, Trans return this.sendCoreAsync(bytes, byteArrayOffset, AmqpConstants.AMQP_BATCH_MESSAGE_FORMAT, transaction).thenAccept((x) -> { /*Do nothing*/ }); } - + public CompletableFuture sendAsync(Message msg, TransactionContext transaction) { return this.sendAndReturnDeliveryStateAsync(msg, transaction).thenAccept((x) -> { /*Do nothing*/ }); @@ -407,7 +407,7 @@ CompletableFuture sendAndReturnDeliveryStateAsync(Message msg, Tr } catch(PayloadSizeExceededException exception) { - TRACE_LOGGER.error("Payload size of message exceeded limit", exception); + TRACE_LOGGER.error("Payload size of message exceeded limit", exception); final CompletableFuture sendTask = new CompletableFuture(); sendTask.completeExceptionally(exception); return sendTask; @@ -419,30 +419,30 @@ public void onOpenComplete(Exception completionException) { if (completionException == null) { - this.underlyingFactory.registerForConnectionError(this.sendLink); - this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink); + this.underlyingFactory.registerForConnectionError(this.sendLink); + this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink); this.lastKnownLinkError = null; this.retryPolicy.resetRetryCount(this.getClientId()); if(this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) - { - AsyncUtil.completeFuture(this.sendLinkReopenFuture, null); - } - + { + AsyncUtil.completeFuture(this.sendLinkReopenFuture, null); + } + if (!this.linkFirstOpen.isDone()) { - TRACE_LOGGER.info("Opened send link to '{}'", this.sendPath); - AsyncUtil.completeFuture(this.linkFirstOpen, this); + TRACE_LOGGER.info("Opened send link to '{}'", this.sendPath); + AsyncUtil.completeFuture(this.linkFirstOpen, this); } else - { + { synchronized (this.pendingSendLock) { if (!this.pendingSendsData.isEmpty()) { LinkedList unacknowledgedSends = new LinkedList(); unacknowledgedSends.addAll(this.pendingSendsData.keySet()); - + if (unacknowledgedSends.size() > 0) { Iterator reverseReader = unacknowledgedSends.iterator(); @@ -455,7 +455,7 @@ public void onOpenComplete(Exception completionException) } } } - + unacknowledgedSends.clear(); } } @@ -463,28 +463,28 @@ public void onOpenComplete(Exception completionException) } else { - this.cancelSASTokenRenewTimer(); + this.cancelSASTokenRenewTimer(); if (!this.linkFirstOpen.isDone()) { - TRACE_LOGGER.error("Opening send link '{}' to '{}' failed", this.sendLink.getName(), this.sendPath, completionException); - this.setClosed(); + TRACE_LOGGER.error("Opening send link '{}' to '{}' failed", this.sendLink.getName(), this.sendPath, completionException); + this.setClosed(); ExceptionUtil.completeExceptionally(this.linkFirstOpen, completionException, this, true); } - + if(this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) - { - TRACE_LOGGER.warn("Opening send link '{}' to '{}' failed", this.sendLink.getName(), this.sendPath, completionException); - AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, completionException); - } + { + TRACE_LOGGER.warn("Opening send link '{}' to '{}' failed", this.sendLink.getName(), this.sendPath, completionException); + AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, completionException); + } } } @Override public void onClose(ErrorCondition condition) - { - Exception completionException = condition != null ? ExceptionUtil.toException(condition) + { + Exception completionException = condition != null ? ExceptionUtil.toException(condition) : new ServiceBusException(ClientConstants.DEFAULT_IS_TRANSIENT, - "The entity has been closed due to transient failures (underlying link closed), please retry the operation."); + "The entity has been closed due to transient failures (underlying link closed), please retry the operation."); this.onError(completionException); } @@ -494,18 +494,18 @@ public void onError(Exception completionException) this.linkCredit = 0; if (this.getIsClosingOrClosed()) { - Exception failureException = completionException == null - ? new OperationCancelledException("Send cancelled as the Sender instance is Closed before the sendOperation completed.") - : completionException; + Exception failureException = completionException == null + ? new OperationCancelledException("Send cancelled as the Sender instance is Closed before the sendOperation completed.") + : completionException; this.clearAllPendingSendsWithException(failureException); - + TRACE_LOGGER.info("Send link to '{}' closed", this.sendPath); AsyncUtil.completeFuture(this.linkClose, null); return; } else { - this.underlyingFactory.deregisterForConnectionError(this.sendLink); + this.underlyingFactory.deregisterForConnectionError(this.sendLink); this.lastKnownLinkError = completionException; this.lastKnownErrorReportedAt = Instant.now(); @@ -514,11 +514,11 @@ public void onError(Exception completionException) if (completionException != null && (!(completionException instanceof ServiceBusException) || !((ServiceBusException) completionException).getIsTransient())) { - TRACE_LOGGER.warn("Send link '{}' to '{}' closed. Failing all pending send requests.", this.sendLink.getName(), this.sendPath); + TRACE_LOGGER.warn("Send link '{}' to '{}' closed. Failing all pending send requests.", this.sendLink.getName(), this.sendPath); this.clearAllPendingSendsWithException(completionException); } else - { + { final Map.Entry> pendingSendEntry = IteratorUtil.getFirst(this.pendingSendsData.entrySet()); if (pendingSendEntry != null && pendingSendEntry.getValue() != null) { @@ -528,8 +528,8 @@ public void onError(Exception completionException) final Duration nextRetryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), completionException, tracker.remaining()); if (nextRetryInterval != null) { - TRACE_LOGGER.warn("Send link '{}' to '{}' closed. Will retry link creation after '{}'.", this.sendLink.getName(), this.sendPath, nextRetryInterval); - Timer.schedule(() -> {CoreMessageSender.this.ensureLinkIsOpen();}, nextRetryInterval, TimerType.OneTimeRun); + TRACE_LOGGER.warn("Send link '{}' to '{}' closed. Will retry link creation after '{}'.", this.sendLink.getName(), this.sendPath, nextRetryInterval); + Timer.schedule(() -> {CoreMessageSender.this.ensureLinkIsOpen();}, nextRetryInterval, TimerType.OneTimeRun); } } } @@ -542,7 +542,7 @@ public void onSendComplete(final Delivery delivery) { DeliveryState outcome = delivery.getRemoteState(); final String deliveryTag = new String(delivery.getTag()); - + TRACE_LOGGER.debug("Received ack for delivery. path:{}, linkName:{}, deliveryTag:{}, outcome:{}", CoreMessageSender.this.sendPath, this.sendLink.getName(), deliveryTag, outcome); final SendWorkItem pendingSendWorkItem = this.pendingSendsData.remove(deliveryTag); @@ -590,7 +590,7 @@ else if (outcome instanceof Rejected) } else { - TRACE_LOGGER.warn("Send failed for delivery '{}'. Will retry after '{}'", deliveryTag, retryInterval); + TRACE_LOGGER.warn("Send failed for delivery '{}'. Will retry after '{}'", deliveryTag, retryInterval); pendingSendWorkItem.setLastKnownException(exception); Timer.schedule(() -> {CoreMessageSender.this.reSendAsync(deliveryTag, pendingSendWorkItem, false);}, retryInterval, TimerType.OneTimeRun); } @@ -599,7 +599,7 @@ else if (outcome instanceof Released) { this.cleanupFailedSend(pendingSendWorkItem, new OperationCancelledException(outcome.toString())); } - else + else { this.cleanupFailedSend(pendingSendWorkItem, new ServiceBusException(false, outcome.toString())); } @@ -609,24 +609,24 @@ else if (outcome instanceof Released) TRACE_LOGGER.warn("Delivery mismatch. path:{}, linkName:{}, delivery:{}", this.sendPath, this.sendLink.getName(), deliveryTag); } } - + private void clearAllPendingSendsWithException(Exception failureException) { - synchronized (this.pendingSendLock) - { - for (Map.Entry> pendingSend: this.pendingSendsData.entrySet()) - { - this.cleanupFailedSend(pendingSend.getValue(), failureException); - } - - this.pendingSendsData.clear(); - this.pendingSends.clear(); - } + synchronized (this.pendingSendLock) + { + for (Map.Entry> pendingSend: this.pendingSendsData.entrySet()) + { + this.cleanupFailedSend(pendingSend.getValue(), failureException); + } + + this.pendingSendsData.clear(); + this.pendingSends.clear(); + } } - + private void cleanupFailedSend(final SendWorkItem failedSend, final Exception exception) { - failedSend.cancelTimeoutTask(false); + failedSend.cancelTimeoutTask(false); ExceptionUtil.completeExceptionally(failedSend.getWork(), exception, this, true); } @@ -661,7 +661,7 @@ private static SenderLinkSettings getDefaultLinkProperties(String sendPath, Stri private void createSendLink(SenderLinkSettings linkSettings) { - TRACE_LOGGER.info("Creating send link to '{}'", this.sendPath); + TRACE_LOGGER.info("Creating send link to '{}'", this.sendPath); final Connection connection = this.underlyingFactory.getConnection(); final Session session = connection.session(); session.setOutgoingWindow(Integer.MAX_VALUE); @@ -681,16 +681,16 @@ private void createSendLink(SenderLinkSettings linkSettings) sender.open(); this.sendLink = sender; } - + CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure) { - if(this.getIsClosingOrClosed()) - { - return CompletableFuture.completedFuture(null); - } - else - { - CompletableFuture> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true)); + if(this.getIsClosingOrClosed()) + { + return CompletableFuture.completedFuture(null); + } + else + { + CompletableFuture> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true)); CompletableFuture sasTokenFuture = sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f; TRACE_LOGGER.debug("Sent SAS Token and set renew timer");}); if (this.transferDestinationPath!= null && !this.transferDestinationPath.isEmpty()) @@ -700,18 +700,18 @@ CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure) } return sasTokenFuture; - } + } } - + private void cancelSASTokenRenewTimer() - { - if(this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) - { - this.sasTokenRenewTimerFuture.cancel(true); - TRACE_LOGGER.debug("Cancelled SAS Token renew timer"); - } - } - + { + if(this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) + { + this.sasTokenRenewTimerFuture.cancel(true); + TRACE_LOGGER.debug("Cancelled SAS Token renew timer"); + } + } + // TODO: consolidate common-code written for timeouts in Sender/Receiver private void initializeLinkOpen(TimeoutTracker timeout) { @@ -725,9 +725,9 @@ public void run() { if (!CoreMessageSender.this.linkFirstOpen.isDone()) { - CoreMessageSender.this.closeInternals(false); - CoreMessageSender.this.setClosed(); - + CoreMessageSender.this.closeInternals(false); + CoreMessageSender.this.setClosed(); + Exception operationTimedout = new TimeoutException( String.format(Locale.US, "Open operation on SendLink(%s) on Entity(%s) timed out at %s.", CoreMessageSender.this.sendLink.getName(), CoreMessageSender.this.getSendPath(), ZonedDateTime.now().toString()), CoreMessageSender.this.lastKnownErrorReportedAt.isAfter(Instant.now().minusSeconds(ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS)) ? CoreMessageSender.this.lastKnownLinkError : null); @@ -746,13 +746,13 @@ public ErrorContext getContext() final boolean isLinkOpened = this.linkFirstOpen != null && this.linkFirstOpen.isDone(); final String referenceId = this.sendLink != null && this.sendLink.getRemoteProperties() != null && this.sendLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY) ? this.sendLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString() - : ((this.sendLink != null) ? this.sendLink.getName() : null); + : ((this.sendLink != null) ? this.sendLink.getName() : null); SenderErrorContext errorContext = new SenderErrorContext( this.underlyingFactory!=null ? this.underlyingFactory.getHostName() : null, - this.sendPath, - referenceId, - isLinkOpened && this.sendLink != null ? this.sendLink.getCredit() : null); + this.sendPath, + referenceId, + isLinkOpened && this.sendLink != null ? this.sendLink.getCredit() : null); return errorContext; } @@ -762,42 +762,42 @@ public void onFlow(final int creditIssued) this.lastKnownLinkError = null; if (creditIssued <= 0) - return; - + return; + TRACE_LOGGER.debug("Received flow frame. path:{}, linkName:{}, remoteLinkCredit:{}, pendingSendsWaitingForCredit:{}, pendingSendsWaitingDelivery:{}", - this.sendPath, this.sendLink.getName(), creditIssued, this.pendingSends.size(), this.pendingSendsData.size() - this.pendingSends.size()); + this.sendPath, this.sendLink.getName(), creditIssued, this.pendingSends.size(), this.pendingSendsData.size() - this.pendingSends.size()); this.linkCredit = this.linkCredit + creditIssued; this.sendWork.onEvent(); } - + private synchronized CompletableFuture ensureLinkIsOpen() - { - // Send SAS token before opening a link as connection might have been closed and reopened - if (!(this.sendLink.getLocalState() == EndpointState.ACTIVE && this.sendLink.getRemoteState() == EndpointState.ACTIVE)) - { - if(this.sendLinkReopenFuture == null || this.sendLinkReopenFuture.isDone()) - { - TRACE_LOGGER.info("Recreating send link to '{}'", this.sendPath); - this.retryPolicy.incrementRetryCount(CoreMessageSender.this.getClientId()); - this.sendLinkReopenFuture = new CompletableFuture(); - // Variable just to closed over by the scheduled runnable. The runnable should cancel only the closed over future, not the parent's instance variable which can change - final CompletableFuture linkReopenFutureThatCanBeCancelled = this.sendLinkReopenFuture; - Timer.schedule( - () -> { - if (!linkReopenFutureThatCanBeCancelled.isDone()) - { - CoreMessageSender.this.cancelSASTokenRenewTimer(); - Exception operationTimedout = new TimeoutException( - String.format(Locale.US, "%s operation on SendLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageSender.this.sendLink.getName(), CoreMessageSender.this.sendPath, ZonedDateTime.now())); - - TRACE_LOGGER.warn(operationTimedout.getMessage()); - linkReopenFutureThatCanBeCancelled.completeExceptionally(operationTimedout); - } - } - , CoreMessageSender.LINK_REOPEN_TIMEOUT - , TimerType.OneTimeRun); - this.cancelSASTokenRenewTimer(); + { + // Send SAS token before opening a link as connection might have been closed and reopened + if (!(this.sendLink.getLocalState() == EndpointState.ACTIVE && this.sendLink.getRemoteState() == EndpointState.ACTIVE)) + { + if(this.sendLinkReopenFuture == null || this.sendLinkReopenFuture.isDone()) + { + TRACE_LOGGER.info("Recreating send link to '{}'", this.sendPath); + this.retryPolicy.incrementRetryCount(CoreMessageSender.this.getClientId()); + this.sendLinkReopenFuture = new CompletableFuture(); + // Variable just to closed over by the scheduled runnable. The runnable should cancel only the closed over future, not the parent's instance variable which can change + final CompletableFuture linkReopenFutureThatCanBeCancelled = this.sendLinkReopenFuture; + Timer.schedule( + () -> { + if (!linkReopenFutureThatCanBeCancelled.isDone()) + { + CoreMessageSender.this.cancelSASTokenRenewTimer(); + Exception operationTimedout = new TimeoutException( + String.format(Locale.US, "%s operation on SendLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageSender.this.sendLink.getName(), CoreMessageSender.this.sendPath, ZonedDateTime.now())); + + TRACE_LOGGER.warn(operationTimedout.getMessage()); + linkReopenFutureThatCanBeCancelled.completeExceptionally(operationTimedout); + } + } + , CoreMessageSender.LINK_REOPEN_TIMEOUT + , TimerType.OneTimeRun); + this.cancelSASTokenRenewTimer(); CompletableFuture authenticationFuture = null; if (linkSettings.requiresAuthentication) { @@ -807,175 +807,175 @@ private synchronized CompletableFuture ensureLinkIsOpen() } authenticationFuture.handleAsync((v, sendTokenEx) -> { - if(sendTokenEx != null) - { - Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx); - TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", this.sendPath, cause); - this.sendLinkReopenFuture.completeExceptionally(sendTokenEx); - } - else - { - try - { - this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() - { - @Override - public void onEvent() - { - CoreMessageSender.this.createSendLink(CoreMessageSender.this.linkSettings); - } - }); - } - catch (IOException ioEx) - { - this.sendLinkReopenFuture.completeExceptionally(ioEx); - } - } - return null; - }); - } - - return this.sendLinkReopenFuture; - } - else - { - return CompletableFuture.completedFuture(null); - } - } - + if(sendTokenEx != null) + { + Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx); + TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", this.sendPath, cause); + this.sendLinkReopenFuture.completeExceptionally(sendTokenEx); + } + else + { + try + { + this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() + { + @Override + public void onEvent() + { + CoreMessageSender.this.createSendLink(CoreMessageSender.this.linkSettings); + } + }); + } + catch (IOException ioEx) + { + this.sendLinkReopenFuture.completeExceptionally(ioEx); + } + } + return null; + }); + } + + return this.sendLinkReopenFuture; + } + else + { + return CompletableFuture.completedFuture(null); + } + } + // actual send on the SenderLink should happen only in this method & should run on Reactor Thread private void processSendWork() { - synchronized (this.pendingSendLock) - { - if(!this.isSendLoopRunning) - { - this.isSendLoopRunning = true; - } - else - { - return; - } - } - - TRACE_LOGGER.debug("Processing pending sends to '{}'. Available link credit '{}'", this.sendPath, this.linkCredit); - try - { - if(!this.ensureLinkIsOpen().isDone()) - { - // Link recreation is pending - return; - } - - final Sender sendLinkCurrent = this.sendLink; - while (sendLinkCurrent != null - && sendLinkCurrent.getLocalState() == EndpointState.ACTIVE && sendLinkCurrent.getRemoteState() == EndpointState.ACTIVE - && this.linkCredit > 0) - { - final WeightedDeliveryTag deliveryTag; - final SendWorkItem sendData; - synchronized (this.pendingSendLock) - { - deliveryTag = this.pendingSends.poll(); - if (deliveryTag == null) - { - TRACE_LOGGER.debug("There are no pending sends to '{}'.", this.sendPath); - // Must be done inside this synchronized block - this.isSendLoopRunning = false; - break; - } - else - { - sendData = this.pendingSendsData.get(deliveryTag.getDeliveryTag()); - if(sendData == null) - { - TRACE_LOGGER.debug("SendData not found for this delivery. path:{}, linkName:{}, deliveryTag:{}", this.sendPath, this.sendLink.getName(), deliveryTag); - continue; - } - } - } - - if (sendData.getWork() != null && sendData.getWork().isDone()) - { - // CoreSend could enqueue Sends into PendingSends Queue and can fail the SendCompletableFuture - // (when It fails to schedule the ProcessSendWork on reactor Thread) - this.pendingSendsData.remove(sendData); - continue; - } - - Delivery delivery = null; - boolean linkAdvance = false; - int sentMsgSize = 0; - Exception sendException = null; - - try - { - delivery = sendLinkCurrent.delivery(deliveryTag.getDeliveryTag().getBytes()); - delivery.setMessageFormat(sendData.getMessageFormat()); - - TransactionContext transaction = sendData.getTransaction(); - if (transaction != TransactionContext.NULL_TXN) { + synchronized (this.pendingSendLock) + { + if(!this.isSendLoopRunning) + { + this.isSendLoopRunning = true; + } + else + { + return; + } + } + + TRACE_LOGGER.debug("Processing pending sends to '{}'. Available link credit '{}'", this.sendPath, this.linkCredit); + try + { + if(!this.ensureLinkIsOpen().isDone()) + { + // Link recreation is pending + return; + } + + final Sender sendLinkCurrent = this.sendLink; + while (sendLinkCurrent != null + && sendLinkCurrent.getLocalState() == EndpointState.ACTIVE && sendLinkCurrent.getRemoteState() == EndpointState.ACTIVE + && this.linkCredit > 0) + { + final WeightedDeliveryTag deliveryTag; + final SendWorkItem sendData; + synchronized (this.pendingSendLock) + { + deliveryTag = this.pendingSends.poll(); + if (deliveryTag == null) + { + TRACE_LOGGER.debug("There are no pending sends to '{}'.", this.sendPath); + // Must be done inside this synchronized block + this.isSendLoopRunning = false; + break; + } + else + { + sendData = this.pendingSendsData.get(deliveryTag.getDeliveryTag()); + if(sendData == null) + { + TRACE_LOGGER.debug("SendData not found for this delivery. path:{}, linkName:{}, deliveryTag:{}", this.sendPath, this.sendLink.getName(), deliveryTag); + continue; + } + } + } + + if (sendData.getWork() != null && sendData.getWork().isDone()) + { + // CoreSend could enqueue Sends into PendingSends Queue and can fail the SendCompletableFuture + // (when It fails to schedule the ProcessSendWork on reactor Thread) + this.pendingSendsData.remove(sendData); + continue; + } + + Delivery delivery = null; + boolean linkAdvance = false; + int sentMsgSize = 0; + Exception sendException = null; + + try + { + delivery = sendLinkCurrent.delivery(deliveryTag.getDeliveryTag().getBytes()); + delivery.setMessageFormat(sendData.getMessageFormat()); + + TransactionContext transaction = sendData.getTransaction(); + if (transaction != TransactionContext.NULL_TXN) { TransactionalState transactionalState = new TransactionalState(); transactionalState.setTxnId(new Binary(transaction.getTransactionId().array())); - delivery.disposition(transactionalState); + delivery.disposition(transactionalState); + } + + TRACE_LOGGER.debug("Sending message delivery '{}' to '{}'", deliveryTag.getDeliveryTag(), this.sendPath); + sentMsgSize = sendLinkCurrent.send(sendData.getMessage(), 0, sendData.getEncodedMessageSize()); + assert sentMsgSize == sendData.getEncodedMessageSize() : "Contract of the ProtonJ library for Sender.Send API changed"; + + linkAdvance = sendLinkCurrent.advance(); + } + catch(Exception exception) + { + sendException = exception; + } + + if (linkAdvance) + { + this.linkCredit--; + sendData.setWaitingForAck(); + } + else + { + TRACE_LOGGER.warn("Sendlink advance failed. path:{}, linkName:{}, deliveryTag:{}, sentMessageSize:{}, payloadActualSiz:{}", + this.sendPath, this.sendLink.getName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize()); + + if (delivery != null) + { + delivery.free(); } - TRACE_LOGGER.debug("Sending message delivery '{}' to '{}'", deliveryTag.getDeliveryTag(), this.sendPath); - sentMsgSize = sendLinkCurrent.send(sendData.getMessage(), 0, sendData.getEncodedMessageSize()); - assert sentMsgSize == sendData.getEncodedMessageSize() : "Contract of the ProtonJ library for Sender.Send API changed"; - - linkAdvance = sendLinkCurrent.advance(); - } - catch(Exception exception) - { - sendException = exception; - } - - if (linkAdvance) - { - this.linkCredit--; - sendData.setWaitingForAck(); - } - else - { - TRACE_LOGGER.warn("Sendlink advance failed. path:{}, linkName:{}, deliveryTag:{}, sentMessageSize:{}, payloadActualSiz:{}", - this.sendPath, this.sendLink.getName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize()); - - if (delivery != null) - { - delivery.free(); - } - - Exception completionException = sendException != null ? new OperationCancelledException("Send operation failed. Please see cause for more details", sendException) - : new OperationCancelledException(String.format(Locale.US, "Send operation failed while advancing delivery(tag: %s) on SendLink(path: %s).", this.sendPath, deliveryTag)); - AsyncUtil.completeFutureExceptionally(sendData.getWork(), completionException); - } - } - } - finally - { - synchronized (this.pendingSendLock) - { - if(this.isSendLoopRunning) - { - this.isSendLoopRunning = false; - } - } - } + Exception completionException = sendException != null ? new OperationCancelledException("Send operation failed. Please see cause for more details", sendException) + : new OperationCancelledException(String.format(Locale.US, "Send operation failed while advancing delivery(tag: %s) on SendLink(path: %s).", this.sendPath, deliveryTag)); + AsyncUtil.completeFutureExceptionally(sendData.getWork(), completionException); + } + } + } + finally + { + synchronized (this.pendingSendLock) + { + if(this.isSendLoopRunning) + { + this.isSendLoopRunning = false; + } + } + } } private void throwSenderTimeout(CompletableFuture pendingSendWork, Exception lastKnownException) { Exception cause = lastKnownException; if (lastKnownException == null && this.lastKnownLinkError != null) - { + { cause = this.lastKnownErrorReportedAt.isAfter(Instant.now().minusMillis(this.operationTimeout.toMillis())) ? this.lastKnownLinkError : null; } boolean isClientSideTimeout = (cause == null || !(cause instanceof ServiceBusException)); ServiceBusException exception = isClientSideTimeout - ? new TimeoutException(String.format(Locale.US, "%s %s %s.", CoreMessageSender.SEND_TIMED_OUT, " at ", ZonedDateTime.now(), cause)) - : (ServiceBusException) cause; + ? new TimeoutException(String.format(Locale.US, "%s %s %s.", CoreMessageSender.SEND_TIMED_OUT, " at ", ZonedDateTime.now(), cause)) + : (ServiceBusException) cause; TRACE_LOGGER.error("Send timed out", exception); ExceptionUtil.completeExceptionally(pendingSendWork, exception, this, true); @@ -991,7 +991,7 @@ public void run() { if (!linkClose.isDone()) { - Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Send Link(%s) timed out at %s", "Close", CoreMessageSender.this.sendLink.getName(), ZonedDateTime.now())); + Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Send Link(%s) timed out at %s", "Close", CoreMessageSender.this.sendLink.getName(), ZonedDateTime.now())); TRACE_LOGGER.warn(operationTimedout.getMessage()); ExceptionUtil.completeExceptionally(linkClose, operationTimedout, CoreMessageSender.this, false); @@ -1008,90 +1008,90 @@ protected CompletableFuture onClose() this.closeInternals(true); return this.linkClose; } - + private void closeInternals(boolean waitForCloseCompletion) { - if (!this.getIsClosed()) - { - if (this.sendLink != null && this.sendLink.getLocalState() != EndpointState.CLOSED) - { - try { - this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { - - @Override - public void onEvent() { - if (CoreMessageSender.this.sendLink != null && CoreMessageSender.this.sendLink.getLocalState() != EndpointState.CLOSED) - { - TRACE_LOGGER.info("Closing send link to '{}'", CoreMessageSender.this.sendPath); - CoreMessageSender.this.underlyingFactory.deregisterForConnectionError(CoreMessageSender.this.sendLink); - CoreMessageSender.this.sendLink.close(); - if(waitForCloseCompletion) - { - CoreMessageSender.this.scheduleLinkCloseTimeout(TimeoutTracker.create(CoreMessageSender.this.operationTimeout)); - } - else - { - AsyncUtil.completeFuture(CoreMessageSender.this.linkClose, null); - } - - } - } - }); - } catch (IOException e) { - AsyncUtil.completeFutureExceptionally(this.linkClose, e); - } - } - else - { - AsyncUtil.completeFuture(this.linkClose, null); - } - - this.cancelSASTokenRenewTimer(); - this.closeRequestResponseLink(); - } + if (!this.getIsClosed()) + { + if (this.sendLink != null && this.sendLink.getLocalState() != EndpointState.CLOSED) + { + try { + this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { + + @Override + public void onEvent() { + if (CoreMessageSender.this.sendLink != null && CoreMessageSender.this.sendLink.getLocalState() != EndpointState.CLOSED) + { + TRACE_LOGGER.info("Closing send link to '{}'", CoreMessageSender.this.sendPath); + CoreMessageSender.this.underlyingFactory.deregisterForConnectionError(CoreMessageSender.this.sendLink); + CoreMessageSender.this.sendLink.close(); + if(waitForCloseCompletion) + { + CoreMessageSender.this.scheduleLinkCloseTimeout(TimeoutTracker.create(CoreMessageSender.this.operationTimeout)); + } + else + { + AsyncUtil.completeFuture(CoreMessageSender.this.linkClose, null); + } + + } + } + }); + } catch (IOException e) { + AsyncUtil.completeFutureExceptionally(this.linkClose, e); + } + } + else + { + AsyncUtil.completeFuture(this.linkClose, null); + } + + this.cancelSASTokenRenewTimer(); + this.closeRequestResponseLink(); + } } private static class WeightedDeliveryTag { private final String deliveryTag; private final int priority; - + WeightedDeliveryTag(final String deliveryTag, final int priority) { this.deliveryTag = deliveryTag; this.priority = priority; } - + public String getDeliveryTag() { return this.deliveryTag; } - + public int getPriority() { return this.priority; } } - + private static class DeliveryTagComparator implements Comparator { @Override public int compare(WeightedDeliveryTag deliveryTag0, WeightedDeliveryTag deliveryTag1) { return deliveryTag1.getPriority() - deliveryTag0.getPriority(); - } + } } public CompletableFuture scheduleMessageAsync(Message[] messages, TransactionContext transaction, Duration timeout) { - TRACE_LOGGER.debug("Sending '{}' scheduled message(s) to '{}'", messages.length, this.sendPath); + TRACE_LOGGER.debug("Sending '{}' scheduled message(s) to '{}'", messages.length, this.sendPath); return this.createRequestResponseLink().thenComposeAsync((v) -> { HashMap requestBodyMap = new HashMap(); Collection messageList = new LinkedList(); for(Message message : messages) { HashMap messageEntry = new HashMap(); - + Pair encodedPair = null; try { @@ -1104,28 +1104,28 @@ public CompletableFuture scheduleMessageAsync(Message[] messages, Transa scheduleMessagesTask.completeExceptionally(exception); return scheduleMessagesTask; } - + messageEntry.put(ClientConstants.REQUEST_RESPONSE_MESSAGE, new Binary(encodedPair.getFirstItem(), 0, encodedPair.getSecondItem())); messageEntry.put(ClientConstants.REQUEST_RESPONSE_MESSAGE_ID, message.getMessageId()); - + String sessionId = message.getGroupId(); if(!StringUtil.isNullOrEmpty(sessionId)) { messageEntry.put(ClientConstants.REQUEST_RESPONSE_SESSION_ID, sessionId); } - + Object partitionKey = message.getMessageAnnotations().getValue().get(Symbol.valueOf(ClientConstants.PARTITIONKEYNAME)); if(partitionKey != null && !((String)partitionKey).isEmpty()) { messageEntry.put(ClientConstants.REQUEST_RESPONSE_PARTITION_KEY, (String)partitionKey); } - Object viaPartitionKey = message.getMessageAnnotations().getValue().get(Symbol.valueOf(ClientConstants.VIAPARTITIONKEYNAME)); - if(viaPartitionKey != null && !((String)viaPartitionKey).isEmpty()) - { - messageEntry.put(ClientConstants.REQUEST_RESPONSE_VIA_PARTITION_KEY, (String)viaPartitionKey); - } - + Object viaPartitionKey = message.getMessageAnnotations().getValue().get(Symbol.valueOf(ClientConstants.VIAPARTITIONKEYNAME)); + if(viaPartitionKey != null && !((String)viaPartitionKey).isEmpty()) + { + messageEntry.put(ClientConstants.REQUEST_RESPONSE_VIA_PARTITION_KEY, (String)viaPartitionKey); + } + messageList.add(messageEntry); } requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_MESSAGES, messageList); @@ -1139,7 +1139,7 @@ public CompletableFuture scheduleMessageAsync(Message[] messages, Transa long[] sequenceNumbers = (long[])RequestResponseUtils.getResponseBody(responseMessage).get(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS); if(TRACE_LOGGER.isDebugEnabled()) { - TRACE_LOGGER.debug("Scheduled messages sent. Received sequence numbers '{}'", Arrays.toString(sequenceNumbers)); + TRACE_LOGGER.debug("Scheduled messages sent. Received sequence numbers '{}'", Arrays.toString(sequenceNumbers)); } returningFuture.complete(sequenceNumbers); @@ -1147,26 +1147,26 @@ public CompletableFuture scheduleMessageAsync(Message[] messages, Transa else { // error response - Exception scheduleException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); - TRACE_LOGGER.error("Sending scheduled messages to '{}' failed.", this.sendPath, scheduleException); + Exception scheduleException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); + TRACE_LOGGER.error("Sending scheduled messages to '{}' failed.", this.sendPath, scheduleException); returningFuture.completeExceptionally(scheduleException); } return returningFuture; }); }); } - + public CompletableFuture cancelScheduledMessageAsync(Long[] sequenceNumbers, Duration timeout) { - if(TRACE_LOGGER.isDebugEnabled()) - { - TRACE_LOGGER.debug("Cancelling scheduled message(s) '{}' to '{}'", Arrays.toString(sequenceNumbers), this.sendPath); - } - + if(TRACE_LOGGER.isDebugEnabled()) + { + TRACE_LOGGER.debug("Cancelling scheduled message(s) '{}' to '{}'", Arrays.toString(sequenceNumbers), this.sendPath); + } + return this.createRequestResponseLink().thenComposeAsync((v) -> { HashMap requestBodyMap = new HashMap(); requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS, sequenceNumbers); - + Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_CANCEL_CHEDULE_MESSAGE_OPERATION, requestBodyMap, Util.adjustServerTimeout(timeout), this.sendLink.getName()); CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, timeout); return responseFuture.thenComposeAsync((responseMessage) -> { @@ -1174,28 +1174,28 @@ public CompletableFuture cancelScheduledMessageAsync(Long[] sequenceNumber int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) { - TRACE_LOGGER.debug("Cancelled scheduled messages in '{}'", this.sendPath); + TRACE_LOGGER.debug("Cancelled scheduled messages in '{}'", this.sendPath); returningFuture.complete(null); } else { // error response - Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); - TRACE_LOGGER.error("Cancelling scheduled messages in '{}' failed.", this.sendPath, failureException); + Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); + TRACE_LOGGER.error("Cancelling scheduled messages in '{}' failed.", this.sendPath, failureException); returningFuture.completeExceptionally(failureException); } return returningFuture; }); }); } - + // In case we need to support peek on a topic public CompletableFuture> peekMessagesAsync(long fromSequenceNumber, int messageCount) { - TRACE_LOGGER.debug("Peeking '{}' messages in '{}' from sequence number '{}'", messageCount, this.sendPath, fromSequenceNumber); + TRACE_LOGGER.debug("Peeking '{}' messages in '{}' from sequence number '{}'", messageCount, this.sendPath, fromSequenceNumber); return this.createRequestResponseLink().thenComposeAsync((v) -> { return CommonRequestResponseOperations.peekMessagesAsync(this.requestResponseLink, this.operationTimeout, fromSequenceNumber, messageCount, null, this.sendLink.getName()); }); } -} +} \ No newline at end of file diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java index 1585cf25308d9..a2025db3384fc 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java @@ -18,6 +18,7 @@ import com.microsoft.azure.servicebus.TransactionContext; import com.microsoft.azure.servicebus.Utils; +import com.microsoft.azure.servicebus.amqp.*; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.BaseHandler; @@ -34,13 +35,6 @@ import org.slf4j.MarkerFactory; import com.microsoft.azure.servicebus.ClientSettings; -import com.microsoft.azure.servicebus.amqp.BaseLinkHandler; -import com.microsoft.azure.servicebus.amqp.ConnectionHandler; -import com.microsoft.azure.servicebus.amqp.DispatchHandler; -import com.microsoft.azure.servicebus.amqp.IAmqpConnection; -import com.microsoft.azure.servicebus.amqp.ProtonUtil; -import com.microsoft.azure.servicebus.amqp.ReactorDispatcher; -import com.microsoft.azure.servicebus.amqp.ReactorHandler; import com.microsoft.azure.servicebus.security.SecurityToken; /** @@ -87,7 +81,9 @@ private MessagingFactory(URI namespaceEndpointUri, ClientSettings clientSettings this.registeredLinks = new LinkedList(); this.connetionCloseFuture = new CompletableFuture(); this.reactorLock = new Object(); - this.connectionHandler = new ConnectionHandler(this); + this.connectionHandler = clientSettings.getTransportType() == TransportType.AMQP + ? new ConnectionHandler(this) + : new WebSocketConnectionHandler(this); this.factoryOpenFuture = new CompletableFuture(); this.cbsLinkCreationFuture = new CompletableFuture(); this.managementLinksCache = new RequestResponseLinkCache(this); @@ -99,8 +95,8 @@ public void onReactorInit(Event e) super.onReactorInit(e); final Reactor r = e.getReactor(); - TRACE_LOGGER.info("Creating connection to host '{}:{}'", hostName, ClientConstants.AMQPS_PORT); - connection = r.connectionToHost(hostName, ClientConstants.AMQPS_PORT, connectionHandler); + TRACE_LOGGER.info("Creating connection to host '{}:{}'", hostName, connectionHandler.getPort()); + connection = r.connectionToHost(hostName, connectionHandler.getPort(), connectionHandler); } }; Timer.register(this.getClientId()); @@ -203,7 +199,7 @@ private ReactorDispatcher getReactorScheduler() private void startReactor(ReactorHandler reactorHandler) throws IOException { TRACE_LOGGER.info("Creating and starting reactor"); - Reactor newReactor = ProtonUtil.reactor(reactorHandler); + Reactor newReactor = ProtonUtil.reactor(reactorHandler, this.connectionHandler.getMaxFrameSize()); synchronized (this.reactorLock) { this.reactor = newReactor; @@ -221,7 +217,7 @@ Connection getConnection() if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) { TRACE_LOGGER.info("Creating connection to host '{}:{}'", hostName, ClientConstants.AMQPS_PORT); - this.connection = this.getReactor().connectionToHost(this.hostName, ClientConstants.AMQPS_PORT, this.connectionHandler); + this.connection = this.getReactor().connectionToHost(this.hostName, connectionHandler.getPort(), this.connectionHandler); } return this.connection; @@ -692,11 +688,11 @@ private static ScheduledFuture scheduleRenewTimer(Instant currentTokenValidUn int renewInterval = Util.getTokenRenewIntervalInSeconds((int)Duration.between(Instant.now(), currentTokenValidUntil).getSeconds()); return Timer.schedule(validityRenewer, Duration.ofSeconds(renewInterval), TimerType.OneTimeRun); } - + CompletableFuture obtainRequestResponseLinkAsync(String entityPath, MessagingEntityType entityType) { - this.throwIfClosed(null); - return this.managementLinksCache.obtainRequestResponseLinkAsync(entityPath, null, entityType); + this.throwIfClosed(null); + return this.managementLinksCache.obtainRequestResponseLinkAsync(entityPath, null, entityType); } CompletableFuture obtainRequestResponseLinkAsync(String entityPath, String transferDestinationPath, MessagingEntityType entityType) @@ -720,40 +716,40 @@ void releaseRequestResponseLink(String entityPath, String transferDestinationPat this.managementLinksCache.releaseRequestResponseLink(entityPath, transferDestinationPath); } } - + private CompletableFuture createCBSLinkAsync() - { - if(++this.cbsLinkCreationAttempts > MAX_CBS_LINK_CREATION_ATTEMPTS ) - { - Throwable completionEx = this.lastCBSLinkCreationException == null ? new Exception("CBS link creation failed multiple times.") : this.lastCBSLinkCreationException; - this.cbsLinkCreationFuture.completeExceptionally(completionEx); - return CompletableFuture.completedFuture(null); - } - else - { - String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath(); - TRACE_LOGGER.info("Creating CBS link to {}", requestResponseLinkPath); - CompletableFuture crateAndAssignRequestResponseLink = - RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath, null, null, null, null) - .handleAsync((cbsLink, ex) -> - { - if(ex == null) - { - TRACE_LOGGER.info("Created CBS link to {}", requestResponseLinkPath); - this.cbsLink = cbsLink; - this.cbsLinkCreationFuture.complete(null); - } - else - { - this.lastCBSLinkCreationException = ExceptionUtil.extractAsyncCompletionCause(ex); - TRACE_LOGGER.warn("Creating CBS link to {} failed. Attempts '{}'", requestResponseLinkPath, this.cbsLinkCreationAttempts); - this.createCBSLinkAsync(); - } - return null; - }); - return crateAndAssignRequestResponseLink; - } - } + { + if(++this.cbsLinkCreationAttempts > MAX_CBS_LINK_CREATION_ATTEMPTS ) + { + Throwable completionEx = this.lastCBSLinkCreationException == null ? new Exception("CBS link creation failed multiple times.") : this.lastCBSLinkCreationException; + this.cbsLinkCreationFuture.completeExceptionally(completionEx); + return CompletableFuture.completedFuture(null); + } + else + { + String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath(); + TRACE_LOGGER.info("Creating CBS link to {}", requestResponseLinkPath); + CompletableFuture crateAndAssignRequestResponseLink = + RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath, null, null, null, null) + .handleAsync((cbsLink, ex) -> + { + if(ex == null) + { + TRACE_LOGGER.info("Created CBS link to {}", requestResponseLinkPath); + this.cbsLink = cbsLink; + this.cbsLinkCreationFuture.complete(null); + } + else + { + this.lastCBSLinkCreationException = ExceptionUtil.extractAsyncCompletionCause(ex); + TRACE_LOGGER.warn("Creating CBS link to {} failed. Attempts '{}'", requestResponseLinkPath, this.cbsLinkCreationAttempts); + this.createCBSLinkAsync(); + } + return null; + }); + return crateAndAssignRequestResponseLink; + } + } private static T completeFuture(CompletableFuture future) throws InterruptedException, ServiceBusException { try { diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java index 82918b391ac79..940828a62ca4a 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java @@ -43,7 +43,7 @@ class RequestResponseLink extends ClientEntity{ private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(RequestResponseLink.class); - + private final Object recreateLinksLock; private final MessagingFactory underlyingFactory; private final String linkPath; @@ -51,16 +51,16 @@ class RequestResponseLink extends ClientEntity{ private final String additionalAudienceURI; private final CompletableFuture createFuture; private final ConcurrentHashMap pendingRequests; - private final AtomicInteger requestCounter; - private final String replyTo; - - private ScheduledFuture sasTokenRenewTimerFuture; + private final AtomicInteger requestCounter; + private final String replyTo; + + private ScheduledFuture sasTokenRenewTimerFuture; private InternalReceiver amqpReceiver; private InternalSender amqpSender; private boolean isRecreateLinksInProgress; private Map additionalProperties; private MessagingEntityType entityType; - + public static CompletableFuture createAsync( MessagingFactory messagingFactory, String linkName, @@ -78,7 +78,7 @@ public static CompletableFuture createAsync( additionalAudience, additionalProperties, entityType); - + Timer.schedule( new Runnable() { @@ -86,10 +86,10 @@ public void run() { if (!requestReponseLink.createFuture.isDone()) { - requestReponseLink.amqpSender.closeInternals(false); - requestReponseLink.amqpReceiver.closeInternals(false); - requestReponseLink.cancelSASTokenRenewTimer(); - + requestReponseLink.amqpSender.closeInternals(false); + requestReponseLink.amqpReceiver.closeInternals(false); + requestReponseLink.cancelSASTokenRenewTimer(); + Exception operationTimedout = new TimeoutException( String.format(Locale.US, "Open operation on RequestResponseLink(%s) on Entity(%s) timed out at %s.", requestReponseLink.getClientId(), requestReponseLink.linkPath, ZonedDateTime.now().toString())); TRACE_LOGGER.error("RequestResponseLink open timed out.", operationTimedout); @@ -99,66 +99,66 @@ public void run() } , messagingFactory.getOperationTimeout() , TimerType.OneTimeRun); - + requestReponseLink.sendTokenAndSetRenewTimer(false).handleAsync((v, sasTokenEx) -> { - if(sasTokenEx != null) - { - Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx); - TRACE_LOGGER.error("Sending SAS Token failed. RequestResponseLink path:{}", requestReponseLink.linkPath, cause); - requestReponseLink.createFuture.completeExceptionally(cause); - } - else - { - try - { - messagingFactory.scheduleOnReactorThread(new DispatchHandler() - { - @Override - public void onEvent() - { - requestReponseLink.createInternalLinks(); - } - }); - } - catch (IOException ioException) - { - requestReponseLink.cancelSASTokenRenewTimer(); - requestReponseLink.createFuture.completeExceptionally(new ServiceBusException(false, "Failed to create internal links, see cause for more details.", ioException)); - } - } - - return null; - }); - - CompletableFuture.allOf(requestReponseLink.amqpSender.openFuture, requestReponseLink.amqpReceiver.openFuture).handleAsync((v, ex) -> - { - if(ex == null) - { - TRACE_LOGGER.info("Opened requestresponselink to {}", requestReponseLink.linkPath); - requestReponseLink.createFuture.complete(requestReponseLink); - } - else - { - requestReponseLink.cancelSASTokenRenewTimer(); - requestReponseLink.createFuture.completeExceptionally(ex); - } - - return null; - }); - + if(sasTokenEx != null) + { + Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx); + TRACE_LOGGER.error("Sending SAS Token failed. RequestResponseLink path:{}", requestReponseLink.linkPath, cause); + requestReponseLink.createFuture.completeExceptionally(cause); + } + else + { + try + { + messagingFactory.scheduleOnReactorThread(new DispatchHandler() + { + @Override + public void onEvent() + { + requestReponseLink.createInternalLinks(); + } + }); + } + catch (IOException ioException) + { + requestReponseLink.cancelSASTokenRenewTimer(); + requestReponseLink.createFuture.completeExceptionally(new ServiceBusException(false, "Failed to create internal links, see cause for more details.", ioException)); + } + } + + return null; + }); + + CompletableFuture.allOf(requestReponseLink.amqpSender.openFuture, requestReponseLink.amqpReceiver.openFuture).handleAsync((v, ex) -> + { + if(ex == null) + { + TRACE_LOGGER.info("Opened requestresponselink to {}", requestReponseLink.linkPath); + requestReponseLink.createFuture.complete(requestReponseLink); + } + else + { + requestReponseLink.cancelSASTokenRenewTimer(); + requestReponseLink.createFuture.completeExceptionally(ex); + } + + return null; + }); + return requestReponseLink.createFuture; } - + public static String getManagementNodeLinkPath(String entityPath) { return String.format("%s/%s", entityPath, AmqpConstants.MANAGEMENT_NODE_ADDRESS_SEGMENT); } - + public static String getCBSNodeLinkPath() - { - return AmqpConstants.CBS_NODE_ADDRESS_SEGMENT; - } - + { + return AmqpConstants.CBS_NODE_ADDRESS_SEGMENT; + } + private RequestResponseLink( MessagingFactory messagingFactory, String linkName, @@ -169,7 +169,7 @@ private RequestResponseLink( MessagingEntityType entityType) { super(linkName); - + this.recreateLinksLock = new Object(); this.isRecreateLinksInProgress = false; this.underlyingFactory = messagingFactory; @@ -185,21 +185,21 @@ private RequestResponseLink( this.createFuture = new CompletableFuture(); this.entityType = entityType; } - + public String getLinkPath() { - return this.linkPath; + return this.linkPath; } - + private CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure) - { - if(this.getIsClosingOrClosed() || this.sasTokenAudienceURI == null) - { - return CompletableFuture.completedFuture(null); - } - else - { - CompletableFuture> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true)); + { + if(this.getIsClosingOrClosed() || this.sasTokenAudienceURI == null) + { + return CompletableFuture.completedFuture(null); + } + else + { + CompletableFuture> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true)); CompletableFuture sasTokenFuture = sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f; TRACE_LOGGER.debug("Set SAS Token renew timer");}); if (additionalAudienceURI != null) { @@ -207,19 +207,19 @@ private CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure return CompletableFuture.allOf(sasTokenFuture, transferSendTokenFuture); } - return sasTokenFuture; - } - } - + return sasTokenFuture; + } + } + private void cancelSASTokenRenewTimer() - { - if(this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) - { - TRACE_LOGGER.debug("Cancelling SAS Token renew timer"); - this.sasTokenRenewTimerFuture.cancel(true); - } - } - + { + if(this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) + { + TRACE_LOGGER.debug("Cancelling SAS Token renew timer"); + this.sasTokenRenewTimerFuture.cancel(true); + } + } + private void createInternalLinks() { Map commonLinkProperties = new HashMap<>(); @@ -245,7 +245,7 @@ private void createInternalLinks() String sendLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer()) ? sendLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer()) : sendLinkNamePrefix; - + Sender sender = session.sender(sendLinkName); Target sednerTarget = new Target(); sednerTarget.setAddress(this.linkPath); @@ -260,15 +260,15 @@ private void createInternalLinks() this.amqpSender.setSendLink(sender); TRACE_LOGGER.debug("RequestReponseLink - opening send link to {}", this.linkPath); sender.open(); - + // Create receive link session = connection.session(); - session.setOutgoingWindow(Integer.MAX_VALUE); - session.open(); - BaseHandler.setHandler(session, new SessionHandler(this.linkPath)); - + session.setOutgoingWindow(Integer.MAX_VALUE); + session.open(); + BaseHandler.setHandler(session, new SessionHandler(this.linkPath)); + String receiveLinkNamePrefix = "RequestResponseLink-Receiver".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString()); - String receiveLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer()) ? + String receiveLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer()) ? receiveLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer()) : receiveLinkNamePrefix; Receiver receiver = session.receiver(receiveLinkName); @@ -285,131 +285,131 @@ private void createInternalLinks() receiver.setProperties(commonLinkProperties); final ReceiveLinkHandler receiveLinkHandler = new ReceiveLinkHandler(this.amqpReceiver); - BaseHandler.setHandler(receiver, receiveLinkHandler); + BaseHandler.setHandler(receiver, receiveLinkHandler); this.amqpReceiver.setReceiveLink(receiver); TRACE_LOGGER.debug("RequestReponseLink - opening receive link to {}", this.linkPath); receiver.open(); } - + private CompletableFuture recreateInternalLinks() { - TRACE_LOGGER.info("RequestResponseLink - recreating internal send and receive links to {}", this.linkPath); + TRACE_LOGGER.info("RequestResponseLink - recreating internal send and receive links to {}", this.linkPath); this.amqpSender.closeInternals(false); this.amqpReceiver.closeInternals(false); this.cancelSASTokenRenewTimer(); - + // Create new internal sender and receiver objects, as old ones are closed - this.amqpSender = new InternalSender(this.getClientId() + ":internalSender", this, this.amqpSender); - this.amqpReceiver = new InternalReceiver(this.getClientId() + ":interalReceiver", this); - CompletableFuture recreateInternalLinksFuture = new CompletableFuture(); - this.sendTokenAndSetRenewTimer(false).handleAsync((v, sasTokenEx) -> { - if(sasTokenEx != null) - { - Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx); - TRACE_LOGGER.error("Sending SAS Token failed. RequestResponseLink path:{}", this.linkPath, cause); - recreateInternalLinksFuture.completeExceptionally(cause); - } - else - { - try - { - this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() - { - @Override - public void onEvent() - { - RequestResponseLink.this.createInternalLinks(); - } - }); - } - catch (IOException ioException) - { - this.cancelSASTokenRenewTimer(); - recreateInternalLinksFuture.completeExceptionally(new ServiceBusException(false, "Failed to create internal links, see cause for more details.", ioException)); - } - } - - return null; - }); - - CompletableFuture.allOf(this.amqpSender.openFuture, this.amqpReceiver.openFuture).handleAsync((v, ex) -> - { - if(ex == null) - { - TRACE_LOGGER.info("Recreated internal links to {}", this.linkPath); - recreateInternalLinksFuture.complete(null); - } - else - { - this.cancelSASTokenRenewTimer(); - recreateInternalLinksFuture.completeExceptionally(ex); - } - - return null; - }); - + this.amqpSender = new InternalSender(this.getClientId() + ":internalSender", this, this.amqpSender); + this.amqpReceiver = new InternalReceiver(this.getClientId() + ":interalReceiver", this); + CompletableFuture recreateInternalLinksFuture = new CompletableFuture(); + this.sendTokenAndSetRenewTimer(false).handleAsync((v, sasTokenEx) -> { + if(sasTokenEx != null) + { + Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx); + TRACE_LOGGER.error("Sending SAS Token failed. RequestResponseLink path:{}", this.linkPath, cause); + recreateInternalLinksFuture.completeExceptionally(cause); + } + else + { + try + { + this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() + { + @Override + public void onEvent() + { + RequestResponseLink.this.createInternalLinks(); + } + }); + } + catch (IOException ioException) + { + this.cancelSASTokenRenewTimer(); + recreateInternalLinksFuture.completeExceptionally(new ServiceBusException(false, "Failed to create internal links, see cause for more details.", ioException)); + } + } + + return null; + }); + + CompletableFuture.allOf(this.amqpSender.openFuture, this.amqpReceiver.openFuture).handleAsync((v, ex) -> + { + if(ex == null) + { + TRACE_LOGGER.info("Recreated internal links to {}", this.linkPath); + recreateInternalLinksFuture.complete(null); + } + else + { + this.cancelSASTokenRenewTimer(); + recreateInternalLinksFuture.completeExceptionally(ex); + } + + return null; + }); + Timer.schedule( - new Runnable() - { - public void run() - { - if (!recreateInternalLinksFuture.isDone()) - { - Exception operationTimedout = new TimeoutException( - String.format(Locale.US, "Recreating internal links of requestresponselink to %s timed out.", RequestResponseLink.this.linkPath)); - TRACE_LOGGER.warn("Recreating internal links of requestresponselink timed out.", operationTimedout); - RequestResponseLink.this.cancelSASTokenRenewTimer(); - recreateInternalLinksFuture.completeExceptionally(operationTimedout); - } - } - } - , this.underlyingFactory.getOperationTimeout() - , TimerType.OneTimeRun); - + new Runnable() + { + public void run() + { + if (!recreateInternalLinksFuture.isDone()) + { + Exception operationTimedout = new TimeoutException( + String.format(Locale.US, "Recreating internal links of requestresponselink to %s timed out.", RequestResponseLink.this.linkPath)); + TRACE_LOGGER.warn("Recreating internal links of requestresponselink timed out.", operationTimedout); + RequestResponseLink.this.cancelSASTokenRenewTimer(); + recreateInternalLinksFuture.completeExceptionally(operationTimedout); + } + } + } + , this.underlyingFactory.getOperationTimeout() + , TimerType.OneTimeRun); + return recreateInternalLinksFuture; } - + private void completeAllPendingRequestsWithException(Exception exception) { - TRACE_LOGGER.warn("Completing all pending requests with exception in request response link to {}", this.linkPath); + TRACE_LOGGER.warn("Completing all pending requests with exception in request response link to {}", this.linkPath); for(RequestResponseWorkItem workItem : this.pendingRequests.values()) - { + { workItem.getWork().completeExceptionally(exception); workItem.cancelTimeoutTask(true); } - + this.pendingRequests.clear(); } - + public CompletableFuture requestAysnc(Message requestMessage, TransactionContext transaction, Duration timeout) - { + { this.throwIfClosed(null); // Check and recreate links if necessary - if(!((this.amqpSender.sendLink.getLocalState() == EndpointState.ACTIVE && this.amqpSender.sendLink.getRemoteState() == EndpointState.ACTIVE) - && (this.amqpReceiver.receiveLink.getLocalState() == EndpointState.ACTIVE && this.amqpReceiver.receiveLink.getRemoteState() == EndpointState.ACTIVE))) - { - synchronized (this.recreateLinksLock) { - if(!this.isRecreateLinksInProgress) - { - this.isRecreateLinksInProgress = true; - this.recreateInternalLinks().handleAsync((v, recreationEx) -> - { - if(recreationEx != null) - { - TRACE_LOGGER.warn("Recreating internal links of reqestresponselink '{}' failed.", this.linkPath, ExceptionUtil.extractAsyncCompletionCause(recreationEx)); - } - - synchronized (this.recreateLinksLock) - { - this.isRecreateLinksInProgress = false; - } - - return null; - }); - } - } - } - + if(!((this.amqpSender.sendLink.getLocalState() == EndpointState.ACTIVE && this.amqpSender.sendLink.getRemoteState() == EndpointState.ACTIVE) + && (this.amqpReceiver.receiveLink.getLocalState() == EndpointState.ACTIVE && this.amqpReceiver.receiveLink.getRemoteState() == EndpointState.ACTIVE))) + { + synchronized (this.recreateLinksLock) { + if(!this.isRecreateLinksInProgress) + { + this.isRecreateLinksInProgress = true; + this.recreateInternalLinks().handleAsync((v, recreationEx) -> + { + if(recreationEx != null) + { + TRACE_LOGGER.warn("Recreating internal links of reqestresponselink '{}' failed.", this.linkPath, ExceptionUtil.extractAsyncCompletionCause(recreationEx)); + } + + synchronized (this.recreateLinksLock) + { + this.isRecreateLinksInProgress = false; + } + + return null; + }); + } + } + } + CompletableFuture responseFuture = new CompletableFuture(); RequestResponseWorkItem workItem = new RequestResponseWorkItem(requestMessage, transaction, responseFuture, timeout); String requestId = "request:" + this.requestCounter.incrementAndGet(); @@ -421,21 +421,21 @@ public CompletableFuture requestAysnc(Message requestMessage, Transacti this.amqpSender.sendRequest(requestId, false); return responseFuture; } - + private ScheduledFuture scheduleRequestTimeout(String requestId, Duration timeout) { return Timer.schedule(new Runnable() { - public void run() - { - TRACE_LOGGER.warn("Request with id:{} timed out", requestId); - RequestResponseWorkItem completedWorkItem = RequestResponseLink.this.exceptionallyCompleteRequest(requestId, new TimeoutException("Request timed out."), true); - boolean isRetriedWorkItem = completedWorkItem.getLastKnownException() != null; - RequestResponseLink.this.amqpSender.removeEnqueuedRequest(requestId, isRetriedWorkItem); - } - }, timeout, TimerType.OneTimeRun); + public void run() + { + TRACE_LOGGER.warn("Request with id:{} timed out", requestId); + RequestResponseWorkItem completedWorkItem = RequestResponseLink.this.exceptionallyCompleteRequest(requestId, new TimeoutException("Request timed out."), true); + boolean isRetriedWorkItem = completedWorkItem.getLastKnownException() != null; + RequestResponseLink.this.amqpSender.removeEnqueuedRequest(requestId, isRetriedWorkItem); + } + }, timeout, TimerType.OneTimeRun); } - - + + private RequestResponseWorkItem exceptionallyCompleteRequest(String requestId, Exception exception, boolean useLastKnownException) { RequestResponseWorkItem workItem = this.pendingRequests.remove(requestId); @@ -446,14 +446,14 @@ private RequestResponseWorkItem exceptionallyCompleteRequest(String requestId, E { exceptionToReport = workItem.getLastKnownException(); } - + workItem.getWork().completeExceptionally(exceptionToReport); workItem.cancelTimeoutTask(true); } - + return workItem; } - + private RequestResponseWorkItem completeRequestWithResponse(String requestId, Message responseMessage) { RequestResponseWorkItem workItem = this.pendingRequests.get(requestId); @@ -464,7 +464,7 @@ private RequestResponseWorkItem completeRequestWithResponse(String requestId, Me // Retry on server busy and other retry-able status codes (what are other codes??) if(statusCode == ClientConstants.REQUEST_RESPONSE_SERVER_BUSY_STATUS_CODE) { - TRACE_LOGGER.warn("Request with id:{} received ServerBusy response from '{}'", requestId, this.linkPath); + TRACE_LOGGER.warn("Request with id:{} received ServerBusy response from '{}'", requestId, this.linkPath); // error response Exception responseException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); this.underlyingFactory.getRetryPolicy().incrementRetryCount(this.getClientId()); @@ -472,13 +472,13 @@ private RequestResponseWorkItem completeRequestWithResponse(String requestId, Me if (retryInterval == null) { // Either not retry-able or not enough time to retry - TRACE_LOGGER.error("Request with id:{} cannot be retried. So completing with excetion.", requestId, responseException); + TRACE_LOGGER.error("Request with id:{} cannot be retried. So completing with excetion.", requestId, responseException); this.exceptionallyCompleteRequest(requestId, responseException, false); } else { // Retry - TRACE_LOGGER.info("Request with id:{} will be retried after {}.", requestId, retryInterval); + TRACE_LOGGER.info("Request with id:{} will be retried after {}.", requestId, retryInterval); workItem.setLastKnownException(responseException); try { this.underlyingFactory.scheduleOnReactorThread((int) retryInterval.toMillis(), @@ -497,7 +497,7 @@ public void onEvent() } else { - TRACE_LOGGER.debug("Completing request with id:{}", requestId); + TRACE_LOGGER.debug("Completing request with id:{}", requestId); this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId()); this.pendingRequests.remove(requestId); workItem.getWork().complete(responseMessage); @@ -507,20 +507,20 @@ public void onEvent() else { TRACE_LOGGER.warn("Request with id:{} not found in the requestresponse link.", requestId); - } - + } + return workItem; } @Override protected CompletableFuture onClose() { - TRACE_LOGGER.info("Closing requestresponselink to {} by closing both internal sender and receiver links.", this.linkPath); - this.cancelSASTokenRenewTimer(); + TRACE_LOGGER.info("Closing requestresponselink to {} by closing both internal sender and receiver links.", this.linkPath); + this.cancelSASTokenRenewTimer(); return this.amqpSender.closeAsync().thenComposeAsync((v) -> this.amqpReceiver.closeAsync()); } - + private static void scheduleLinkCloseTimeout(CompletableFuture closeFuture, Duration timeout, String linkName) - { + { Timer.schedule( new Runnable() { @@ -530,7 +530,7 @@ public void run() { Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Link(%s) timed out at %s", "Close", linkName, ZonedDateTime.now())); TRACE_LOGGER.warn("Closing link timed out", operationTimedout); - + closeFuture.completeExceptionally(operationTimedout); } } @@ -538,81 +538,81 @@ public void run() , timeout , TimerType.OneTimeRun); } - + private class InternalReceiver extends ClientEntity implements IAmqpReceiver { private RequestResponseLink parent; private Receiver receiveLink; private CompletableFuture openFuture; private CompletableFuture closeFuture; - + protected InternalReceiver(String clientId, RequestResponseLink parent) { super(clientId); this.parent = parent; this.openFuture = new CompletableFuture(); this.closeFuture = new CompletableFuture(); - } + } - @Override + @Override protected CompletableFuture onClose() { this.closeInternals(true); return this.closeFuture; } - + void closeInternals(boolean waitForCloseCompletion) { - if (!this.getIsClosed()) - { - if (this.receiveLink != null && this.receiveLink.getLocalState() != EndpointState.CLOSED) - { - try { - this.parent.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { - - @Override - public void onEvent() { - if (InternalReceiver.this.receiveLink != null && InternalReceiver.this.receiveLink.getLocalState() != EndpointState.CLOSED) - { - TRACE_LOGGER.debug("Closing internal receive link of requestresponselink to {}", RequestResponseLink.this.linkPath); - InternalReceiver.this.receiveLink.close(); - InternalReceiver.this.parent.underlyingFactory.deregisterForConnectionError(InternalReceiver.this.receiveLink); - if(waitForCloseCompletion) - { - RequestResponseLink.scheduleLinkCloseTimeout(InternalReceiver.this.closeFuture, InternalReceiver.this.parent.underlyingFactory.getOperationTimeout(), InternalReceiver.this.receiveLink.getName()); - } - else - { - InternalReceiver.this.closeFuture.complete(null); - } - } - } - }); - } catch (IOException e) { - this.closeFuture.completeExceptionally(e); - } - } - else - { - this.closeFuture.complete(null); - } - } + if (!this.getIsClosed()) + { + if (this.receiveLink != null && this.receiveLink.getLocalState() != EndpointState.CLOSED) + { + try { + this.parent.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { + + @Override + public void onEvent() { + if (InternalReceiver.this.receiveLink != null && InternalReceiver.this.receiveLink.getLocalState() != EndpointState.CLOSED) + { + TRACE_LOGGER.debug("Closing internal receive link of requestresponselink to {}", RequestResponseLink.this.linkPath); + InternalReceiver.this.receiveLink.close(); + InternalReceiver.this.parent.underlyingFactory.deregisterForConnectionError(InternalReceiver.this.receiveLink); + if(waitForCloseCompletion) + { + RequestResponseLink.scheduleLinkCloseTimeout(InternalReceiver.this.closeFuture, InternalReceiver.this.parent.underlyingFactory.getOperationTimeout(), InternalReceiver.this.receiveLink.getName()); + } + else + { + InternalReceiver.this.closeFuture.complete(null); + } + } + } + }); + } catch (IOException e) { + this.closeFuture.completeExceptionally(e); + } + } + else + { + this.closeFuture.complete(null); + } + } } @Override public void onOpenComplete(Exception completionException) { if(completionException == null) { - TRACE_LOGGER.debug("Opened internal receive link of requestresponselink to {}", parent.linkPath); - this.parent.underlyingFactory.registerForConnectionError(this.receiveLink); + TRACE_LOGGER.debug("Opened internal receive link of requestresponselink to {}", parent.linkPath); + this.parent.underlyingFactory.registerForConnectionError(this.receiveLink); this.openFuture.complete(null); - + // Send unlimited credit this.receiveLink.flow(Integer.MAX_VALUE); } else { - TRACE_LOGGER.error("Opening internal receive link '{}' of requestresponselink to {} failed.", this.receiveLink.getName(), this.parent.linkPath, completionException); - this.setClosed(); - this.closeFuture.complete(null); + TRACE_LOGGER.error("Opening internal receive link '{}' of requestresponselink to {} failed.", this.receiveLink.getName(), this.parent.linkPath, completionException); + this.setClosed(); + this.closeFuture.complete(null); this.openFuture.completeExceptionally(completionException); } } @@ -621,26 +621,26 @@ public void onOpenComplete(Exception completionException) { public void onError(Exception exception) { if(!this.openFuture.isDone()) { - this.onOpenComplete(exception); + this.onOpenComplete(exception); } - + if(this.getIsClosingOrClosed()) { if(!this.closeFuture.isDone()) { - TRACE_LOGGER.error("Closing internal receive link '{}' of requestresponselink to {} failed.", this.receiveLink.getName(), this.parent.linkPath, exception); + TRACE_LOGGER.error("Closing internal receive link '{}' of requestresponselink to {} failed.", this.receiveLink.getName(), this.parent.linkPath, exception); this.closeFuture.completeExceptionally(exception); } } - + TRACE_LOGGER.warn("Internal receive link '{}' of requestresponselink to '{}' encountered error.", this.receiveLink.getName(), this.parent.linkPath, exception); this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink); if(this.parent.amqpSender.sendLink != null) - { - this.parent.amqpSender.sendLink.close(); - this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpSender.sendLink); - } - this.parent.cancelSASTokenRenewTimer(); + { + this.parent.amqpSender.sendLink.close(); + this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpSender.sendLink); + } + this.parent.cancelSASTokenRenewTimer(); this.parent.completeAllPendingRequestsWithException(exception); } @@ -652,7 +652,7 @@ public void onClose(ErrorCondition condition) { { if(condition == null || condition.getCondition() == null) { - TRACE_LOGGER.info("Closed internal receive link of requestresponselink to {}", parent.linkPath); + TRACE_LOGGER.info("Closed internal receive link of requestresponselink to {}", parent.linkPath); this.closeFuture.complete(null); } else @@ -670,19 +670,19 @@ public void onClose(ErrorCondition condition) { Exception exception = ExceptionUtil.toException(condition); if(!this.openFuture.isDone()) { - this.onOpenComplete(exception); + this.onOpenComplete(exception); } else { - TRACE_LOGGER.warn("Internal receive link '{}' of requestresponselink to '{}' closed with error.", this.receiveLink.getName(), this.parent.linkPath, exception); - this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink); - if(this.parent.amqpSender.sendLink != null) - { - this.parent.amqpSender.sendLink.close(); - this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpSender.sendLink); - } - this.parent.cancelSASTokenRenewTimer(); - this.parent.completeAllPendingRequestsWithException(exception); + TRACE_LOGGER.warn("Internal receive link '{}' of requestresponselink to '{}' closed with error.", this.receiveLink.getName(), this.parent.linkPath, exception); + this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink); + if(this.parent.amqpSender.sendLink != null) + { + this.parent.amqpSender.sendLink.close(); + this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpSender.sendLink); + } + this.parent.cancelSASTokenRenewTimer(); + this.parent.completeAllPendingRequestsWithException(exception); } } } @@ -691,44 +691,44 @@ public void onClose(ErrorCondition condition) { @Override public void onReceiveComplete(Delivery delivery) { - Message responseMessage = null; - try - { - responseMessage = Util.readMessageFromDelivery(this.receiveLink, delivery); - delivery.disposition(Accepted.getInstance()); - delivery.settle(); - } + Message responseMessage = null; + try + { + responseMessage = Util.readMessageFromDelivery(this.receiveLink, delivery); + delivery.disposition(Accepted.getInstance()); + delivery.settle(); + } catch(Exception e) - { - TRACE_LOGGER.warn("Reading message from delivery failed with unexpected exception.", e); - - // release the delivery ?? - delivery.disposition(Released.getInstance()); - delivery.settle(); - return; - } - - // Return response in a separate thread so reactor thread is free to handle reactor events - final Message finalResponseMessage = responseMessage; - AsyncUtil.executorService.submit(() -> { - String requestMessageId = (String)finalResponseMessage.getCorrelationId(); - if(requestMessageId != null) - { - TRACE_LOGGER.debug("RequestRespnseLink received response for request with id :{}", requestMessageId); - this.parent.completeRequestWithResponse(requestMessageId, finalResponseMessage); - } - else - { - TRACE_LOGGER.warn("RequestRespnseLink received a message with null correlationId"); - } - }); + { + TRACE_LOGGER.warn("Reading message from delivery failed with unexpected exception.", e); + + // release the delivery ?? + delivery.disposition(Released.getInstance()); + delivery.settle(); + return; + } + + // Return response in a separate thread so reactor thread is free to handle reactor events + final Message finalResponseMessage = responseMessage; + AsyncUtil.executorService.submit(() -> { + String requestMessageId = (String)finalResponseMessage.getCorrelationId(); + if(requestMessageId != null) + { + TRACE_LOGGER.debug("RequestRespnseLink received response for request with id :{}", requestMessageId); + this.parent.completeRequestWithResponse(requestMessageId, finalResponseMessage); + } + else + { + TRACE_LOGGER.warn("RequestRespnseLink received a message with null correlationId"); + } + }); } public void setReceiveLink(Receiver receiveLink) { this.receiveLink = receiveLink; } } - + private class InternalSender extends ClientEntity implements IAmqpSender { private Sender sendLink; @@ -743,85 +743,85 @@ private class InternalSender extends ClientEntity implements IAmqpSender private int maxMessageSize; protected InternalSender(String clientId, RequestResponseLink parent, InternalSender senderToBeCopied) { - super(clientId); + super(clientId); this.parent = parent; this.availableCredit = new AtomicInteger(0); this.pendingSendsSyncLock = new Object(); this.isSendLoopRunning = false; this.openFuture = new CompletableFuture(); this.closeFuture = new CompletableFuture(); - + if(senderToBeCopied == null) { - this.pendingFreshSends = new LinkedList<>(); - this.pendingRetrySends = new LinkedList<>(); + this.pendingFreshSends = new LinkedList<>(); + this.pendingRetrySends = new LinkedList<>(); } else { - this.pendingFreshSends = senderToBeCopied.pendingFreshSends; - this.pendingRetrySends = senderToBeCopied.pendingRetrySends; + this.pendingFreshSends = senderToBeCopied.pendingFreshSends; + this.pendingRetrySends = senderToBeCopied.pendingRetrySends; } - } + } @Override protected CompletableFuture onClose() { this.closeInternals(true); return this.closeFuture; } - + void closeInternals(boolean waitForCloseCompletion) { - if (!this.getIsClosed()) - { - if (this.sendLink != null && this.sendLink.getLocalState() != EndpointState.CLOSED) - { - try { - this.parent.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { - - @Override - public void onEvent() { - if (InternalSender.this.sendLink != null && InternalSender.this.sendLink.getLocalState() != EndpointState.CLOSED) - { - TRACE_LOGGER.debug("Closing internal send link of requestresponselink to {}", RequestResponseLink.this.linkPath); - InternalSender.this.sendLink.close(); - InternalSender.this.parent.underlyingFactory.deregisterForConnectionError(InternalSender.this.sendLink); - if(waitForCloseCompletion) - { - RequestResponseLink.scheduleLinkCloseTimeout(InternalSender.this.closeFuture, InternalSender.this.parent.underlyingFactory.getOperationTimeout(), InternalSender.this.sendLink.getName()); - } - else - { - InternalSender.this.closeFuture.complete(null); - } - } - } - }); - } catch (IOException e) { - this.closeFuture.completeExceptionally(e); - } - } - else - { - this.closeFuture.complete(null); - } - } + if (!this.getIsClosed()) + { + if (this.sendLink != null && this.sendLink.getLocalState() != EndpointState.CLOSED) + { + try { + this.parent.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { + + @Override + public void onEvent() { + if (InternalSender.this.sendLink != null && InternalSender.this.sendLink.getLocalState() != EndpointState.CLOSED) + { + TRACE_LOGGER.debug("Closing internal send link of requestresponselink to {}", RequestResponseLink.this.linkPath); + InternalSender.this.sendLink.close(); + InternalSender.this.parent.underlyingFactory.deregisterForConnectionError(InternalSender.this.sendLink); + if(waitForCloseCompletion) + { + RequestResponseLink.scheduleLinkCloseTimeout(InternalSender.this.closeFuture, InternalSender.this.parent.underlyingFactory.getOperationTimeout(), InternalSender.this.sendLink.getName()); + } + else + { + InternalSender.this.closeFuture.complete(null); + } + } + } + }); + } catch (IOException e) { + this.closeFuture.completeExceptionally(e); + } + } + else + { + this.closeFuture.complete(null); + } + } } @Override public void onOpenComplete(Exception completionException) { if(completionException == null) { - TRACE_LOGGER.debug("Opened internal send link of requestresponselink to {}", parent.linkPath); - this.parent.underlyingFactory.registerForConnectionError(this.sendLink); - this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink); + TRACE_LOGGER.debug("Opened internal send link of requestresponselink to {}", parent.linkPath); + this.parent.underlyingFactory.registerForConnectionError(this.sendLink); + this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink); this.openFuture.complete(null); this.runSendLoop(); } else { - TRACE_LOGGER.error("Opening internal send link '{}' of requestresponselink to {} failed.", this.sendLink.getName(), this.parent.linkPath, completionException); - this.setClosed(); - this.closeFuture.complete(null); + TRACE_LOGGER.error("Opening internal send link '{}' of requestresponselink to {} failed.", this.sendLink.getName(), this.parent.linkPath, completionException); + this.setClosed(); + this.closeFuture.complete(null); this.openFuture.completeExceptionally(completionException); } } @@ -830,26 +830,26 @@ public void onOpenComplete(Exception completionException) { public void onError(Exception exception) { if(!this.openFuture.isDone()) { - this.onOpenComplete(exception); + this.onOpenComplete(exception); } - + if(this.getIsClosingOrClosed()) { if(!this.closeFuture.isDone()) { - TRACE_LOGGER.error("Closing internal send link '{}' of requestresponselink to {} failed.", this.sendLink.getName(), this.parent.linkPath, exception); + TRACE_LOGGER.error("Closing internal send link '{}' of requestresponselink to {} failed.", this.sendLink.getName(), this.parent.linkPath, exception); this.closeFuture.completeExceptionally(exception); } } - + TRACE_LOGGER.warn("Internal send link '{}' of requestresponselink to '{}' encountered error.", this.sendLink.getName(), this.parent.linkPath, exception); this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink); if(this.parent.amqpReceiver.receiveLink != null) - { - this.parent.amqpReceiver.receiveLink.close(); - this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpReceiver.receiveLink); - } - this.parent.cancelSASTokenRenewTimer(); + { + this.parent.amqpReceiver.receiveLink.close(); + this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpReceiver.receiveLink); + } + this.parent.cancelSASTokenRenewTimer(); this.parent.completeAllPendingRequestsWithException(exception); } @@ -861,7 +861,7 @@ public void onClose(ErrorCondition condition) { { if(condition == null || condition.getCondition() == null) { - TRACE_LOGGER.info("Closed internal send link of requestresponselink to {}", this.parent.linkPath); + TRACE_LOGGER.info("Closed internal send link of requestresponselink to {}", this.parent.linkPath); this.closeFuture.complete(null); } else @@ -879,26 +879,26 @@ public void onClose(ErrorCondition condition) { Exception exception = ExceptionUtil.toException(condition); if(!this.openFuture.isDone()) { - this.onOpenComplete(exception); + this.onOpenComplete(exception); } else { - TRACE_LOGGER.warn("Internal send link '{}' of requestresponselink to '{}' closed with error.", this.sendLink.getName(), this.parent.linkPath, exception); - this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink); - - if(this.parent.amqpReceiver.receiveLink != null) - { - this.parent.amqpReceiver.receiveLink.close(); - this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpReceiver.receiveLink); - } - - this.parent.cancelSASTokenRenewTimer(); - this.parent.completeAllPendingRequestsWithException(exception); + TRACE_LOGGER.warn("Internal send link '{}' of requestresponselink to '{}' closed with error.", this.sendLink.getName(), this.parent.linkPath, exception); + this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink); + + if(this.parent.amqpReceiver.receiveLink != null) + { + this.parent.amqpReceiver.receiveLink.close(); + this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpReceiver.receiveLink); + } + + this.parent.cancelSASTokenRenewTimer(); + this.parent.completeAllPendingRequestsWithException(exception); } } } } - + public void sendRequest(String requestId, boolean isRetry) { synchronized(this.pendingSendsSyncLock) @@ -911,26 +911,26 @@ public void sendRequest(String requestId, boolean isRetry) { this.pendingFreshSends.add(requestId); } - + // This check must be done inside lock if(this.isSendLoopRunning) - { - return; - } + { + return; + } } - + try { - this.parent.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { - @Override - public void onEvent() { - InternalSender.this.runSendLoop(); - } - }); - } catch (IOException e) { - this.parent.exceptionallyCompleteRequest(requestId, e, true); - } + this.parent.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { + @Override + public void onEvent() { + InternalSender.this.runSendLoop(); + } + }); + } catch (IOException e) { + this.parent.exceptionallyCompleteRequest(requestId, e, true); + } } - + public void removeEnqueuedRequest(String requestId, boolean isRetry) { synchronized(this.pendingSendsSyncLock) @@ -943,13 +943,13 @@ public void removeEnqueuedRequest(String requestId, boolean isRetry) else { this.pendingFreshSends.remove(requestId); - } + } } } @Override public void onFlow(int creditIssued) { - TRACE_LOGGER.debug("RequestResonseLink {} internal sender received credit :{}", this.parent.linkPath, creditIssued); + TRACE_LOGGER.debug("RequestResonseLink {} internal sender received credit :{}", this.parent.linkPath, creditIssued); this.availableCredit.addAndGet(creditIssued); TRACE_LOGGER.debug("RequestResonseLink {} internal sender available credit :{}", this.parent.linkPath, this.availableCredit.get()); this.runSendLoop(); @@ -958,104 +958,104 @@ public void onFlow(int creditIssued) { @Override public void onSendComplete(Delivery delivery) { // Doesn't happen as sends are settled on send - } + } - public void setSendLink(Sender sendLink) { + public void setSendLink(Sender sendLink) { this.sendLink = sendLink; this.availableCredit = new AtomicInteger(0); } - + private void runSendLoop() { - synchronized(this.pendingSendsSyncLock) - { - if(this.isSendLoopRunning) - { - return; - } - else - { - this.isSendLoopRunning = true; - } - } - - TRACE_LOGGER.debug("Starting requestResponseLink {} internal sender send loop", this.parent.linkPath); - - try - { - while(this.sendLink != null && this.sendLink.getLocalState() == EndpointState.ACTIVE && this.sendLink.getRemoteState() == EndpointState.ACTIVE && this.availableCredit.get() > 0) - { - String requestIdToBeSent = null; - synchronized(pendingSendsSyncLock) - { - // First send retries and then fresh ones - requestIdToBeSent = this.pendingRetrySends.poll(); - if(requestIdToBeSent == null) - { - requestIdToBeSent = this.pendingFreshSends.poll(); - if(requestIdToBeSent == null) - { - // Set to false inside the synchronized block to avoid race condition - this.isSendLoopRunning = false; - TRACE_LOGGER.debug("RequestResponseLink {} internal sender send loop ending as there are no more requests enqueued.", this.parent.linkPath); - break; - } - } - } - - RequestResponseWorkItem requestToBeSent = this.parent.pendingRequests.get(requestIdToBeSent); - if(requestToBeSent != null) - { - Delivery delivery = this.sendLink.delivery(UUID.randomUUID().toString().getBytes()); - delivery.setMessageFormat(DeliveryImpl.DEFAULT_MESSAGE_FORMAT); - TransactionContext transaction = requestToBeSent.getTransaction(); - if (transaction != TransactionContext.NULL_TXN) { - TransactionalState transactionalState = new TransactionalState(); - transactionalState.setTxnId(new Binary(transaction.getTransactionId().array())); - delivery.disposition(transactionalState); - } - - Pair encodedPair = null; - try - { - encodedPair = Util.encodeMessageToOptimalSizeArray(requestToBeSent.getRequest(), this.maxMessageSize); - } - catch(PayloadSizeExceededException exception) - { - this.parent.exceptionallyCompleteRequest((String)requestToBeSent.getRequest().getMessageId(), new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", this.maxMessageSize / 1024), exception), false); - } - - try - { - int sentMsgSize = this.sendLink.send(encodedPair.getFirstItem(), 0, encodedPair.getSecondItem()); - assert sentMsgSize == encodedPair.getSecondItem() : "Contract of the ProtonJ library for Sender.Send API changed"; - delivery.settle(); - this.availableCredit.decrementAndGet(); - TRACE_LOGGER.debug("RequestResonseLink {} internal sender sent a request. available credit :{}", this.parent.linkPath, this.availableCredit.get()); - } - catch(Exception e) - { - TRACE_LOGGER.error("RequestResonseLink {} failed to send request with request id:{}.", this.parent.linkPath, requestIdToBeSent, e); - this.parent.exceptionallyCompleteRequest(requestIdToBeSent, e, false); - } - } - else - { - TRACE_LOGGER.warn("Request with id:{} not found in the requestresponse link.", requestIdToBeSent); - } - } - } - finally - { - synchronized (this.pendingSendsSyncLock) { - if(this.isSendLoopRunning) - { - this.isSendLoopRunning = false; - } - } - - TRACE_LOGGER.debug("RequestResponseLink {} internal sender send loop stopped.", this.parent.linkPath); - } + synchronized(this.pendingSendsSyncLock) + { + if(this.isSendLoopRunning) + { + return; + } + else + { + this.isSendLoopRunning = true; + } + } + + TRACE_LOGGER.debug("Starting requestResponseLink {} internal sender send loop", this.parent.linkPath); + + try + { + while(this.sendLink != null && this.sendLink.getLocalState() == EndpointState.ACTIVE && this.sendLink.getRemoteState() == EndpointState.ACTIVE && this.availableCredit.get() > 0) + { + String requestIdToBeSent = null; + synchronized(pendingSendsSyncLock) + { + // First send retries and then fresh ones + requestIdToBeSent = this.pendingRetrySends.poll(); + if(requestIdToBeSent == null) + { + requestIdToBeSent = this.pendingFreshSends.poll(); + if(requestIdToBeSent == null) + { + // Set to false inside the synchronized block to avoid race condition + this.isSendLoopRunning = false; + TRACE_LOGGER.debug("RequestResponseLink {} internal sender send loop ending as there are no more requests enqueued.", this.parent.linkPath); + break; + } + } + } + + RequestResponseWorkItem requestToBeSent = this.parent.pendingRequests.get(requestIdToBeSent); + if(requestToBeSent != null) + { + Delivery delivery = this.sendLink.delivery(UUID.randomUUID().toString().getBytes()); + delivery.setMessageFormat(DeliveryImpl.DEFAULT_MESSAGE_FORMAT); + TransactionContext transaction = requestToBeSent.getTransaction(); + if (transaction != TransactionContext.NULL_TXN) { + TransactionalState transactionalState = new TransactionalState(); + transactionalState.setTxnId(new Binary(transaction.getTransactionId().array())); + delivery.disposition(transactionalState); + } + + Pair encodedPair = null; + try + { + encodedPair = Util.encodeMessageToOptimalSizeArray(requestToBeSent.getRequest(), this.maxMessageSize); + } + catch(PayloadSizeExceededException exception) + { + this.parent.exceptionallyCompleteRequest((String)requestToBeSent.getRequest().getMessageId(), new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", this.maxMessageSize / 1024), exception), false); + } + + try + { + int sentMsgSize = this.sendLink.send(encodedPair.getFirstItem(), 0, encodedPair.getSecondItem()); + assert sentMsgSize == encodedPair.getSecondItem() : "Contract of the ProtonJ library for Sender.Send API changed"; + delivery.settle(); + this.availableCredit.decrementAndGet(); + TRACE_LOGGER.debug("RequestResonseLink {} internal sender sent a request. available credit :{}", this.parent.linkPath, this.availableCredit.get()); + } + catch(Exception e) + { + TRACE_LOGGER.error("RequestResonseLink {} failed to send request with request id:{}.", this.parent.linkPath, requestIdToBeSent, e); + this.parent.exceptionallyCompleteRequest(requestIdToBeSent, e, false); + } + } + else + { + TRACE_LOGGER.warn("Request with id:{} not found in the requestresponse link.", requestIdToBeSent); + } + } + } + finally + { + synchronized (this.pendingSendsSyncLock) { + if(this.isSendLoopRunning) + { + this.isSendLoopRunning = false; + } + } + + TRACE_LOGGER.debug("RequestResponseLink {} internal sender send loop stopped.", this.parent.linkPath); + } } } -} +} \ No newline at end of file diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLinkCache.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLinkCache.java index 39c2d77d9601e..3dd81a7a08bdc 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLinkCache.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLinkCache.java @@ -12,17 +12,17 @@ class RequestResponseLinkCache { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(RequestResponseLinkCache.class); - + private Object lock = new Object(); - private final MessagingFactory underlyingFactory; + private final MessagingFactory underlyingFactory; private HashMap pathToRRLinkMap; - + public RequestResponseLinkCache(MessagingFactory underlyingFactory) { this.underlyingFactory = underlyingFactory; this.pathToRRLinkMap = new HashMap<>(); } - + public CompletableFuture obtainRequestResponseLinkAsync(String entityPath, String transferEntityPath, MessagingEntityType entityType) { RequestResponseLinkWrapper wrapper; @@ -35,7 +35,7 @@ public CompletableFuture obtainRequestResponseLinkAsync(Str { mapKey = entityPath; } - + synchronized (lock) { wrapper = this.pathToRRLinkMap.get(mapKey); @@ -47,7 +47,7 @@ public CompletableFuture obtainRequestResponseLinkAsync(Str } return wrapper.acquireReferenceAsync(); } - + public void releaseRequestResponseLink(String entityPath, String transferEntityPath) { String mapKey; @@ -70,7 +70,7 @@ public void releaseRequestResponseLink(String entityPath, String transferEntityP wrapper.releaseReference(); } } - + public CompletableFuture freeAsync() { TRACE_LOGGER.info("Closing all cached request-response links"); @@ -79,11 +79,11 @@ public CompletableFuture freeAsync() { closeFutures.add(wrapper.forceCloseAsync()); } - + this.pathToRRLinkMap.clear(); return CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0])); } - + private void removeWrapperFromCache(String entityPath) { synchronized (lock) @@ -91,7 +91,7 @@ private void removeWrapperFromCache(String entityPath) this.pathToRRLinkMap.remove(entityPath); } } - + private class RequestResponseLinkWrapper { private Object lock = new Object(); @@ -102,7 +102,7 @@ private class RequestResponseLinkWrapper private RequestResponseLink requestResponseLink; private int referenceCount; private ArrayList> waiters; - + public RequestResponseLinkWrapper(MessagingFactory underlyingFactory, String entityPath, String transferEntityPath, MessagingEntityType entityType) { this.underlyingFactory = underlyingFactory; @@ -114,7 +114,7 @@ public RequestResponseLinkWrapper(MessagingFactory underlyingFactory, String ent this.waiters = new ArrayList<>(); this.createRequestResponseLinkAsync(); } - + private void createRequestResponseLinkAsync() { String requestResponseLinkPath = RequestResponseLink.getManagementNodeLinkPath(this.entityPath); @@ -161,11 +161,11 @@ private void createRequestResponseLinkAsync() } } } - + return null; }); } - + public CompletableFuture acquireReferenceAsync() { synchronized (this.lock) @@ -183,9 +183,9 @@ public CompletableFuture acquireReferenceAsync() } } } - + public void releaseReference() - { + { synchronized (this.lock) { if(--this.referenceCount == 0) @@ -196,11 +196,11 @@ public void releaseReference() } } } - + public CompletableFuture forceCloseAsync() { TRACE_LOGGER.info("Force closing requestresponselink to '{}'", this.requestResponseLink.getLinkPath()); return this.requestResponseLink.closeAsync(); } } -} +} \ No newline at end of file diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/TransportType.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/TransportType.java new file mode 100644 index 0000000000000..e4622b3f5c3e9 --- /dev/null +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/TransportType.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.servicebus.primitives; + +/** + * All TransportType switches available for communicating to EventHubs service. + */ +public enum TransportType { + /** + * AMQP over TCP. Uses port 5671 - assigned by IANA for secure AMQP (AMQPS). + */ + AMQP("Amqp"), + + /** + * AMQP over Web Sockets. Uses port 443. + */ + AMQP_WEB_SOCKETS("AmqpWebSockets"); + + private final String value; + + TransportType(final String value) + { + this.value = value; + } + + @Override + public String toString() + { + return this.value; + } + + static TransportType fromString(final String value) + { + for (TransportType transportType : values()) + { + if (transportType.value.equalsIgnoreCase(value)) + { + return transportType; + } + } + + throw new IllegalArgumentException(); + } +} \ No newline at end of file diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java index b3aa40810da37..3511e170b5e44 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java @@ -447,7 +447,7 @@ public static ClientSettings getClientSettingsFromConnectionStringBuilder(Connec tokenProvider = new SharedAccessSignatureTokenProvider(builder.getSharedAccessSignatureToken(), Instant.now().plus(Duration.ofSeconds(SecurityConstants.DEFAULT_SAS_TOKEN_VALIDITY_IN_SECONDS))); } - return new ClientSettings(tokenProvider, builder.getRetryPolicy(), builder.getOperationTimeout()); + return new ClientSettings(tokenProvider, builder.getRetryPolicy(), builder.getOperationTimeout(), builder.getTransportType()); } static int getTokenRenewIntervalInSeconds(int tokenValidityInSeconds) diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java index ac449a2d1a324..7eb00cb696725 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java @@ -16,18 +16,18 @@ public class ClientValidationTests { - + private static final String ENTITY_NAME_PREFIX = "ClientValidationTests"; - + private static String queuePath; private static String sessionfulQueuePath; private static String topicPath; private static String subscriptionPath; private static String sessionfulSubscriptionPath; private static ManagementClientAsync managementClient; - + @BeforeClass - public static void createEntities() throws ExecutionException, InterruptedException { + public static void createEntities() throws ExecutionException, InterruptedException { // Create a queue, a topic and a subscription queuePath = TestUtils.randomizeEntityName(ENTITY_NAME_PREFIX); sessionfulQueuePath = TestUtils.randomizeEntityName(ENTITY_NAME_PREFIX); @@ -39,23 +39,23 @@ public static void createEntities() throws ExecutionException, InterruptedExcept QueueDescription queueDescription = new QueueDescription(queuePath); queueDescription.setEnablePartitioning(false); managementClient.createQueueAsync(queueDescription).get(); - + QueueDescription queueDescription2 = new QueueDescription(sessionfulQueuePath); queueDescription2.setEnablePartitioning(false); queueDescription2.setRequiresSession(true); managementClient.createQueueAsync(queueDescription2).get(); - + TopicDescription topicDescription = new TopicDescription(topicPath); topicDescription.setEnablePartitioning(false); managementClient.createTopicAsync(topicDescription).get(); SubscriptionDescription subDescription = new SubscriptionDescription(topicPath, TestUtils.FIRST_SUBSCRIPTION_NAME); subscriptionPath = subDescription.getPath(); - managementClient.createSubscriptionAsync(subDescription).get(); - SubscriptionDescription subDescription2 = new SubscriptionDescription(topicPath, "subscription2"); - subDescription2.setRequiresSession(true); + managementClient.createSubscriptionAsync(subDescription).get(); + SubscriptionDescription subDescription2 = new SubscriptionDescription(topicPath, "subscription2"); + subDescription2.setRequiresSession(true); sessionfulSubscriptionPath = subDescription2.getPath(); - managementClient.createSubscriptionAsync(subDescription2).get(); - } + managementClient.createSubscriptionAsync(subDescription2).get(); + } @AfterClass public static void deleteEntities() throws ExecutionException, InterruptedException, IOException { @@ -67,7 +67,7 @@ public static void deleteEntities() throws ExecutionException, InterruptedExcept @Test public void testTopicClientCreationToQueue() throws InterruptedException, ServiceBusException - { + { try { TopicClient tc = new TopicClient(TestUtils.getNamespaceEndpointURI(), queuePath, TestUtils.getManagementClientSettings()); @@ -80,15 +80,15 @@ public void testTopicClientCreationToQueue() throws InterruptedException, Servic finally { tc.close(); - } + } } catch (UnsupportedOperationException e) { // Expected } } - + @Test public void testQueueClientCreationToTopic() throws InterruptedException, ServiceBusException - { + { try { QueueClient qc = new QueueClient(TestUtils.getNamespaceEndpointURI(), topicPath, TestUtils.getManagementClientSettings(), ReceiveMode.PEEKLOCK); try { @@ -98,27 +98,27 @@ public void testQueueClientCreationToTopic() throws InterruptedException, Servic } finally { qc.close(); - } + } } catch (UnsupportedOperationException e) { // Expected } } - + @Test public void testQueueClientCreationToSubscription() throws InterruptedException, ServiceBusException - { + { try { QueueClient qc = new QueueClient(TestUtils.getNamespaceEndpointURI(), subscriptionPath, TestUtils.getManagementClientSettings(), ReceiveMode.PEEKLOCK); try { - qc.registerMessageHandler(new IMessageHandler() { + qc.registerMessageHandler(new IMessageHandler() { @Override public CompletableFuture onMessageAsync(IMessage message) { return CompletableFuture.completedFuture(null); } - + @Override - public void notifyException(Throwable exception, ExceptionPhase phase) { + public void notifyException(Throwable exception, ExceptionPhase phase) { } }); Assert.fail("QueueClient created to a subscription which shouldn't be allowed."); @@ -126,37 +126,37 @@ public void notifyException(Throwable exception, ExceptionPhase phase) { finally { qc.close(); - } + } } catch (UnsupportedOperationException e) { // Expected } } - + @Test public void testSubscriptionClientCreationToQueue() throws InterruptedException, ServiceBusException - { + { try { SubscriptionClient sc = new SubscriptionClient(TestUtils.getNamespaceEndpointURI(), queuePath, TestUtils.getManagementClientSettings(), ReceiveMode.PEEKLOCK); try { - sc.registerMessageHandler(new IMessageHandler() { + sc.registerMessageHandler(new IMessageHandler() { @Override public CompletableFuture onMessageAsync(IMessage message) { return CompletableFuture.completedFuture(null); } - + @Override - public void notifyException(Throwable exception, ExceptionPhase phase) { + public void notifyException(Throwable exception, ExceptionPhase phase) { } }); Assert.fail("SubscriptionClient created to a queue which shouldn't be allowed."); } finally { sc.close(); - } + } } catch (UnsupportedOperationException e) { // Expected } } - + @Test public void testQueueClientCreationToSessionfulSubscription() throws InterruptedException, ServiceBusException { @@ -165,12 +165,12 @@ public void testQueueClientCreationToSessionfulSubscription() throws Interrupted try { final AtomicBoolean unsupportedExceptionOccured = new AtomicBoolean(false); qc.registerSessionHandler(new ISessionHandler() { - + @Override public CompletableFuture onMessageAsync(IMessageSession session, IMessage message) { return CompletableFuture.completedFuture(null); } - + @Override public void notifyException(Throwable exception, ExceptionPhase phase) { if(exception instanceof UnsupportedOperationException && phase == ExceptionPhase.ACCEPTSESSION) @@ -178,23 +178,23 @@ public void notifyException(Throwable exception, ExceptionPhase phase) { unsupportedExceptionOccured.set(true); } } - + @Override public CompletableFuture OnCloseSessionAsync(IMessageSession session) { return CompletableFuture.completedFuture(null); } }); - - Thread.sleep(1000); // Sleep for a second for the exception + + Thread.sleep(1000); // Sleep for a second for the exception Assert.assertTrue("QueueClient created to a subscription which shouldn't be allowed.", unsupportedExceptionOccured.get()); } finally { qc.close(); - } + } } catch (UnsupportedOperationException e) { // Expected } } - + @Test public void testSubscriptionClientCreationToSessionfulQueue() throws InterruptedException, ServiceBusException { @@ -203,12 +203,12 @@ public void testSubscriptionClientCreationToSessionfulQueue() throws Interrupted try { final AtomicBoolean unsupportedExceptionOccured = new AtomicBoolean(false); sc.registerSessionHandler(new ISessionHandler() { - + @Override public CompletableFuture onMessageAsync(IMessageSession session, IMessage message) { return CompletableFuture.completedFuture(null); } - + @Override public void notifyException(Throwable exception, ExceptionPhase phase) { if(exception instanceof UnsupportedOperationException && phase == ExceptionPhase.ACCEPTSESSION) @@ -216,22 +216,22 @@ public void notifyException(Throwable exception, ExceptionPhase phase) { unsupportedExceptionOccured.set(true); } } - + @Override public CompletableFuture OnCloseSessionAsync(IMessageSession session) { return CompletableFuture.completedFuture(null); } }); - - Thread.sleep(1000); // Sleep for a second for the exception + + Thread.sleep(1000); // Sleep for a second for the exception Assert.assertTrue("SubscriptionClient created to a queue which shouldn't be allowed.", unsupportedExceptionOccured.get()); } finally { sc.close(); } - + } catch (UnsupportedOperationException e) { // Expected } } - -} + +} \ No newline at end of file diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java index 438289c65106f..b3d0c07a913fb 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java @@ -285,7 +285,7 @@ public void testAcceptSessionTimeoutShouldNotLockSession() throws InterruptedExc { ClientSettings commonClientSettings = TestUtils.getClientSettings(); // Timeout should be less than default session wait timeout on the service - ClientSettings shortTimeoutClientSettings = new ClientSettings(commonClientSettings.getTokenProvider(), commonClientSettings.getRetryPolicy(), Duration.ofSeconds(10)); + ClientSettings shortTimeoutClientSettings = new ClientSettings(commonClientSettings.getTokenProvider(), commonClientSettings.getRetryPolicy(), Duration.ofSeconds(10), null); try { this.session = ClientFactory.acceptSessionFromEntityPath(TestUtils.getNamespaceEndpointURI(), this.receiveEntityPath, null, shortTimeoutClientSettings, ReceiveMode.PEEKLOCK); diff --git a/pom.xml b/pom.xml index ec60c9903c199..03f4e21e36f75 100644 --- a/pom.xml +++ b/pom.xml @@ -1,22 +1,24 @@ azure-servicebus-parent - Java library for Azure Service Bus - 4.0.0 + Java library for Azure Service Bus + 4.0.0 com.microsoft.azure azure-servicebus-parent 2.0.0 pom - + https://github.com/Azure/azure-service-bus-java - + - 0.22.0 + 0.29.0 4.12 1.7.0 2.0.0-PREVIEW-5 + 1.0.0 + 1.2.7 - + azure-servicebus - + \ No newline at end of file