diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index d3970bb394945..afc08a6eba2b7 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -2,23 +2,33 @@ ## 2.1.0-beta.1 (Unreleased) +### New Features + +- Updates RetryUtil to use RetrySpec. +- Adds the ability to configure the `hostname` and `port` that will be used when connecting to a service via + `ConnectionOptions`. The `hostname` field refers to the DNS host or IP address of the service, whereas the + `fullyQualifiedNamespace` is the fully qualified host name of the service. + Normally `hostname` and `fullyQualifiedNamespace` will be the same. However, if your network does not allow + connecting to the service via the public host, you can specify a custom host (e.g. an application gateway) via the + `hostname` field and continue using the public host as the `fullyQualifiedNamespace`. ## 2.0.1 (2021-01-11) ### New Features -- Changed connections from sharing the global `Schedulers.single()` to having a `Scheduler.newSingle()` per connection to improve performance. +- Changed connections from sharing the global `Schedulers.single()` to having a `Scheduler.newSingle()` per connection + to improve performance. ## 2.0.0 (2020-11-30) ### New Features - Added 'AmqpAddress' as a type to support 'AmqpMessageProperties#replyTo' and 'AmqpMessageProperties#to' properties. -- Added 'AmqpMessageId' as a type to support 'AmqpMessageProperties#correlationId' and 'AmqpMessageProperties#messageId' +- Added 'AmqpMessageId' as a type to support 'AmqpMessageProperties#correlationId' and 'AmqpMessageProperties#messageId' properties. - Added static methods to instantiate 'AmqpMessageBody' for example 'AmqpMessageBody#fromData(byte[])'. ### Breaking Changes -- Changed 'AmqpMessageBody' from interface to a class. User can use 'getBodyType()' to know what is the 'AmqpBodyType' - of the message. +- Changed 'AmqpMessageBody' from an interface to a class. User can use 'getBodyType()' to know what is the + 'AmqpBodyType' of the message. - Changed type of 'correlationId' and 'messageId' in type 'AmqpMessageProperties' from 'String' to 'AmqpMessageId'. - Changed type of 'replyTo' and 'to' in type 'AmqpMessageProperties' from 'String' to 'AmqpAddress'. - Removed copy constructor for 'AmqpAnnotatedMessage'. @@ -29,8 +39,8 @@ ## 1.7.0-beta.2 (2020-11-10) ### New Features -- Optionally enable idempotency of a send link to send AMQP messages with producer group id, producer owner level and -producer sequence number in the message annotations. +- Optionally enable idempotency of a send link to send AMQP messages with producer group id, producer owner level and + producer sequence number in the message annotations. ## 1.7.0-beta.1 (2020-11-03) ### Dependency Updates @@ -40,7 +50,7 @@ producer sequence number in the message annotations. ### New Features - Added peer certificate verification options when connecting to an AMQP endpoint. ### Breaking Changes -- Removed `BinaryData` type which was used for `AmqpAnnotatedMessage`. +- Removed `BinaryData` type which was used for `AmqpAnnotatedMessage`. ### Dependency Updates - Upgraded `azure-core` dependency to `1.9.0`. @@ -154,4 +164,3 @@ producer sequence number in the message annotations. This package's [documentation](https://github.com/Azure/azure-sdk-for-java/blob/azure-core-amqp_1.0.0-preview.1/core/azure-core-amqp/README.md) - diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ConnectionOptions.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ConnectionOptions.java index c7f45e4b1710a..7f80ccc9024b8 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ConnectionOptions.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ConnectionOptions.java @@ -6,9 +6,12 @@ import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyOptions; +import com.azure.core.amqp.implementation.handler.ConnectionHandler; +import com.azure.core.amqp.implementation.handler.WebSocketsConnectionHandler; import com.azure.core.annotation.Immutable; import com.azure.core.credential.TokenCredential; import com.azure.core.util.ClientOptions; +import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.engine.SslDomain; import reactor.core.scheduler.Scheduler; @@ -28,11 +31,63 @@ public class ConnectionOptions { private final CbsAuthorizationType authorizationType; private final ClientOptions clientOptions; private final SslDomain.VerifyMode verifyMode; + private final String hostname; + private final int port; + /** + * Creates an instance with the following options set. The AMQP connection is created to the + * {@code fullyQualifiedNamespace} using a port based on the {@code transport}. + * + * @param fullyQualifiedNamespace Fully qualified namespace for the AMQP broker. (ie. + * namespace.servicebus.windows.net) + * @param tokenCredential The credential for connecting to the AMQP broker. + * @param authorizationType The authorisation type used for authorizing with the CBS node. + * @param transport The type connection used for the AMQP connection. + * @param retryOptions Retry options for the connection. + * @param proxyOptions Any proxy options to set. + * @param scheduler Scheduler for async operations. + * @param clientOptions Client options for the connection. + * @param verifyMode How to verify SSL information. + * + * @throws NullPointerException in the case that {@code fullyQualifiedNamespace}, {@code tokenCredential}, + * {@code authorizationType}, {@code transport}, {@code retryOptions}, {@code scheduler}, {@code clientOptions} + * {@code proxyOptions} or {@code verifyMode} is null. + */ public ConnectionOptions(String fullyQualifiedNamespace, TokenCredential tokenCredential, - CbsAuthorizationType authorizationType, AmqpTransportType transport, AmqpRetryOptions retryOptions, - ProxyOptions proxyOptions, Scheduler scheduler, ClientOptions clientOptions, - SslDomain.VerifyMode verifyMode) { + CbsAuthorizationType authorizationType, AmqpTransportType transport, AmqpRetryOptions retryOptions, + ProxyOptions proxyOptions, Scheduler scheduler, ClientOptions clientOptions, + SslDomain.VerifyMode verifyMode) { + this(fullyQualifiedNamespace, tokenCredential, authorizationType, transport, retryOptions, + proxyOptions, scheduler, clientOptions, verifyMode, fullyQualifiedNamespace, getPort(transport)); + } + + /** + * Creates an instance with the connection options set. Used when an alternative address should be made for the + * connection rather than through the fullyQualifiedNamespace. + * + * @param fullyQualifiedNamespace Fully qualified namespace for the AMQP broker. (ie. + * namespace.servicebus.windows.net) + * @param tokenCredential The credential for connecting to the AMQP broker. + * @param authorizationType The authorisation type used for authorizing with the CBS node. + * @param transport The type connection used for the AMQP connection. + * @param retryOptions Retry options for the connection. + * @param proxyOptions (Optional) Any proxy options to set. + * @param scheduler Scheduler for async operations. + * @param clientOptions Client options for the connection. + * @param verifyMode How to verify SSL information. + * @param hostname Connection hostname. Used to create the connection to in the case that we cannot + * connect directly to the AMQP broker. + * @param port Connection port. Used to create the connection to in the case we cannot connect directly + * to the AMQP broker. + * + * @throws NullPointerException in the case that {@code fullyQualifiedNamespace}, {@code tokenCredential}, + * {@code authorizationType}, {@code transport}, {@code retryOptions}, {@code scheduler}, + * {@code clientOptions}, {@code hostname}, or {@code verifyMode} is null. + */ + public ConnectionOptions(String fullyQualifiedNamespace, TokenCredential tokenCredential, + CbsAuthorizationType authorizationType, AmqpTransportType transport, AmqpRetryOptions retryOptions, + ProxyOptions proxyOptions, Scheduler scheduler, ClientOptions clientOptions, + SslDomain.VerifyMode verifyMode, String hostname, int port) { this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' is required."); @@ -40,20 +95,38 @@ public ConnectionOptions(String fullyQualifiedNamespace, TokenCredential tokenCr this.authorizationType = Objects.requireNonNull(authorizationType, "'authorizationType' is required."); this.transport = Objects.requireNonNull(transport, "'transport' is required."); this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' is required."); - this.proxyOptions = Objects.requireNonNull(proxyOptions, "'proxyConfiguration' is required."); this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' is required."); this.clientOptions = Objects.requireNonNull(clientOptions, "'clientOptions' is required."); this.verifyMode = Objects.requireNonNull(verifyMode, "'verifyMode' is required."); + this.hostname = Objects.requireNonNull(hostname, "'hostname' cannot be null."); + this.port = port != -1 ? port : getPort(transport); + this.proxyOptions = proxyOptions; } + /** + * Gets the authorisation type for the CBS node. + * + * @return The authorisation type for the CBS node. + */ public CbsAuthorizationType getAuthorizationType() { return authorizationType; } + /** + * Gets the client options. + * + * @return The client options. + */ public ClientOptions getClientOptions() { return clientOptions; } + /** + * The fully qualified domain name for the AMQP broker. Typically of the form + * {@literal ".service.windows.net"}. + * + * @return The fully qualified domain name for the AMQP broker. + */ public String getFullyQualifiedNamespace() { return fullyQualifiedNamespace; } @@ -62,23 +135,80 @@ public AmqpRetryOptions getRetry() { return retryOptions; } + /** + * Gets the proxy options set. + * + * @return The proxy options set. {@code null} if there are no options set. + */ public ProxyOptions getProxyOptions() { return proxyOptions; } + /** + * Gets the scheduler to execute tasks on. + * + * @return The scheduler to execute tasks on. + */ public Scheduler getScheduler() { return scheduler; } + /** + * Gets the verification mode for the SSL certificate. + * + * @return The verification mode for the SSL certificate. + */ public SslDomain.VerifyMode getSslVerifyMode() { return verifyMode; } + /** + * Gets the credential for authorising with Event Hubs. + * + * @return The credential for authorising with Event Hubs. + */ public TokenCredential getTokenCredential() { return tokenCredential; } + /** + * Gets the transport type for the AMQP connection. + * + * @return The transport type for the AMQP connection. + */ public AmqpTransportType getTransportType() { return transport; } + + /** + * Gets the DNS hostname or IP address of the service. Typically of the form + * {@literal ".service.windows.net"}, unless connecting to the service through an intermediary. + * + * @return The DNS hostname or IP address to connect to. + */ + public String getHostname() { + return hostname; + } + + /** + * Gets the port to connect to when creating the connection to the service. This is usually the port for the + * AMQP protocol or 443 for web sockets, but can differ if connecting through an intermediary. + * + * @return The port to connect to when creating the connection to the service. + */ + public int getPort() { + return port; + } + + private static int getPort(AmqpTransportType transport) { + switch (transport) { + case AMQP: + return ConnectionHandler.AMQPS_PORT; + case AMQP_WEB_SOCKETS: + return WebSocketsConnectionHandler.HTTPS_PORT; + default: + throw new ClientLogger(ConnectionOptions.class).logThrowableAsError( + new IllegalArgumentException("Transport Type is not supported: " + transport)); + } + } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index e95de0c66c5c6..eb76d9d94cdf8 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -97,10 +97,8 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption this.tokenManagerProvider = Objects.requireNonNull(tokenManagerProvider, "'tokenManagerProvider' cannot be null."); this.messageSerializer = messageSerializer; - this.handler = handlerProvider.createConnectionHandler(connectionId, - connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getTransportType(), - connectionOptions.getProxyOptions(), product, clientVersion, connectionOptions.getSslVerifyMode(), - connectionOptions.getClientOptions()); + this.handler = handlerProvider.createConnectionHandler(connectionId, product, clientVersion, + connectionOptions); this.retryPolicy = RetryUtil.getRetryPolicy(connectionOptions.getRetry()); this.senderSettleMode = senderSettleMode; diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorHandlerProvider.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorHandlerProvider.java index 6bf8146727ac5..30d211c6a75c2 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorHandlerProvider.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorHandlerProvider.java @@ -11,13 +11,12 @@ import com.azure.core.amqp.implementation.handler.SessionHandler; import com.azure.core.amqp.implementation.handler.WebSocketsConnectionHandler; import com.azure.core.amqp.implementation.handler.WebSocketsProxyConnectionHandler; -import com.azure.core.util.ClientOptions; import com.azure.core.util.logging.ClientLogger; -import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.reactor.Reactor; import java.time.Duration; import java.util.Locale; +import java.util.Objects; /** * Provides handlers for the various types of links. @@ -31,45 +30,68 @@ public class ReactorHandlerProvider { * generated handlers. * * @param provider The provider that creates and manages {@link Reactor} instances. + * + * @throws NullPointerException If {@code provider} is {@code null}. */ public ReactorHandlerProvider(ReactorProvider provider) { - this.provider = provider; + this.provider = Objects.requireNonNull(provider, "'provider' cannot be null."); } /** * Creates a new connection handler with the given {@code connectionId} and {@code hostname}. * * @param connectionId Identifier associated with this connection. - * @param hostname Host for the connection handler. - * @param transportType Transport type used for the connection. - * @param proxyOptions The options to use for proxy. - * @param product The name of the product this connection handler is created for. - * @param clientVersion The version of the client library creating the connection handler. - * @param clientOptions provided by user. + * @param options Options for the connection. * @return A new {@link ConnectionHandler}. + * + * @throws NullPointerException If {@code connectionId}, {@code productName}, {@code clientVersion}, + * {@code options} is {@code null}. */ - public ConnectionHandler createConnectionHandler(String connectionId, String hostname, - AmqpTransportType transportType, ProxyOptions proxyOptions, String product, String clientVersion, - SslDomain.VerifyMode verifyMode, ClientOptions clientOptions) { - switch (transportType) { - case AMQP: - return new ConnectionHandler(connectionId, hostname, product, clientVersion, verifyMode, clientOptions); - case AMQP_WEB_SOCKETS: - if (proxyOptions != null && proxyOptions.isProxyAddressConfigured()) { - return new WebSocketsProxyConnectionHandler(connectionId, hostname, proxyOptions, product, - clientVersion, verifyMode, clientOptions); - } else if (WebSocketsProxyConnectionHandler.shouldUseProxy(hostname)) { - logger.info("System default proxy configured for hostname '{}'. Using proxy.", hostname); - return new WebSocketsProxyConnectionHandler(connectionId, hostname, - ProxyOptions.SYSTEM_DEFAULTS, product, clientVersion, verifyMode, clientOptions); - } else { - return new WebSocketsConnectionHandler(connectionId, hostname, product, clientVersion, verifyMode, - clientOptions); - } - default: - throw logger.logExceptionAsWarning(new IllegalArgumentException(String.format(Locale.US, - "This transport type '%s' is not supported.", transportType))); + public ConnectionHandler createConnectionHandler(String connectionId, String productName, String clientVersion, + ConnectionOptions options) { + Objects.requireNonNull(connectionId, "'connectionId' cannot be null."); + Objects.requireNonNull(options, "'options' cannot be null."); + Objects.requireNonNull(productName, "'productName' cannot be null."); + Objects.requireNonNull(clientVersion, "'clientVersion' cannot be null."); + + if (options.getTransportType() == AmqpTransportType.AMQP) { + return new ConnectionHandler(connectionId, productName, clientVersion, options); + } + + if (options.getTransportType() != AmqpTransportType.AMQP_WEB_SOCKETS) { + throw logger.logExceptionAsWarning(new IllegalArgumentException(String.format(Locale.US, + "This transport type '%s' is not supported.", options.getTransportType()))); + } + + final boolean isCustomEndpointConfigured = !options.getFullyQualifiedNamespace().equals(options.getHostname()); + final boolean isUserProxyConfigured = options.getProxyOptions() != null + && options.getProxyOptions().isProxyAddressConfigured(); + final boolean isSystemProxyConfigured = WebSocketsProxyConnectionHandler.shouldUseProxy( + options.getFullyQualifiedNamespace(), options.getPort()); + + // TODO (conniey): See if we this is supported later on. + if (isCustomEndpointConfigured && (isUserProxyConfigured || isSystemProxyConfigured)) { + throw logger.logExceptionAsError(new UnsupportedOperationException(String.format( + "Unable to proxy connection to custom endpoint. Custom endpoint: %s. Proxy settings: %s. " + + "Namespace: %s", options.getHostname(), options.getProxyOptions().getProxyAddress(), + options.getFullyQualifiedNamespace()))); } + + if (isUserProxyConfigured) { + logger.info("Using user configured proxy to connect to: '{}:{}'. Proxy: {}", + options.getFullyQualifiedNamespace(), options.getPort(), options.getProxyOptions().getProxyAddress()); + + return new WebSocketsProxyConnectionHandler(connectionId, productName, clientVersion, options, + options.getProxyOptions()); + } else if (isSystemProxyConfigured) { + logger.info("System default proxy configured for hostname:port '{}:{}'. Using proxy.", + options.getFullyQualifiedNamespace(), options.getPort()); + + return new WebSocketsProxyConnectionHandler(connectionId, productName, clientVersion, options, + ProxyOptions.SYSTEM_DEFAULTS); + } + + return new WebSocketsConnectionHandler(connectionId, productName, clientVersion, options); } /** @@ -82,33 +104,37 @@ public ConnectionHandler createConnectionHandler(String connectionId, String hos * @return A new {@link SessionHandler}. */ public SessionHandler createSessionHandler(String connectionId, String hostname, String sessionName, - Duration openTimeout) { - return new SessionHandler(connectionId, hostname, sessionName, provider.getReactorDispatcher(), openTimeout); + Duration openTimeout) { + + return new SessionHandler(connectionId, hostname, sessionName, provider.getReactorDispatcher(), + openTimeout); } /** * Creates a new link handler for sending messages. * * @param connectionId Identifier of the parent connection that created this session. - * @param fullyQualifiedNamespace Fully qualified namespace of the parent connection. + * @param hostname Fully qualified namespace of the parent connection. * @param senderName Name of the send link. + * * @return A new {@link SendLinkHandler}. */ - public SendLinkHandler createSendLinkHandler(String connectionId, String fullyQualifiedNamespace, String senderName, - String entityPath) { - return new SendLinkHandler(connectionId, fullyQualifiedNamespace, senderName, entityPath); + public SendLinkHandler createSendLinkHandler(String connectionId, String hostname, + String senderName, String entityPath) { + return new SendLinkHandler(connectionId, hostname, senderName, entityPath); } /** * Creates a new link handler for receiving messages. * * @param connectionId Identifier of the parent connection that created this session. - * @param fullyQualifiedNamespace Fully qualified namespace of the parent connection. + * @param hostname Fully qualified namespace of the parent connection. * @param receiverName Name of the send link. * @return A new {@link ReceiveLinkHandler}. */ - public ReceiveLinkHandler createReceiveLinkHandler(String connectionId, String fullyQualifiedNamespace, - String receiverName, String entityPath) { - return new ReceiveLinkHandler(connectionId, fullyQualifiedNamespace, receiverName, entityPath); + public ReceiveLinkHandler createReceiveLinkHandler(String connectionId, String hostname, + String receiverName, String entityPath) { + + return new ReceiveLinkHandler(connectionId, hostname, receiverName, entityPath); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java index be85bf9a4246d..45cae61bdfad3 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java @@ -99,7 +99,7 @@ public ReactorSession(Session session, SessionHandler sessionHandler, String ses this.endpointStates = sessionHandler.getEndpointStates() .map(state -> { - logger.verbose("connectionId[{}], sessionName[{}]: State ", sessionHandler.getConnectionId(), + logger.verbose("connectionId[{}], sessionName[{}], state[{}]", sessionHandler.getConnectionId(), sessionName, state); return AmqpEndpointStateUtil.getConnectionState(state); }) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java index d38cca0aa4dad..a0a03606b0df9 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java @@ -5,6 +5,7 @@ import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.implementation.ClientConstants; +import com.azure.core.amqp.implementation.ConnectionOptions; import com.azure.core.amqp.implementation.ExceptionUtil; import com.azure.core.util.ClientOptions; import com.azure.core.util.CoreUtils; @@ -29,55 +30,54 @@ import java.util.Objects; /** - * Creates an AMQP connection using sockets and the default AMQP protocol port 5671. + * Creates an AMQP connection using sockets. */ public class ConnectionHandler extends Handler { + public static final int AMQPS_PORT = 5671; + static final Symbol PRODUCT = Symbol.valueOf("product"); static final Symbol VERSION = Symbol.valueOf("version"); static final Symbol PLATFORM = Symbol.valueOf("platform"); static final Symbol FRAMEWORK = Symbol.valueOf("framework"); static final Symbol USER_AGENT = Symbol.valueOf("user-agent"); - static final int AMQPS_PORT = 5671; static final int MAX_FRAME_SIZE = 65536; private final Map connectionProperties; private final ClientLogger logger = new ClientLogger(ConnectionHandler.class); - private final SslDomain.VerifyMode verifyMode; + private final ConnectionOptions connectionOptions; /** * Creates a handler that handles proton-j's connection events. * * @param connectionId Identifier for this connection. - * @param hostname Hostname of the AMQP message broker to create a connection to. - * @param product The name of the product this connection handler is created for. + * @param productName The name of the product this connection handler is created for. * @param clientVersion The version of the client library creating the connection handler. - * @param clientOptions provided by user. + * @param connectionOptions Options used when creating the AMQP connection. */ - public ConnectionHandler(final String connectionId, final String hostname, final String product, - final String clientVersion, final SslDomain.VerifyMode verifyMode, final ClientOptions clientOptions) { - super(connectionId, hostname); + public ConnectionHandler(final String connectionId, final String productName, final String clientVersion, + final ConnectionOptions connectionOptions) { + super(connectionId, + Objects.requireNonNull(connectionOptions, "'connectionOptions' cannot be null.").getHostname()); add(new Handshaker()); - Objects.requireNonNull(connectionId, "'connectionId' cannot be null."); - Objects.requireNonNull(hostname, "'hostname' cannot be null."); - Objects.requireNonNull(product, "'product' cannot be null."); + this.connectionOptions = connectionOptions; + + Objects.requireNonNull(productName, "'product' cannot be null."); Objects.requireNonNull(clientVersion, "'clientVersion' cannot be null."); - Objects.requireNonNull(verifyMode, "'verifyMode' cannot be null."); - Objects.requireNonNull(clientOptions, "'clientOptions' cannot be null."); - this.verifyMode = Objects.requireNonNull(verifyMode, "'verifyMode' cannot be null"); this.connectionProperties = new HashMap<>(); - this.connectionProperties.put(PRODUCT.toString(), product); + this.connectionProperties.put(PRODUCT.toString(), productName); this.connectionProperties.put(VERSION.toString(), clientVersion); this.connectionProperties.put(PLATFORM.toString(), ClientConstants.PLATFORM_INFO); this.connectionProperties.put(FRAMEWORK.toString(), ClientConstants.FRAMEWORK_INFO); + final ClientOptions clientOptions = connectionOptions.getClientOptions(); final String applicationId = !CoreUtils.isNullOrEmpty(clientOptions.getApplicationId()) ? clientOptions.getApplicationId() : null; - String userAgent = UserAgentUtil.toUserAgentString(applicationId, product, clientVersion, null); + String userAgent = UserAgentUtil.toUserAgentString(applicationId, productName, clientVersion, null); this.connectionProperties.put(USER_AGENT.toString(), userAgent); } @@ -96,7 +96,7 @@ public Map getConnectionProperties() { * @return The port used to open connection. */ public int getProtocolPort() { - return AMQPS_PORT; + return connectionOptions.getPort(); } /** @@ -108,10 +108,17 @@ public int getMaxFrameSize() { return MAX_FRAME_SIZE; } + /** + * Configures the SSL transport layer for the connection based on the {@link ConnectionOptions#getSslVerifyMode()}. + * + * @param event The proton-j event. + * @param transport Transport to add layers to. + */ protected void addTransportLayers(final Event event, final TransportInternal transport) { final SslDomain sslDomain = Proton.sslDomain(); sslDomain.init(SslDomain.Mode.CLIENT); + final SslDomain.VerifyMode verifyMode = connectionOptions.getSslVerifyMode(); final SSLContext defaultSslContext; if (verifyMode == SslDomain.VerifyMode.ANONYMOUS_PEER) { @@ -129,7 +136,10 @@ protected void addTransportLayers(final Event event, final TransportInternal tra final StrictTlsContextSpi serviceProvider = new StrictTlsContextSpi(defaultSslContext); final SSLContext context = new StrictTlsContext(serviceProvider, defaultSslContext.getProvider(), defaultSslContext.getProtocol()); - final SslPeerDetails peerDetails = Proton.sslPeerDetails(getHostname(), getProtocolPort()); + + final String theHostname = getHostname(); + final int theProtocol = getProtocolPort(); + final SslPeerDetails peerDetails = Proton.sslPeerDetails(theHostname, theProtocol); sslDomain.setSslContext(context); transport.ssl(sslDomain, peerDetails); @@ -151,12 +161,14 @@ protected void addTransportLayers(final Event event, final TransportInternal tra @Override public void onConnectionInit(Event event) { - logger.info("onConnectionInit hostname[{}], connectionId[{}]", getHostname(), getConnectionId()); + logger.info("onConnectionInit hostname[{}], connectionId[{}], amqpHostname[{}]", getHostname(), + getConnectionId(), connectionOptions.getFullyQualifiedNamespace()); final Connection connection = event.getConnection(); - final String hostName = getHostname() + ":" + getProtocolPort(); - connection.setHostname(hostName); + // Set the hostname of the AMQP message broker. This may be different from the actual underlying transport + // in the case we are using an intermediary to connect to Event Hubs. + connection.setHostname(connectionOptions.getFullyQualifiedNamespace()); connection.setContainer(getConnectionId()); final Map properties = new HashMap<>(); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java index 4ddbde2218a9b..a1bbdb4fe5770 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java @@ -10,8 +10,12 @@ import reactor.core.publisher.ReplayProcessor; import java.io.Closeable; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +/** + * Base class for all proton-j handlers. + */ public abstract class Handler extends BaseHandler implements Closeable { private final AtomicBoolean isTerminal = new AtomicBoolean(); private final ReplayProcessor endpointStateProcessor = @@ -20,19 +24,46 @@ public abstract class Handler extends BaseHandler implements Closeable { private final String connectionId; private final String hostname; + /** + * Creates an instance with the parameters. + * + * @param connectionId Identifier for the connection. + * @param hostname Hostname of the connection. This could be the DNS hostname or the IP address of the + * connection. Usually of the form {@literal ".service.windows.net"} but can change if the + * messages are brokered through an intermediary. + * + * @throws NullPointerException if {@code connectionId} or {@code hostname} is null. + */ Handler(final String connectionId, final String hostname) { - this.connectionId = connectionId; - this.hostname = hostname; + this.connectionId = Objects.requireNonNull(connectionId, "'connectionId' cannot be null."); + this.hostname = Objects.requireNonNull(hostname, "'hostname' cannot be null."); } + /** + * Gets the connection id. + * + * @return The connection id. + */ public String getConnectionId() { return connectionId; } + /** + * Gets the hostname of the AMQP connection. This could be the DNS hostname or the IP address of the connection. + * Usually of the form {@literal ".service.windows.net"} but can change if the messages are + * brokered through an intermediary. + * + * @return Gets the hostname of the AMQP connection. + */ public String getHostname() { return hostname; } + /** + * Gets the endpoint states of the handler. + * + * @return The endpoint states of the handler. + */ public Flux getEndpointStates() { return endpointStateProcessor.distinct(); } @@ -50,6 +81,10 @@ void onError(Throwable error) { endpointSink.error(error); } + /** + * Changes the endpoint to {@link EndpointState#CLOSED} and completes the stream of {@link #getEndpointStates() + * endpoint states}. + */ @Override public void close() { if (isTerminal.getAndSet(true)) { diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/WebSocketsConnectionHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/WebSocketsConnectionHandler.java index a93ceb85c9643..eb07317611335 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/WebSocketsConnectionHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/WebSocketsConnectionHandler.java @@ -3,18 +3,17 @@ package com.azure.core.amqp.implementation.handler; -import com.azure.core.util.ClientOptions; +import com.azure.core.amqp.implementation.ConnectionOptions; import com.azure.core.util.logging.ClientLogger; import com.microsoft.azure.proton.transport.ws.impl.WebSocketImpl; import org.apache.qpid.proton.engine.Event; -import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.engine.impl.TransportInternal; /** * Creates an AMQP connection using web sockets (port 443). */ public class WebSocketsConnectionHandler extends ConnectionHandler { - static final int HTTPS_PORT = 443; + public static final int HTTPS_PORT = 443; // This is the current limitation of https://github.com/Azure/qpid-proton-j-extensions. // Once this library enables larger frames - this property can be removed. @@ -28,20 +27,26 @@ public class WebSocketsConnectionHandler extends ConnectionHandler { * Creates a handler that handles proton-j's connection events using web sockets. * * @param connectionId Identifier for this connection. - * @param hostname Hostname to use for socket creation. - * @param product The name of the product this connection handler is created for. + * @param productName The name of the product this connection handler is created for. * @param clientVersion The version of the client library creating the connection handler. - * @param clientOptions provided by the user. + * @param connectionOptions Options used when creating the connection. */ - public WebSocketsConnectionHandler(final String connectionId, final String hostname, final String product, - final String clientVersion, final SslDomain.VerifyMode verifyMode, final ClientOptions clientOptions) { - super(connectionId, hostname, product, clientVersion, verifyMode, clientOptions); + public WebSocketsConnectionHandler(final String connectionId, final String productName, final String clientVersion, + final ConnectionOptions connectionOptions) { + super(connectionId, productName, clientVersion, connectionOptions); } + /** + * Adds a web sockets layer before adding additional connection layers (ie. SSL). + * + * @param event The proton-j event. + * @param transport Transport to add layers to. + */ @Override protected void addTransportLayers(final Event event, final TransportInternal transport) { final String hostName = event.getConnection().getHostname(); + logger.info("Adding web socket layer"); final WebSocketImpl webSocket = new WebSocketImpl(); webSocket.configure( hostName, @@ -60,11 +65,6 @@ protected void addTransportLayers(final Event event, final TransportInternal tra super.addTransportLayers(event, transport); } - @Override - public int getProtocolPort() { - return HTTPS_PORT; - } - @Override public int getMaxFrameSize() { return MAX_FRAME_SIZE; diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/WebSocketsProxyConnectionHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/WebSocketsProxyConnectionHandler.java index 4932d78c24cfd..a1a599f4fa009 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/WebSocketsProxyConnectionHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/WebSocketsProxyConnectionHandler.java @@ -6,18 +6,16 @@ import com.azure.core.amqp.ProxyAuthenticationType; import com.azure.core.amqp.ProxyOptions; import com.azure.core.amqp.implementation.AmqpErrorCode; -import com.azure.core.util.ClientOptions; +import com.azure.core.amqp.implementation.ConnectionOptions; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; import com.microsoft.azure.proton.transport.proxy.ProxyHandler; import com.microsoft.azure.proton.transport.proxy.impl.ProxyHandlerImpl; import com.microsoft.azure.proton.transport.proxy.impl.ProxyImpl; -import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.ConnectionError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Event; -import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.TransportInternal; @@ -26,57 +24,78 @@ import java.net.Proxy; import java.net.ProxySelector; import java.net.URI; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** - * Creates an AMQP connection using web sockets (port 443) and connects through a proxy. + * Creates an AMQP connection using web sockets and connects through a proxy. */ public class WebSocketsProxyConnectionHandler extends WebSocketsConnectionHandler { private static final String HTTPS_URI_FORMAT = "https://%s:%s"; - private static final String PROXY_SELECTOR_HAS_BEEN_MODIFIED = "ProxySelector has been modified."; private final ClientLogger logger = new ClientLogger(WebSocketsProxyConnectionHandler.class); - private final String amqpHostname; - private final String remoteHost; + private final InetSocketAddress connectionHostname; private final ProxyOptions proxyOptions; + private final String fullyQualifiedNamespace; + private final String amqpBrokerHostname; /** * Creates a handler that handles proton-j's connection through a proxy using web sockets. + * The hostname of the proxy is exposed in {@link #getHostname()}. * * @param connectionId Identifier for this connection. - * @param amqpHostname Hostname of the AMQP message broker. The hostname of the proxy is exposed in {@link - * #getHostname()}. * @param proxyOptions The options to use for proxy. - * @param product The name of the product this connection handler is created for. + * @param productName The name of the product this connection handler is created for. * @param clientVersion The version of the client library creating the connection handler. - * @param clientOptions provided by the user. + * @param connectionOptions Options used when creating the connection. * * @throws NullPointerException if {@code amqpHostname} or {@code proxyConfiguration} is null. + * @throws IllegalStateException if a proxy address is unavailable for the given {@code proxyOptions}. */ - public WebSocketsProxyConnectionHandler(String connectionId, String amqpHostname, ProxyOptions proxyOptions, - String product, String clientVersion, SslDomain.VerifyMode verifyMode, ClientOptions clientOptions) { - super(connectionId, amqpHostname, product, clientVersion, verifyMode, clientOptions); + public WebSocketsProxyConnectionHandler(String connectionId, String productName, String clientVersion, + ConnectionOptions connectionOptions, ProxyOptions proxyOptions) { + super(connectionId, productName, clientVersion, connectionOptions); - this.amqpHostname = Objects.requireNonNull(amqpHostname, "'amqpHostname' cannot be null."); this.proxyOptions = Objects.requireNonNull(proxyOptions, "'proxyConfiguration' cannot be null."); - this.remoteHost = amqpHostname + ":" + HTTPS_PORT; + this.fullyQualifiedNamespace = connectionOptions.getFullyQualifiedNamespace(); + this.amqpBrokerHostname = connectionOptions.getFullyQualifiedNamespace() + ":" + connectionOptions.getPort(); + + if (proxyOptions.isProxyAddressConfigured()) { + this.connectionHostname = (InetSocketAddress) proxyOptions.getProxyAddress().address(); + } else { + final URI serviceUri = createURI(connectionOptions.getHostname(), connectionOptions.getPort()); + final ProxySelector proxySelector = ProxySelector.getDefault(); + if (proxySelector == null) { + throw logger.logExceptionAsError(new IllegalStateException("ProxySelector should not be null.")); + } + + final List proxies = proxySelector.select(serviceUri); + if (!isProxyAddressLegal(proxies)) { + final String formatted = String.format("No proxy address found for: '%s'. Available: %s.", + serviceUri, proxies.stream().map(Proxy::toString).collect(Collectors.joining(", "))); + + throw logger.logExceptionAsError(new IllegalStateException(formatted)); + } + + final Proxy proxy = proxies.get(0); + this.connectionHostname = (InetSocketAddress) proxy.address(); + } } /** * Looks through system defined proxies to see if one should be used for connecting to the message broker. * - * @param hostname Hostname for the AMQP message broker. + * @param hostname Hostname for the AMQP connection. + * @param port Port to connect to. * * @return {@code true} if a proxy should be used to connect to the AMQP message broker and null otherwise. */ - public static boolean shouldUseProxy(final String hostname) { + public static boolean shouldUseProxy(final String hostname, final int port) { Objects.requireNonNull(hostname, "'hostname' cannot be null."); - final URI uri = createURI(hostname, HTTPS_PORT); + final URI uri = createURI(hostname, port); final ProxySelector proxySelector = ProxySelector.getDefault(); if (proxySelector == null) { return false; @@ -86,31 +105,24 @@ public static boolean shouldUseProxy(final String hostname) { return isProxyAddressLegal(proxies); } - @Override - public void onConnectionInit(Event event) { - final Connection connection = event.getConnection(); - logger.info("onConnectionInit host[{}], connectionId[{}]", remoteHost, getConnectionId()); - - connection.setHostname(remoteHost); - connection.setContainer(getConnectionId()); - - final Map properties = new HashMap<>(); - getConnectionProperties().forEach((key, value) -> properties.put(Symbol.getSymbol(key), value)); - - connection.setProperties(properties); - connection.open(); - } - + /** + * Gets the hostname for the proxy. + * + * @return The hostname for the proxy. + */ @Override public String getHostname() { - final InetSocketAddress socketAddress = getProxyAddress(); - return socketAddress.getHostString(); + return connectionHostname.getHostString(); } + /** + * Gets the port for the proxy. + * + * @return The port for the proxy. + */ @Override public int getProtocolPort() { - final InetSocketAddress socketAddress = getProxyAddress(); - return socketAddress.getPort(); + return connectionHostname.getPort(); } @Override @@ -120,27 +132,31 @@ public void onTransportError(Event event) { final Transport transport = event.getTransport(); final Connection connection = event.getConnection(); if (connection == null || transport == null) { + logger.verbose("connectionId[{}] There is no connection or transport associated with error. Event: {}", + event); return; } final ErrorCondition errorCondition = transport.getCondition(); if (errorCondition == null || !(errorCondition.getCondition().equals(ConnectionError.FRAMING_ERROR) || errorCondition.getCondition().equals(AmqpErrorCode.PROTON_IO_ERROR))) { + logger.verbose("connectionId[{}] There is no error condition and these are not framing errors. Error: {}", + errorCondition); return; } - final String hostName = event.getReactor().getConnectionAddress(connection); - final ProxySelector proxySelector = ProxySelector.getDefault(); - final boolean isProxyConfigured = proxySelector != null - || (proxyOptions != null && proxyOptions.isProxyAddressConfigured()); + final String hostname = event.getReactor().getConnectionAddress(connection); - if (!isProxyConfigured || CoreUtils.isNullOrEmpty(hostName)) { + // If the proxy is not configured, or we are not connected to a host yet. + if (proxyOptions == null || CoreUtils.isNullOrEmpty(hostname)) { + logger.verbose("connectionId[{}] Proxy is not configured and there is no host connected. Error: {}", + errorCondition); return; } - final String[] hostNameParts = hostName.split(":"); + final String[] hostNameParts = hostname.split(":"); if (hostNameParts.length != 2) { - logger.warning("connectionId[{}] Invalid hostname: {}", getConnectionId(), hostName); + logger.warning("connectionId[{}] Invalid hostname: {}", getConnectionId(), hostname); return; } @@ -156,12 +172,13 @@ public void onTransportError(Event event) { // it swallows the IOException and translates it to proton-io errorCode // we reconstruct the IOException in this case - but, callstack is lost final IOException ioException = new IOException(errorCondition.getDescription()); - final URI url = createURI(amqpHostname, HTTPS_PORT); + final URI url = createURI(fullyQualifiedNamespace, port); final InetSocketAddress address = new InetSocketAddress(hostNameParts[0], port); logger.error("connectionId[{}] Failed to connect to url: '{}', proxy host: '{}'", getConnectionId(), url, address.getHostString(), ioException); + final ProxySelector proxySelector = ProxySelector.getDefault(); if (proxySelector != null) { proxySelector.connectFailed(url, address, ioException); } @@ -177,35 +194,14 @@ protected void addTransportLayers(final Event event, final TransportInternal tra ? new ProxyImpl(getProtonConfiguration()) : new ProxyImpl(); - // host name used to create proxy connect request + // host name used to create proxy connect request must contain a port number. // after creating the socket to proxy - final String hostname = event.getConnection().getHostname(); final ProxyHandler proxyHandler = new ProxyHandlerImpl(); - proxy.configure(hostname, null, proxyHandler, transport); + proxy.configure(amqpBrokerHostname, null, proxyHandler, transport); transport.addTransportLayer(proxy); - logger.info("connectionId[{}] addProxyHandshake: hostname[{}]", getConnectionId(), hostname); - } - - private InetSocketAddress getProxyAddress() { - if (proxyOptions != null && proxyOptions.isProxyAddressConfigured()) { - return (InetSocketAddress) proxyOptions.getProxyAddress().address(); - } - - final URI serviceUri = createURI(amqpHostname, HTTPS_PORT); - final ProxySelector proxySelector = ProxySelector.getDefault(); - if (proxySelector == null) { - throw logger.logExceptionAsError(new IllegalStateException(PROXY_SELECTOR_HAS_BEEN_MODIFIED)); - } - - final List proxies = proxySelector.select(serviceUri); - if (!isProxyAddressLegal(proxies)) { - throw logger.logExceptionAsError(new IllegalStateException(PROXY_SELECTOR_HAS_BEEN_MODIFIED)); - } - - final Proxy proxy = proxies.get(0); - return (InetSocketAddress) proxy.address(); + logger.info("connectionId[{}] addProxyHandshake: hostname[{}]", getConnectionId(), amqpBrokerHostname); } private com.microsoft.azure.proton.transport.proxy.ProxyConfiguration getProtonConfiguration() { diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java index fd9e3bf5ab835..99c199df169b1 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java @@ -61,7 +61,7 @@ class ReactorConnectionTest { private static final ConnectionStringProperties CREDENTIAL_INFO = new ConnectionStringProperties("Endpoint=sb" + "://test-event-hub.servicebus.windows.net/;SharedAccessKeyName=dummySharedKeyName;" + "SharedAccessKey=dummySharedKeyValue;EntityPath=eventhub1;"); - private static final String HOSTNAME = CREDENTIAL_INFO.getEndpoint().getHost(); + private static final String FULLY_QUALIFIED_NAMESPACE = CREDENTIAL_INFO.getEndpoint().getHost(); private static final Scheduler SCHEDULER = Schedulers.elastic(); private static final String PRODUCT = "test"; private static final String CLIENT_VERSION = "1.0.0-test"; @@ -113,11 +113,11 @@ void setup() throws IOException { tokenProvider, CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, retryOptions, ProxyOptions.SYSTEM_DEFAULTS, SCHEDULER, clientOptions, VERIFY_MODE); - connectionHandler = new ConnectionHandler(CONNECTION_ID, HOSTNAME, PRODUCT, CLIENT_VERSION, - VERIFY_MODE, clientOptions); + connectionHandler = new ConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, connectionOptions); when(reactor.selectable()).thenReturn(selectable); - when(reactor.connectionToHost(HOSTNAME, connectionHandler.getProtocolPort(), connectionHandler)) + when(reactor.connectionToHost(FULLY_QUALIFIED_NAMESPACE, connectionHandler.getProtocolPort(), + connectionHandler)) .thenReturn(connectionProtonJ); when(reactor.process()).thenReturn(true); @@ -126,12 +126,11 @@ void setup() throws IOException { when(reactorProvider.getReactorDispatcher()).thenReturn(reactorDispatcher); when(reactorProvider.createReactor(CONNECTION_ID, connectionHandler.getMaxFrameSize())).thenReturn(reactor); - when(reactorHandlerProvider.createConnectionHandler(CONNECTION_ID, HOSTNAME, - connectionOptions.getTransportType(), connectionOptions.getProxyOptions(), PRODUCT, CLIENT_VERSION, - VERIFY_MODE, connectionOptions.getClientOptions())) + when(reactorHandlerProvider.createConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, connectionOptions)) .thenReturn(connectionHandler); - sessionHandler = new SessionHandler(CONNECTION_ID, HOSTNAME, SESSION_NAME, reactorDispatcher, TEST_DURATION); + sessionHandler = new SessionHandler(CONNECTION_ID, FULLY_QUALIFIED_NAMESPACE, SESSION_NAME, reactorDispatcher, + TEST_DURATION); when(reactorHandlerProvider.createSessionHandler(anyString(), anyString(), anyString(), any(Duration.class))) .thenReturn(sessionHandler); @@ -159,7 +158,7 @@ void createConnection() { // Assert Assertions.assertNotNull(connection); Assertions.assertEquals(CONNECTION_ID, connection.getId()); - Assertions.assertEquals(HOSTNAME, connection.getFullyQualifiedNamespace()); + Assertions.assertEquals(FULLY_QUALIFIED_NAMESPACE, connection.getFullyQualifiedNamespace()); Assertions.assertEquals(connectionHandler.getMaxFrameSize(), connection.getMaxFrameSize()); @@ -276,7 +275,7 @@ void onConnectionStateOpen() { // Arrange final Event event = mock(Event.class); when(event.getConnection()).thenReturn(connectionProtonJ); - when(connectionProtonJ.getHostname()).thenReturn(HOSTNAME); + when(connectionProtonJ.getHostname()).thenReturn(FULLY_QUALIFIED_NAMESPACE); when(connectionProtonJ.getRemoteContainer()).thenReturn("remote-container"); when(connectionProtonJ.getRemoteState()).thenReturn(EndpointState.ACTIVE); @@ -286,8 +285,8 @@ void onConnectionStateOpen() { .then(() -> connectionHandler.onConnectionRemoteOpen(event)) .expectNext(AmqpEndpointState.ACTIVE) // getConnectionStates is distinct. We don't expect to see another event with the same status. - .then(() -> connectionHandler.onConnectionRemoteOpen(event)) .then(() -> { + connectionHandler.onConnectionRemoteOpen(event); connection.dispose(); }) .verifyComplete(); @@ -306,9 +305,8 @@ void createCBSNode() { // Act and Assert StepVerifier.create(this.connection.getClaimsBasedSecurityNode()) - .assertNext(node -> { - Assertions.assertTrue(node instanceof ClaimsBasedSecurityChannel); - }).verifyComplete(); + .assertNext(node -> Assertions.assertTrue(node instanceof ClaimsBasedSecurityChannel)) + .verifyComplete(); } /** @@ -323,32 +321,31 @@ void createCBSNodeTimeoutException() { .setDelay(Duration.ofMillis(200)) .setMode(AmqpRetryMode.FIXED) .setTryTimeout(timeout); - final ConnectionOptions parameters = new ConnectionOptions(CREDENTIAL_INFO.getEndpoint().getHost(), + final ConnectionOptions connectionOptions = new ConnectionOptions(CREDENTIAL_INFO.getEndpoint().getHost(), tokenProvider, CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, retryOptions, ProxyOptions.SYSTEM_DEFAULTS, Schedulers.parallel(), clientOptions, VERIFY_MODE); - final ConnectionHandler handler = new ConnectionHandler(CONNECTION_ID, HOSTNAME, PRODUCT, CLIENT_VERSION, - VERIFY_MODE, clientOptions); + final ConnectionHandler handler = new ConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, + connectionOptions); final ReactorHandlerProvider provider = mock(ReactorHandlerProvider.class); - when(provider.createConnectionHandler(CONNECTION_ID, HOSTNAME, parameters.getTransportType(), - parameters.getProxyOptions(), PRODUCT, CLIENT_VERSION, VERIFY_MODE, clientOptions)) + when(provider.createConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, connectionOptions)) .thenReturn(handler); - when(provider.createSessionHandler(CONNECTION_ID, HOSTNAME, SESSION_NAME, timeout)).thenReturn(sessionHandler); + when(provider.createSessionHandler(CONNECTION_ID, FULLY_QUALIFIED_NAMESPACE, SESSION_NAME, timeout)) + .thenReturn(sessionHandler); - when(reactor.connectionToHost(HOSTNAME, handler.getProtocolPort(), handler)).thenReturn(connectionProtonJ); + when(reactor.connectionToHost(FULLY_QUALIFIED_NAMESPACE, handler.getProtocolPort(), handler)) + .thenReturn(connectionProtonJ); // Act and Assert - final ReactorConnection connectionBad = new ReactorConnection(CONNECTION_ID, parameters, reactorProvider, + final ReactorConnection connectionBad = new ReactorConnection(CONNECTION_ID, connectionOptions, reactorProvider, provider, tokenManager, messageSerializer, PRODUCT, CLIENT_VERSION, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST); try { StepVerifier.create(connectionBad.getClaimsBasedSecurityNode()) - .expectErrorSatisfies(error -> { - Assertions.assertTrue(error instanceof TimeoutException - || error.getCause() instanceof TimeoutException); - }) + .expectErrorSatisfies(error -> Assertions.assertTrue(error instanceof TimeoutException + || error.getCause() instanceof TimeoutException)) .verify(); } finally { connectionBad.dispose(); @@ -370,7 +367,7 @@ void cannotCreateResourcesOnFailure() { when(event.getTransport()).thenReturn(transport); when(event.getConnection()).thenReturn(connectionProtonJ); when(transport.getCondition()).thenReturn(errorCondition); - when(connectionProtonJ.getHostname()).thenReturn(HOSTNAME); + when(connectionProtonJ.getHostname()).thenReturn(FULLY_QUALIFIED_NAMESPACE); when(connectionProtonJ.getRemoteContainer()).thenReturn("remote-container"); when(connectionProtonJ.getRemoteState()).thenReturn(EndpointState.ACTIVE); @@ -395,4 +392,40 @@ void cannotCreateResourcesOnFailure() { verify(transport, times(1)).unbind(); } + + /** + * Verifies that if we use the custom endpoint, it will return the correct properties. + */ + @Test + void setsPropertiesUsingCustomEndpoint() throws IOException { + final String connectionId = "new-connection-id"; + final String hostname = "custom-endpoint.com"; + final int port = 10002; + final ConnectionOptions connectionOptions = new ConnectionOptions(CREDENTIAL_INFO.getEndpoint().getHost(), + tokenProvider, CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, + new AmqpRetryOptions(), ProxyOptions.SYSTEM_DEFAULTS, SCHEDULER, clientOptions, VERIFY_MODE, hostname, + port); + + final ConnectionHandler connectionHandler = new ConnectionHandler(connectionId, PRODUCT, CLIENT_VERSION, + connectionOptions); + + when(reactor.connectionToHost(hostname, port, connectionHandler)).thenReturn(connectionProtonJ); + + final ReactorDispatcher reactorDispatcher = new ReactorDispatcher(reactor); + when(reactorProvider.getReactor()).thenReturn(reactor); + when(reactorProvider.getReactorDispatcher()).thenReturn(reactorDispatcher); + when(reactorProvider.createReactor(connectionId, connectionHandler.getMaxFrameSize())).thenReturn(reactor); + + when(reactorHandlerProvider.createConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, connectionOptions)) + .thenReturn(connectionHandler); + + final SessionHandler sessionHandler = new SessionHandler(connectionId, FULLY_QUALIFIED_NAMESPACE, SESSION_NAME, + reactorDispatcher, TEST_DURATION); + when(reactorHandlerProvider.createSessionHandler(anyString(), anyString(), anyString(), any(Duration.class))) + .thenReturn(sessionHandler); + + connection = new ReactorConnection(CONNECTION_ID, connectionOptions, reactorProvider, reactorHandlerProvider, + tokenManager, messageSerializer, PRODUCT, CLIENT_VERSION, SenderSettleMode.SETTLED, + ReceiverSettleMode.FIRST); + } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorHandlerProviderTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorHandlerProviderTest.java index ba157d40944c0..44980f8c69579 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorHandlerProviderTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorHandlerProviderTest.java @@ -3,12 +3,14 @@ package com.azure.core.amqp.implementation; +import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyAuthenticationType; import com.azure.core.amqp.ProxyOptions; import com.azure.core.amqp.implementation.handler.ConnectionHandler; import com.azure.core.amqp.implementation.handler.WebSocketsConnectionHandler; import com.azure.core.amqp.implementation.handler.WebSocketsProxyConnectionHandler; +import com.azure.core.credential.TokenCredential; import com.azure.core.util.ClientOptions; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.reactor.Reactor; @@ -17,29 +19,37 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import reactor.core.scheduler.Scheduler; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Proxy; import java.net.ProxySelector; +import java.net.URI; import java.util.Collections; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +/** + * Tests {@link ReactorHandlerProvider}. + */ public class ReactorHandlerProviderTest { private static final String CONNECTION_ID = "test-connection-id"; - private static final String HOSTNAME = "my-hostname.windows.com"; + private static final String FULLY_QUALIFIED_DOMAIN_NAME = "my-hostname.windows.com"; + private static final String HOSTNAME = "my fake hostname"; + private static final int PORT = 1003; private static final InetSocketAddress PROXY_ADDRESS = InetSocketAddress.createUnresolved("foo.proxy.com", 3138); private static final Proxy PROXY = new Proxy(Proxy.Type.HTTP, PROXY_ADDRESS); private static final String USERNAME = "test-user"; @@ -53,6 +63,10 @@ public class ReactorHandlerProviderTest { private Reactor reactor; @Mock private ReactorProvider reactorProvider; + @Mock + private TokenCredential tokenCredential; + @Mock + private Scheduler scheduler; private ReactorHandlerProvider provider; private ProxySelector originalProxySelector; @@ -87,14 +101,53 @@ public void teardown() { } @Test - public void getsConnectionHandlerAMQP() { + public void constructorNull() { + // Act + assertThrows(NullPointerException.class, () -> new ReactorHandlerProvider(null)); + } + + @Test + public void connectionHandlerNull() { + // Arrange + final ConnectionOptions connectionOptions = new ConnectionOptions("fqdn", tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS, + new AmqpRetryOptions(), null, scheduler, CLIENT_OPTIONS, VERIFY_MODE); + + // Act + assertThrows(NullPointerException.class, + () -> provider.createConnectionHandler(null, HOSTNAME, CLIENT_VERSION, connectionOptions)); + assertThrows(NullPointerException.class, + () -> provider.createConnectionHandler(CONNECTION_ID, null, CLIENT_VERSION, connectionOptions)); + assertThrows(NullPointerException.class, + () -> provider.createConnectionHandler(CONNECTION_ID, HOSTNAME, null, connectionOptions)); + assertThrows(NullPointerException.class, + () -> provider.createConnectionHandler(CONNECTION_ID, HOSTNAME, CLIENT_VERSION, null)); + } + + public static Stream getHostnameAndPorts() { + return Stream.of( + Arguments.of(FULLY_QUALIFIED_DOMAIN_NAME, -1, FULLY_QUALIFIED_DOMAIN_NAME, ConnectionHandler.AMQPS_PORT), + Arguments.of(HOSTNAME, PORT, HOSTNAME, PORT) + ); + } + + @MethodSource("getHostnameAndPorts") + @ParameterizedTest + public void getsConnectionHandlerAMQP(String hostname, int port, String expectedHostname, int expectedPort) { // Act - final ConnectionHandler handler = provider.createConnectionHandler(CONNECTION_ID, HOSTNAME, - AmqpTransportType.AMQP, null, PRODUCT, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS); + + final ConnectionOptions connectionOptions = new ConnectionOptions(FULLY_QUALIFIED_DOMAIN_NAME, tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, + new AmqpRetryOptions(), ProxyOptions.SYSTEM_DEFAULTS, scheduler, CLIENT_OPTIONS, VERIFY_MODE, hostname, + port); + + final ConnectionHandler handler = provider.createConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, + connectionOptions); // Assert Assertions.assertNotNull(handler); - Assertions.assertEquals(5671, handler.getProtocolPort()); + Assertions.assertEquals(expectedHostname, handler.getHostname()); + Assertions.assertEquals(expectedPort, handler.getProtocolPort()); } /** @@ -104,11 +157,15 @@ public void getsConnectionHandlerAMQP() { @MethodSource("getProxyConfigurations") public void getsConnectionHandlerWebSockets(ProxyOptions configuration) { // Act - final ConnectionHandler handler = provider.createConnectionHandler(CONNECTION_ID, HOSTNAME, - AmqpTransportType.AMQP_WEB_SOCKETS, configuration, PRODUCT, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS); + final ConnectionOptions connectionOptions = new ConnectionOptions(FULLY_QUALIFIED_DOMAIN_NAME, tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS, + new AmqpRetryOptions(), configuration, scheduler, CLIENT_OPTIONS, VERIFY_MODE); + + // Act + final ConnectionHandler handler = provider.createConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, + connectionOptions); // Assert - Assertions.assertNotNull(handler); Assertions.assertTrue(handler instanceof WebSocketsConnectionHandler); Assertions.assertEquals(443, handler.getProtocolPort()); } @@ -121,20 +178,70 @@ public void getsConnectionHandlerProxy() { // Arrange final InetSocketAddress address = InetSocketAddress.createUnresolved("my-new.proxy.com", 8888); final Proxy newProxy = new Proxy(Proxy.Type.HTTP, address); - final ProxyOptions configuration = new ProxyOptions(ProxyAuthenticationType.BASIC, newProxy, USERNAME, PASSWORD); + final ProxyOptions configuration = new ProxyOptions(ProxyAuthenticationType.BASIC, newProxy, USERNAME, + PASSWORD); final String hostname = "foo.eventhubs.azure.com"; + final ConnectionOptions connectionOptions = new ConnectionOptions(hostname, tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS, + new AmqpRetryOptions(), configuration, scheduler, CLIENT_OPTIONS, VERIFY_MODE); // Act - final ConnectionHandler handler = provider.createConnectionHandler(CONNECTION_ID, hostname, - AmqpTransportType.AMQP_WEB_SOCKETS, configuration, PRODUCT, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS); + final ConnectionHandler handler = provider.createConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, + connectionOptions); // Assert Assertions.assertNotNull(handler); Assertions.assertTrue(handler instanceof WebSocketsProxyConnectionHandler); Assertions.assertEquals(address.getHostName(), handler.getHostname()); Assertions.assertEquals(address.getPort(), handler.getProtocolPort()); + } + + public static Stream getsConnectionHandlerSystemProxy() { + return Stream.of( + Arguments.of("foo.eventhubs.azure.com", WebSocketsProxyConnectionHandler.HTTPS_PORT, + PROXY_ADDRESS.getHostName(), PROXY_ADDRESS.getPort()), + Arguments.of("foo.eventhubs.azure.com", 8882, "my-new2.proxy.com", 8888) + ); + } + + /** + * Verify that we use the system proxy. + */ + @MethodSource + @ParameterizedTest + public void getsConnectionHandlerSystemProxy(String hostname, Integer port, String expectedHostname, + int expectedPort) { + // Arrange + final InetSocketAddress address = InetSocketAddress.createUnresolved("my-new2.proxy.com", 8888); + final Proxy newProxy = new Proxy(Proxy.Type.HTTP, address); + + final String fullyQualifiedDomainName = "foo.eventhubs.azure.com"; + final ConnectionOptions connectionOptions = new ConnectionOptions(fullyQualifiedDomainName, tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS, + new AmqpRetryOptions(), null, scheduler, CLIENT_OPTIONS, VERIFY_MODE, hostname, port); + + when(proxySelector.select(any())).thenAnswer(invocation -> { + final URI uri = invocation.getArgument(0); + if (fullyQualifiedDomainName.equals(uri.getHost()) && uri.getPort() == WebSocketsConnectionHandler.HTTPS_PORT) { + return Collections.singletonList(PROXY); + } + + if (uri.getHost().equals(hostname) && uri.getPort() == port) { + return Collections.singletonList(newProxy); + } - verifyNoInteractions(proxySelector); + return Collections.emptyList(); + }); + + // Act + final ConnectionHandler handler = provider.createConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, + connectionOptions); + + // Assert + Assertions.assertNotNull(handler); + Assertions.assertTrue(handler instanceof WebSocketsProxyConnectionHandler); + Assertions.assertEquals(expectedHostname, handler.getHostname()); + Assertions.assertEquals(expectedPort, handler.getProtocolPort()); } /** @@ -145,12 +252,19 @@ public void getsConnectionHandlerProxy() { public void noProxySelected(ProxyOptions configuration) { // Arrange final String hostname = "foo.eventhubs.azure.com"; - when(proxySelector.select(argThat(u -> u.getHost().equals(hostname)))) + + // The default port used for the first ConnectionOptions constructor is the default HTTPS_PORT. + when(proxySelector.select(argThat(u -> u.getHost().equals(hostname) + && u.getPort() == WebSocketsConnectionHandler.HTTPS_PORT))) .thenReturn(Collections.singletonList(PROXY)); + final ConnectionOptions connectionOptions = new ConnectionOptions(hostname, tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS, + new AmqpRetryOptions(), configuration, scheduler, CLIENT_OPTIONS, VERIFY_MODE); + // Act - final ConnectionHandler handler = provider.createConnectionHandler(CONNECTION_ID, hostname, - AmqpTransportType.AMQP_WEB_SOCKETS, configuration, PRODUCT, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS); + final ConnectionHandler handler = provider.createConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, + connectionOptions); // Act and Assert Assertions.assertEquals(PROXY_ADDRESS.getHostName(), handler.getHostname()); @@ -163,27 +277,30 @@ public void noProxySelected(ProxyOptions configuration) { @Test public void shouldUseProxyNoLegalProxyAddress() { // Arrange - final String host = "foo.eventhubs.azure.com"; + final String hostname = "foo.eventhubs.azure.com"; + final int port = 1000; - when(proxySelector.select(argThat(u -> u.getHost().equals(host)))) + when(proxySelector.select(argThat(u -> u.getHost().equals(hostname) && u.getPort() == port))) .thenReturn(Collections.emptyList()); // Act and Assert - Assertions.assertFalse(WebSocketsProxyConnectionHandler.shouldUseProxy(host)); + Assertions.assertFalse(WebSocketsProxyConnectionHandler.shouldUseProxy(hostname, port)); } @Test public void shouldUseProxyHostNull() { - assertThrows(NullPointerException.class, () -> WebSocketsProxyConnectionHandler.shouldUseProxy(null)); + assertThrows(NullPointerException.class, () -> WebSocketsProxyConnectionHandler.shouldUseProxy(null, 111)); } @Test public void shouldUseProxyNullProxySelector() { // Arrange final String host = "foo.eventhubs.azure.com"; + final int port = 1000; + ProxySelector.setDefault(null); // Act and Assert - Assertions.assertFalse(WebSocketsProxyConnectionHandler.shouldUseProxy(host)); + Assertions.assertFalse(WebSocketsProxyConnectionHandler.shouldUseProxy(host, port)); } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ConnectionHandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ConnectionHandlerTest.java index 7323c656e41e4..7a76ac72b9041 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ConnectionHandlerTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ConnectionHandlerTest.java @@ -3,7 +3,13 @@ package com.azure.core.amqp.implementation.handler; +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.AmqpTransportType; +import com.azure.core.amqp.ProxyOptions; +import com.azure.core.amqp.implementation.CbsAuthorizationType; import com.azure.core.amqp.implementation.ClientConstants; +import com.azure.core.amqp.implementation.ConnectionOptions; +import com.azure.core.credential.TokenCredential; import com.azure.core.util.ClientOptions; import com.azure.core.util.UserAgentUtil; import org.apache.qpid.proton.amqp.Symbol; @@ -18,8 +24,10 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import reactor.core.scheduler.Scheduler; import reactor.test.StepVerifier; import java.util.HashMap; @@ -36,7 +44,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -45,51 +52,63 @@ public class ConnectionHandlerTest { private static final ClientOptions CLIENT_OPTIONS = new ClientOptions().setApplicationId(APPLICATION_ID); private static final String CONNECTION_ID = "some-random-id"; private static final String HOSTNAME = "hostname-random"; - private ConnectionHandler handler; private static final String PRODUCT = "test"; private static final String CLIENT_VERSION = "1.0.0-test"; private static final SslDomain.VerifyMode VERIFY_MODE = SslDomain.VerifyMode.VERIFY_PEER_NAME; @Captor private ArgumentCaptor> argumentCaptor; + @Mock + private TokenCredential tokenCredential; + @Mock + private Scheduler scheduler; + + private ConnectionOptions connectionOptions; + private ConnectionHandler handler; @BeforeEach public void setup() { MockitoAnnotations.initMocks(this); - handler = new ConnectionHandler(CONNECTION_ID, HOSTNAME, PRODUCT, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS); + + this.connectionOptions = new ConnectionOptions(HOSTNAME, tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, new AmqpRetryOptions(), + ProxyOptions.SYSTEM_DEFAULTS, scheduler, CLIENT_OPTIONS, VERIFY_MODE); + this.handler = new ConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, connectionOptions); } @AfterEach public void teardown() { + if (handler != null) { + handler.close(); + } + Mockito.framework().clearInlineMocks(); argumentCaptor = null; } @Test void constructorNull() { - assertThrows(NullPointerException.class, () -> new ConnectionHandler( - null, HOSTNAME, PRODUCT, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS)); - assertThrows(NullPointerException.class, () -> new ConnectionHandler( - CONNECTION_ID, null, PRODUCT, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS)); - assertThrows(NullPointerException.class, () -> new ConnectionHandler( - CONNECTION_ID, HOSTNAME, null, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS)); - assertThrows(NullPointerException.class, () -> new ConnectionHandler( - CONNECTION_ID, HOSTNAME, PRODUCT, null, VERIFY_MODE, CLIENT_OPTIONS)); - assertThrows(NullPointerException.class, () -> new ConnectionHandler( - CONNECTION_ID, HOSTNAME, PRODUCT, CLIENT_VERSION, null, CLIENT_OPTIONS)); - assertThrows(NullPointerException.class, () -> new ConnectionHandler( - CONNECTION_ID, HOSTNAME, PRODUCT, CLIENT_VERSION, VERIFY_MODE, null)); + assertThrows(NullPointerException.class, () -> new ConnectionHandler(null, PRODUCT, + CLIENT_VERSION, connectionOptions)); + assertThrows(NullPointerException.class, () -> new ConnectionHandler(CONNECTION_ID, null, + CLIENT_VERSION, connectionOptions)); + assertThrows(NullPointerException.class, () -> new ConnectionHandler(CONNECTION_ID, PRODUCT, + null, connectionOptions)); + assertThrows(NullPointerException.class, () -> new ConnectionHandler(CONNECTION_ID, PRODUCT, + CLIENT_VERSION, null)); } @Test void applicationIdNotSet() { // Arrange - final ClientOptions options = new ClientOptions(); + final ClientOptions clientOptions = new ClientOptions(); final String expected = UserAgentUtil.toUserAgentString(null, PRODUCT, CLIENT_VERSION, null); + final ConnectionOptions options = new ConnectionOptions(HOSTNAME, tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, new AmqpRetryOptions(), + ProxyOptions.SYSTEM_DEFAULTS, scheduler, clientOptions, VERIFY_MODE); // Act - final ConnectionHandler handler = new ConnectionHandler(CONNECTION_ID, HOSTNAME, PRODUCT, CLIENT_VERSION, - VERIFY_MODE, options); + final ConnectionHandler handler = new ConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, options); // Assert final String userAgent = (String) handler.getConnectionProperties().get(USER_AGENT.toString()); @@ -99,12 +118,12 @@ void applicationIdNotSet() { @Test void applicationIdSet() { // Arrange - final ClientOptions options = new ClientOptions().setApplicationId("my-application-id"); - final String expected = UserAgentUtil.toUserAgentString(options.getApplicationId(), PRODUCT, CLIENT_VERSION, null); + final String expected = UserAgentUtil.toUserAgentString(CLIENT_OPTIONS.getApplicationId(), PRODUCT, + CLIENT_VERSION, null); // Act - final ConnectionHandler handler = new ConnectionHandler(CONNECTION_ID, HOSTNAME, PRODUCT, CLIENT_VERSION, - VERIFY_MODE, options); + final ConnectionHandler handler = new ConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, + connectionOptions); // Assert final String userAgent = (String) handler.getConnectionProperties().get(USER_AGENT.toString()); @@ -166,7 +185,6 @@ void addsSslLayer() { @Test void onConnectionInit() { // Arrange - final String expectedHostname = String.join(":", HOSTNAME, String.valueOf(AMQPS_PORT)); final Map expectedProperties = new HashMap<>(handler.getConnectionProperties()); final Connection connection = mock(Connection.class); final Event event = mock(Event.class); @@ -176,9 +194,9 @@ void onConnectionInit() { handler.onConnectionInit(event); // Assert - verify(connection, times(1)).setHostname(expectedHostname); - verify(connection, times(1)).setContainer(CONNECTION_ID); - verify(connection, times(1)).open(); + verify(connection).setHostname(HOSTNAME); + verify(connection).setContainer(CONNECTION_ID); + verify(connection).open(); verify(connection).setProperties(argumentCaptor.capture()); Map actualProperties = argumentCaptor.getValue(); diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/WebSocketsConnectionHandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/WebSocketsConnectionHandlerTest.java index 2bf5047d64aac..724a2a24b4ad9 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/WebSocketsConnectionHandlerTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/WebSocketsConnectionHandlerTest.java @@ -3,7 +3,13 @@ package com.azure.core.amqp.implementation.handler; +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.AmqpTransportType; +import com.azure.core.amqp.ProxyOptions; +import com.azure.core.amqp.implementation.CbsAuthorizationType; import com.azure.core.amqp.implementation.ClientConstants; +import com.azure.core.amqp.implementation.ConnectionOptions; +import com.azure.core.credential.TokenCredential; import com.azure.core.util.ClientOptions; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.Connection; @@ -18,8 +24,10 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import reactor.core.scheduler.Scheduler; import reactor.test.StepVerifier; import java.util.HashMap; @@ -31,7 +39,6 @@ import static com.azure.core.amqp.implementation.handler.WebSocketsConnectionHandler.MAX_FRAME_SIZE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -39,24 +46,37 @@ public class WebSocketsConnectionHandlerTest { private static final ClientOptions CLIENT_OPTIONS = new ClientOptions(); private static final String CONNECTION_ID = "some-random-id"; private static final String HOSTNAME = "hostname-random"; - private WebSocketsConnectionHandler handler; private static final String PRODUCT = "test"; private static final String CLIENT_VERSION = "1.0.0-test"; private static final SslDomain.VerifyMode VERIFY_MODE = SslDomain.VerifyMode.VERIFY_PEER_NAME; + private WebSocketsConnectionHandler handler; + private ConnectionOptions connectionOptions; + @Captor ArgumentCaptor> argumentCaptor; + @Mock + private TokenCredential tokenCredential; + @Mock + private Scheduler scheduler; @BeforeEach public void setup() { MockitoAnnotations.initMocks(this); - handler = new WebSocketsConnectionHandler(CONNECTION_ID, HOSTNAME, PRODUCT, CLIENT_VERSION, VERIFY_MODE, - CLIENT_OPTIONS); + + this.connectionOptions = new ConnectionOptions(HOSTNAME, tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS, new AmqpRetryOptions(), + ProxyOptions.SYSTEM_DEFAULTS, scheduler, CLIENT_OPTIONS, VERIFY_MODE); + this.handler = new WebSocketsConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, connectionOptions); } @AfterEach public void teardown() { + if (handler != null) { + handler.close(); + } + Mockito.framework().clearInlineMocks(); argumentCaptor = null; } @@ -69,8 +89,9 @@ public void createHandler() { expected.put(FRAMEWORK.toString(), ClientConstants.FRAMEWORK_INFO); // Assert - Assertions.assertEquals(HOSTNAME, handler.getHostname()); + Assertions.assertEquals(connectionOptions.getHostname(), handler.getHostname()); Assertions.assertEquals(MAX_FRAME_SIZE, handler.getMaxFrameSize()); + Assertions.assertEquals(HTTPS_PORT, connectionOptions.getPort()); Assertions.assertEquals(HTTPS_PORT, handler.getProtocolPort()); final Map properties = handler.getConnectionProperties(); @@ -110,7 +131,6 @@ public void addsSslLayer() { @Test public void onConnectionInit() { // Arrange - final String expectedHostname = String.join(":", HOSTNAME, String.valueOf(HTTPS_PORT)); final Map expectedProperties = new HashMap<>(handler.getConnectionProperties()); final Connection connection = mock(Connection.class); final Event event = mock(Event.class); @@ -120,9 +140,9 @@ public void onConnectionInit() { handler.onConnectionInit(event); // Assert - verify(connection, times(1)).setHostname(expectedHostname); - verify(connection, times(1)).setContainer(CONNECTION_ID); - verify(connection, times(1)).open(); + verify(connection).setHostname(HOSTNAME); + verify(connection).setContainer(CONNECTION_ID); + verify(connection).open(); verify(connection).setProperties(argumentCaptor.capture()); Map actualProperties = argumentCaptor.getValue(); @@ -138,4 +158,53 @@ public void onConnectionInit() { }); Assertions.assertTrue(actualProperties.isEmpty()); } + + /** + * verifies that with a different connection endpoint, we still use the correct remote host. + */ + @Test + public void onConnectionInitDifferentEndpoint() { + // Arrange + final Map expectedProperties = new HashMap<>(handler.getConnectionProperties()); + final Connection connection = mock(Connection.class); + final Event event = mock(Event.class); + when(event.getConnection()).thenReturn(connection); + + final String fullyQualifiedNamespace = "foo.servicebus.windows.net"; + final String customEndpoint = "internal.service.net"; + final int port = 9888; + + final ConnectionOptions connectionOptions = new ConnectionOptions(fullyQualifiedNamespace, tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS, new AmqpRetryOptions(), + ProxyOptions.SYSTEM_DEFAULTS, scheduler, CLIENT_OPTIONS, VERIFY_MODE, customEndpoint, port); + + try (WebSocketsConnectionHandler handler = new WebSocketsConnectionHandler(CONNECTION_ID, PRODUCT, + CLIENT_VERSION, connectionOptions)) { + + // Act + handler.onConnectionInit(event); + + // Assert + verify(connection).setHostname(connectionOptions.getFullyQualifiedNamespace()); + verify(connection).setContainer(CONNECTION_ID); + verify(connection).open(); + + verify(connection).setProperties(argumentCaptor.capture()); + Map actualProperties = argumentCaptor.getValue(); + Assertions.assertEquals(expectedProperties.size(), actualProperties.size()); + expectedProperties.forEach((key, value) -> { + final Symbol symbol = Symbol.getSymbol(key); + final Object removed = actualProperties.remove(symbol); + Assertions.assertNotNull(removed); + + final String expected = String.valueOf(value); + final String actual = String.valueOf(removed); + Assertions.assertEquals(expected, actual); + }); + Assertions.assertTrue(actualProperties.isEmpty()); + + Assertions.assertEquals(port, connectionOptions.getPort()); + Assertions.assertEquals(port, handler.getProtocolPort()); + } + } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/WebSocketsProxyConnectionHandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/WebSocketsProxyConnectionHandlerTest.java index ba4df1e81ce22..3d76e41ee7463 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/WebSocketsProxyConnectionHandlerTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/WebSocketsProxyConnectionHandlerTest.java @@ -3,15 +3,23 @@ package com.azure.core.amqp.implementation.handler; +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyAuthenticationType; import com.azure.core.amqp.ProxyOptions; +import com.azure.core.amqp.implementation.CbsAuthorizationType; +import com.azure.core.amqp.implementation.ConnectionOptions; +import com.azure.core.credential.TokenCredential; import com.azure.core.util.ClientOptions; import org.apache.qpid.proton.engine.SslDomain; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.core.scheduler.Scheduler; import java.net.InetSocketAddress; import java.net.Proxy; @@ -22,7 +30,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -44,33 +51,50 @@ public class WebSocketsProxyConnectionHandlerTest { private ProxySelector originalProxySelector; private ProxySelector proxySelector; + @Mock + private TokenCredential tokenCredential; + @Mock + private Scheduler scheduler; + + private ConnectionOptions connectionOptions; + private WebSocketsProxyConnectionHandler handler; + /** * Creates mocks of the proxy selector and authenticator and sets them as defaults. */ @BeforeEach public void setup() { - originalProxySelector = ProxySelector.getDefault(); + MockitoAnnotations.initMocks(this); + + this.connectionOptions = new ConnectionOptions(HOSTNAME, tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, new AmqpRetryOptions(), + ProxyOptions.SYSTEM_DEFAULTS, scheduler, CLIENT_OPTIONS, VERIFY_MODE); - proxySelector = mock(ProxySelector.class, Mockito.CALLS_REAL_METHODS); + this.originalProxySelector = ProxySelector.getDefault(); + this.proxySelector = mock(ProxySelector.class, Mockito.CALLS_REAL_METHODS); ProxySelector.setDefault(proxySelector); } @AfterEach public void teardown() { + if (handler != null) { + handler.close(); + } + ProxySelector.setDefault(originalProxySelector); Mockito.framework().clearInlineMocks(); } @Test - public void nullProxyConfiguration() { - assertThrows(NullPointerException.class, () -> new WebSocketsProxyConnectionHandler(CONNECTION_ID, HOSTNAME, - null, PRODUCT, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS)); - } - - @Test - public void nullHostname() { - assertThrows(NullPointerException.class, () -> new WebSocketsProxyConnectionHandler(CONNECTION_ID, null, - PROXY_CONFIGURATION, PRODUCT, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS)); + public void constructorNull() { + assertThrows(NullPointerException.class, () -> new WebSocketsProxyConnectionHandler(CONNECTION_ID, + PRODUCT, CLIENT_VERSION, connectionOptions, null)); + assertThrows(NullPointerException.class, () -> new WebSocketsProxyConnectionHandler(null, + PRODUCT, CLIENT_VERSION, connectionOptions, PROXY_CONFIGURATION)); + assertThrows(NullPointerException.class, () -> new WebSocketsProxyConnectionHandler(CONNECTION_ID, + PRODUCT, CLIENT_VERSION, connectionOptions, null)); + assertThrows(NullPointerException.class, () -> new WebSocketsProxyConnectionHandler(CONNECTION_ID, + PRODUCT, CLIENT_VERSION, connectionOptions, null)); } /** @@ -82,12 +106,14 @@ public void noProxySelected() { when(proxySelector.select(argThat(u -> u.getHost().equals(HOSTNAME)))) .thenReturn(Collections.singletonList(PROXY)); - final WebSocketsProxyConnectionHandler handler = new WebSocketsProxyConnectionHandler(CONNECTION_ID, HOSTNAME, - PROXY_CONFIGURATION, PRODUCT, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS); + this.handler = new WebSocketsProxyConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, connectionOptions, + PROXY_CONFIGURATION); // Act and Assert Assertions.assertEquals(PROXY_ADDRESS.getHostName(), handler.getHostname()); Assertions.assertEquals(PROXY_ADDRESS.getPort(), handler.getProtocolPort()); + + handler.close(); } /** @@ -99,15 +125,14 @@ public void systemProxyConfigurationSelected() { when(proxySelector.select(argThat(u -> u.getHost().equals(HOSTNAME)))) .thenReturn(Collections.singletonList(PROXY)); - final WebSocketsProxyConnectionHandler handler = new WebSocketsProxyConnectionHandler(CONNECTION_ID, HOSTNAME, - ProxyOptions.SYSTEM_DEFAULTS, PRODUCT, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS); + this.handler = new WebSocketsProxyConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, connectionOptions, + ProxyOptions.SYSTEM_DEFAULTS); // Act and Assert Assertions.assertEquals(PROXY_ADDRESS.getHostName(), handler.getHostname()); Assertions.assertEquals(PROXY_ADDRESS.getPort(), handler.getProtocolPort()); - verify(proxySelector, times(2)) - .select(argThat(u -> u.getHost().equals(HOSTNAME))); + verify(proxySelector).select(argThat(u -> u.getHost().equals(HOSTNAME))); } /** @@ -118,13 +143,13 @@ public void proxyConfigurationSelected() { // Arrange final InetSocketAddress address = InetSocketAddress.createUnresolved("my-new.proxy.com", 8888); final Proxy newProxy = new Proxy(Proxy.Type.HTTP, address); - final ProxyOptions configuration = new ProxyOptions(ProxyAuthenticationType.BASIC, newProxy, USERNAME, PASSWORD); - final String host = "foo.eventhubs.azure.com"; + final ProxyOptions proxyOptions = new ProxyOptions(ProxyAuthenticationType.BASIC, newProxy, USERNAME, + PASSWORD); when(proxySelector.select(any())).thenReturn(Collections.singletonList(PROXY)); - final WebSocketsProxyConnectionHandler handler = new WebSocketsProxyConnectionHandler(CONNECTION_ID, host, - configuration, PRODUCT, CLIENT_VERSION, VERIFY_MODE, CLIENT_OPTIONS); + this.handler = new WebSocketsProxyConnectionHandler(CONNECTION_ID, PRODUCT, CLIENT_VERSION, connectionOptions, + proxyOptions); // Act and Assert Assertions.assertEquals(address.getHostName(), handler.getHostname()); @@ -136,27 +161,29 @@ public void proxyConfigurationSelected() { @Test public void shouldUseProxyNoLegalProxyAddress() { // Arrange - final String host = "foo.eventhubs.azure.com"; + final String hostname = "foo.eventhubs.azure.com"; + final int port = 10000; - when(proxySelector.select(argThat(u -> u.getHost().equals(host)))) + when(proxySelector.select(argThat(u -> u.getHost().equals(hostname) && u.getPort() == port))) .thenReturn(Collections.emptyList()); // Act and Assert - Assertions.assertFalse(WebSocketsProxyConnectionHandler.shouldUseProxy(host)); + Assertions.assertFalse(WebSocketsProxyConnectionHandler.shouldUseProxy(hostname, port)); } @Test public void shouldUseProxyHostNull() { - assertThrows(NullPointerException.class, () -> WebSocketsProxyConnectionHandler.shouldUseProxy(null)); + assertThrows(NullPointerException.class, () -> WebSocketsProxyConnectionHandler.shouldUseProxy(null, 1000)); } @Test public void shouldUseProxyNullProxySelector() { // Arrange final String host = "foo.eventhubs.azure.com"; + final int port = 2000; ProxySelector.setDefault(null); // Act and Assert - Assertions.assertFalse(WebSocketsProxyConnectionHandler.shouldUseProxy(host)); + Assertions.assertFalse(WebSocketsProxyConnectionHandler.shouldUseProxy(host, port)); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/EventHubReactorConnectionTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/EventHubReactorConnectionTest.java index 36a6bf65efee4..53de527f6f24e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/EventHubReactorConnectionTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/EventHubReactorConnectionTest.java @@ -54,6 +54,9 @@ public class EventHubReactorConnectionTest { private static final String CONNECTION_ID = "test-connection-id"; private static final String HOSTNAME = "test-event-hub.servicebus.windows.net/"; + private static String product; + private static String clientVersion; + @Mock private Reactor reactor; @Mock @@ -78,8 +81,6 @@ public class EventHubReactorConnectionTest { private Record record; private ConnectionOptions connectionOptions; - private static String product; - private static String clientVersion; @BeforeAll public static void init() { @@ -90,22 +91,23 @@ public static void init() { @BeforeEach public void setup() throws IOException { - final ConnectionHandler connectionHandler = new ConnectionHandler(CONNECTION_ID, HOSTNAME, product, - clientVersion, SslDomain.VerifyMode.VERIFY_PEER_NAME, CLIENT_OPTIONS); - MockitoAnnotations.initMocks(this); + final ProxyOptions proxy = ProxyOptions.SYSTEM_DEFAULTS; + this.connectionOptions = new ConnectionOptions(HOSTNAME, tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, new AmqpRetryOptions(), proxy, + scheduler, CLIENT_OPTIONS, SslDomain.VerifyMode.VERIFY_PEER_NAME); + + final ConnectionHandler connectionHandler = new ConnectionHandler(CONNECTION_ID, product, clientVersion, + connectionOptions); + when(reactor.selectable()).thenReturn(selectable); - when(reactor.connectionToHost(connectionHandler.getHostname(), connectionHandler.getProtocolPort(), connectionHandler)) + when(reactor.connectionToHost(connectionHandler.getHostname(), connectionHandler.getProtocolPort(), + connectionHandler)) .thenReturn(reactorConnection); when(reactor.process()).thenReturn(true); when(reactor.attachments()).thenReturn(record); - final ProxyOptions proxy = ProxyOptions.SYSTEM_DEFAULTS; - connectionOptions = new ConnectionOptions(HOSTNAME, tokenCredential, - CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, new AmqpRetryOptions(), proxy, - scheduler, CLIENT_OPTIONS, SslDomain.VerifyMode.VERIFY_PEER_NAME); - final ReactorDispatcher reactorDispatcher = new ReactorDispatcher(reactor); when(reactorProvider.getReactor()).thenReturn(reactor); when(reactorProvider.getReactorDispatcher()).thenReturn(reactorDispatcher); @@ -115,8 +117,7 @@ CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, new AmqpRe final SessionHandler sessionHandler = new SessionHandler(CONNECTION_ID, HOSTNAME, "EVENT_HUB", reactorDispatcher, Duration.ofSeconds(20)); - when(handlerProvider.createConnectionHandler(CONNECTION_ID, HOSTNAME, AmqpTransportType.AMQP, proxy, product, - clientVersion, SslDomain.VerifyMode.VERIFY_PEER_NAME, CLIENT_OPTIONS)) + when(handlerProvider.createConnectionHandler(CONNECTION_ID, product, clientVersion, connectionOptions)) .thenReturn(connectionHandler); when(handlerProvider.createSessionHandler(eq(CONNECTION_ID), eq(HOSTNAME), anyString(), any(Duration.class))) .thenReturn(sessionHandler);