diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt index 45c9fa30824fc..5cc4074dcc03c 100644 --- a/eng/versioning/version_client.txt +++ b/eng/versioning/version_client.txt @@ -480,6 +480,9 @@ io.clientcore:http-stress;1.0.0-beta.1;1.0.0-beta.1 # unreleased_com.azure:azure-core-test;1.26.0-beta.1 +unreleased_com.azure:azure-core;1.50.0-beta.1 +unreleased_com.azure:azure-core-amqp;2.10.0-beta.1 + # Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current # version and set the version to the released beta. Released beta dependencies are only valid # for dependency versions. These entries are specifically for when we've released a beta for diff --git a/sdk/core/azure-core-amqp/spotbugs-exclude.xml b/sdk/core/azure-core-amqp/spotbugs-exclude.xml index 11ba7d2e116a8..67b702ec76c8d 100644 --- a/sdk/core/azure-core-amqp/spotbugs-exclude.xml +++ b/sdk/core/azure-core-amqp/spotbugs-exclude.xml @@ -120,4 +120,14 @@ + + + + + + + + + + diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ChannelCacheWrapper.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ChannelCacheWrapper.java new file mode 100644 index 0000000000000..faa174e74bda2 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ChannelCacheWrapper.java @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import reactor.core.publisher.Mono; + +import java.util.Objects; + +/** + * A temporary type to side by side support {@link RequestResponseChannel} caching that + * + *

+ * TODO (anu): remove this temporary type when removing v1 and 'RequestResponseChannelCache' is no longer opt-in for v2. + *

+ */ +final class ChannelCacheWrapper { + private final AmqpChannelProcessor channelProcessor; + private final RequestResponseChannelCache channelCache; + + /** + * Creates channel cache for V1 client or V2 client without "com.azure.core.amqp.internal.session-channel-cache.v2" + * opted-in. + * + * @param channelProcessor the channel processor for caching {@link RequestResponseChannel}. + */ + ChannelCacheWrapper(AmqpChannelProcessor channelProcessor) { + this.channelProcessor = Objects.requireNonNull(channelProcessor, "'channelProcessor' cannot be null."); + this.channelCache = null; + } + + /** + * Creates channel cache for V1 client with "com.azure.core.amqp.internal.session-channel-cache.v2" opted-in. + * + * @param channelCache the cache for {@link RequestResponseChannel}. + */ + ChannelCacheWrapper(RequestResponseChannelCache channelCache) { + this.channelCache = Objects.requireNonNull(channelCache, "'channelCache' cannot be null."); + this.channelProcessor = null; + } + + Mono get() { + if (channelCache != null) { + return channelCache.get(); + } else { + return channelProcessor; + } + } + + Mono closeAsync() { + if (channelCache != null) { + return channelCache.closeAsync(); + } else { + return channelProcessor.flatMap(RequestResponseChannel::closeAsync); + } + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java index 4a40a9737a2ac..bc5cb04493c19 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java @@ -33,7 +33,7 @@ */ public class ManagementChannel implements AmqpManagementNode { private final TokenManager tokenManager; - private final AmqpChannelProcessor createChannel; + private final ChannelCacheWrapper channelCache; private final String fullyQualifiedNamespace; private final ClientLogger logger; private final String entityPath; @@ -41,14 +41,14 @@ public class ManagementChannel implements AmqpManagementNode { /** * Creates a new instance of ManagementChannel. * - * @param createChannel Creates a new AMQP channel. + * @param channelCache The request response channel cache. * @param fullyQualifiedNamespace Fully qualified namespace for the message broker. * @param entityPath The entity path for the message broker. * @param tokenManager Manages tokens for authorization. */ - public ManagementChannel(AmqpChannelProcessor createChannel, String fullyQualifiedNamespace, - String entityPath, TokenManager tokenManager) { - this.createChannel = Objects.requireNonNull(createChannel, "'createChannel' cannot be null."); + public ManagementChannel(ChannelCacheWrapper channelCache, String fullyQualifiedNamespace, String entityPath, + TokenManager tokenManager) { + this.channelCache = Objects.requireNonNull(channelCache, "'channelCache' cannot be null."); this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null."); this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null."); @@ -62,7 +62,7 @@ public ManagementChannel(AmqpChannelProcessor createChan @Override public Mono send(AmqpAnnotatedMessage message) { - return isAuthorized().then(createChannel.flatMap(channel -> { + return isAuthorized().then(channelCache.get().flatMap(channel -> { final Message protonJMessage = MessageUtils.toProtonJMessage(message); return channel.sendWithAck(protonJMessage) @@ -74,7 +74,7 @@ public Mono send(AmqpAnnotatedMessage message) { @Override public Mono send(AmqpAnnotatedMessage message, DeliveryOutcome deliveryOutcome) { - return isAuthorized().then(createChannel.flatMap(channel -> { + return isAuthorized().then(channelCache.get().flatMap(channel -> { final Message protonJMessage = MessageUtils.toProtonJMessage(message); final DeliveryState protonJDeliveryState = MessageUtils.toProtonJDeliveryState(deliveryOutcome); @@ -87,7 +87,7 @@ public Mono send(AmqpAnnotatedMessage message, DeliveryOut @Override public Mono closeAsync() { - return createChannel.flatMap(channel -> channel.closeAsync()).cache(); + return channelCache.closeAsync().cache(); } private void handleResponse(Message response, SynchronousSink sink, diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ProtonSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ProtonSession.java new file mode 100644 index 0000000000000..9e00e3068eb67 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ProtonSession.java @@ -0,0 +1,451 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpErrorContext; +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.amqp.implementation.handler.SessionHandler; +import com.azure.core.util.logging.ClientLogger; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import java.io.IOException; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static com.azure.core.amqp.exception.AmqpErrorCondition.TIMEOUT_ERROR; +import static com.azure.core.amqp.implementation.ClientConstants.SESSION_NAME_KEY; +import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST; + +/** + * A type managing a QPid Proton-j low-level {@link Session} instance. + */ +final class ProtonSession { + private static final String SESSION_NOT_OPENED = "session has not been opened."; + private static final String NOT_OPENING_DISPOSED_SESSION = "session is already disposed, not opening."; + private static final String DISPOSED_MESSAGE_FORMAT = "Cannot create %s from a closed session."; + private static final String REACTOR_CLOSED_MESSAGE_FORMAT + = "connectionId:[%s] sessionName:[%s] connection-reactor is disposed."; + private static final String OBTAIN_CHANNEL_TIMEOUT_MESSAGE_FORMAT + = "connectionId:[%s] sessionName:[%s] obtaining channel (%s) timed out."; + private final AtomicReference state = new AtomicReference<>(State.EMPTY); + private final AtomicBoolean opened = new AtomicBoolean(false); + private final Sinks.Empty openAwaiter = Sinks.empty(); + private final Connection connection; + private final ReactorProvider reactorProvider; + private final SessionHandler handler; + private final ClientLogger logger; + + /** + * Creates a ProtonSession. + * + * @param connectionId the id of the QPid Proton-j Connection hosting the session. + * @param hostname the host name of the broker that the QPid Proton-j Connection hosting + * the session is connected to. + * @param connection the QPid Proton-j Connection hosting the session. + * @param handlerProvider the handler provider for various type of endpoints (session, link). + * @param reactorProvider the provider for reactor dispatcher. + * @param sessionName the session name. + * @param openTimeout the session open timeout. + * @param logger the client logger. + */ + ProtonSession(String connectionId, String hostname, Connection connection, ReactorHandlerProvider handlerProvider, + ReactorProvider reactorProvider, String sessionName, Duration openTimeout, ClientLogger logger) { + this.connection = Objects.requireNonNull(connection, "'connection' cannot be null."); + this.reactorProvider = Objects.requireNonNull(reactorProvider, "'reactorProvider' cannot be null."); + Objects.requireNonNull(handlerProvider, "'handlerProvider' cannot be null."); + this.handler = handlerProvider.createSessionHandler(connectionId, hostname, sessionName, openTimeout); + this.logger = Objects.requireNonNull(logger, "'logger' cannot be null."); + } + + /** + * Gets the session name. + * + * @return the session name. + */ + String getName() { + return handler.getSessionName(); + } + + /** + * Gets the identifier of the QPid Proton-j Connection hosting the session. + * + * @return the connection identifier. + */ + String getConnectionId() { + return handler.getConnectionId(); + } + + /** + * Gets the host name of the broker that the QPid Proton-j Connection facilitating the session + * is connected to. + * + * @return the hostname. + */ + String getHostname() { + return handler.getHostname(); + } + + /** + * Gets the error context of the session. + * + * @return the error context. + */ + AmqpErrorContext getErrorContext() { + return handler.getErrorContext(); + } + + /** + * Gets the connectivity states of the session. + * + * @return the session's connectivity states. + */ + Flux getEndpointStates() { + return handler.getEndpointStates(); + } + + /** + * Gets the reactor dispatcher provider associated with the session. + *

+ * Any operation (e.g., obtaining sender, receiver) on the session must be invoked on the reactor dispatcher. + *

+ * + * @return the reactor dispatcher provider. + */ + ReactorProvider getReactorProvider() { + return reactorProvider; + } + + /** + * Opens the session in the QPid Proton-j Connection. + *

+ * The session open attempt is made upon the first subscription, once opened later subscriptions will complete + * immediately, i.e. there is an open-only-once semantics. + *

+ *

+ * If the session (or parent Qpid Proton-j connection) is disposed after opening, any later operation attempts + * (e.g., creating sender, receiver, channel) will fail. + *

+ *

+ * By design, no re-open attempt will be made within this type. Lifetime of a {@link ProtonSession} instance is + * scoped to life time of one low level Qpid Proton-j session instance it manages, which is scoped within the + * life time of qpid Proton-j Connection hosting it. Re-establishing session requires querying the connection-cache + * to obtain the latest connection (may not be same as the connection facilitated this session) then hosting and + * opening a new {@link ProtonSession} on it. It means, upon a retriable {@link AmqpException} from any APIs (to + * create sender, receiver, channel) in this type, the call sites needs to obtain a new {@link ProtonSession} + * by polling the connection-cache. + *

+ * + * @return a mono that completes once the session is opened. + *

+ *

    the mono can terminates with retriable {@link AmqpException} if + *
  • the session disposal happened while opening,
  • + *
  • or the connection reactor thread got shutdown while opening.
  • + *
+ *

+ */ + Mono open() { + if (opened.getAndSet(true)) { + return openAwaiter.asMono(); + } + try { + getReactorProvider().getReactorDispatcher().invoke(() -> { + final Session session = connection.session(); + BaseHandler.setHandler(session, handler); + session.open(); + logger.atInfo() + .addKeyValue(SESSION_NAME_KEY, handler.getSessionName()) + .log("session local open scheduled."); + + if (state.compareAndSet(State.EMPTY, new State(session))) { + openAwaiter.emitEmpty(FAIL_FAST); + } else { + session.close(); + if (state.get() == State.DISPOSED) { + openAwaiter.emitError(new ProtonSessionClosedException(NOT_OPENING_DISPOSED_SESSION), + FAIL_FAST); + } else { + openAwaiter.emitError(new IllegalStateException("session is already opened."), FAIL_FAST); + } + } + }); + } catch (Exception e) { + if (e instanceof IOException | e instanceof RejectedExecutionException) { + final String message = String.format(REACTOR_CLOSED_MESSAGE_FORMAT, getConnectionId(), getName()); + openAwaiter.emitError(retriableAmqpError(null, message, e), FAIL_FAST); + } else { + openAwaiter.emitError(e, FAIL_FAST); + } + } + return openAwaiter.asMono(); + } + + /** + * Gets a bidirectional channel on the session. + *

+ * A channel consists of a sender link and a receiver link, and is used for management operations where a sender + * link is used to send a request to the broker and a receiver link gets the associated response from the broker. + *

+ * + * @param name the channel name, which is used as the name prefix for sender link and receiver link in the channel. + * @param timeout the timeout for obtaining the channel. + * + * @return a mono that completes with a {@link ProtonChannel} once the channel is created in the session. + *

+ *

    + *
  • the mono terminates with {@link IllegalStateException} if the session is not opened yet via {@link #open()}.
  • + *
  • the mono terminates with + *
      + *
    • a retriable {@link ProtonSessionClosedException} if the session is disposed,
    • + *
    • a retriable {@link AmqpException} if obtaining channel timeout,
    • + *
    • or a retriable {@link AmqpException} if the connection reactor thread is shutdown.
    • + *
    + *
  • + *
+ *

+ */ + Mono channel(final String name, Duration timeout) { + final Mono channel = Mono.create(sink -> { + if (name == null) { + sink.error(new NullPointerException("'name' cannot be null.")); + return; + } + try { + // Pre-check to fail fast before using reactor dispatcher thread. The pre-check asserts that the session + // is opened and not in disposed state. + getSession("channel"); + } catch (ProtonSessionClosedException | IllegalStateException e) { + sink.error(e); + return; + } + try { + getReactorProvider().getReactorDispatcher().invoke(() -> { + final Session session; + try { + session = getSession("channel"); + } catch (ProtonSessionClosedException | IllegalStateException e) { + sink.error(e); + return; + } + final String senderName = name + ":sender"; + final String receiverName = name + ":receiver"; + sink.success(new ProtonChannel(name, session.sender(senderName), session.receiver(receiverName))); + }); + } catch (Exception e) { + if (e instanceof IOException | e instanceof RejectedExecutionException) { + final String message = String.format(REACTOR_CLOSED_MESSAGE_FORMAT, getConnectionId(), getName()); + sink.error(retriableAmqpError(null, message, e)); + } else { + sink.error(e); + } + } + }); + // TODO (anu): when removing v1 support, move the timeout to the call site, ReactorSession::channel(), so it + // aligns with the placement of timeout for ReactorSession::open(). + return channel.timeout(timeout, Mono.error(() -> { + final String message + = String.format(OBTAIN_CHANNEL_TIMEOUT_MESSAGE_FORMAT, getConnectionId(), getName(), name); + return retriableAmqpError(TIMEOUT_ERROR, message, null); + })); + } + + /** + * Gets a QPid Proton-j sender on the session. + *

+ * The call site required to invoke this method on Reactor dispatcher thread using {@link #getReactorProvider()}. + * It is possible to run into race conditions with QPid Proton-j if invoked from any other threads. + *

+ * + * @param name the sender name. + * + * @return the sender. + * @throws IllegalStateException if the attempt to obtain the sender was made before opening the session. + * @throws ProtonSessionClosedException (a retriable {@link AmqpException}) if the session was disposed. + */ + Sender senderUnsafe(String name) { + final Session session = getSession("sender link"); + return session.sender(name); + } + + /** + * Gets a QPid Proton-j receiver on the session. + *

+ * The call site required to invoke this method on Reactor dispatcher thread using {@link #getReactorProvider()}. + * It is possible to run into race conditions with QPid Proton-j if invoked from any other threads. + *

+ * + * @param name the sender name. + * + * @return the sender. + * @throws IllegalStateException if the attempt to obtain the receiver was made before opening the session. + * @throws ProtonSessionClosedException a retriable {@link AmqpException}) if the session was disposed. + */ + Receiver receiverUnsafe(String name) { + final Session session = getSession("receive link"); + return session.receiver(name); + } + + /** + * Begin the disposal by locally closing the underlying QPid Proton-j session. + * + * @param errorCondition the error condition to close the session with. + */ + void beginClose(ErrorCondition errorCondition) { + final State s = state.getAndSet(State.DISPOSED); + if (s == State.EMPTY || s == State.DISPOSED) { + return; + } + final Session session = s.get(); + if (session.getLocalState() != EndpointState.CLOSED) { + session.close(); + if (errorCondition != null && session.getCondition() == null) { + session.setCondition(errorCondition); + } + } + } + + /** + * Completes the disposal by closing the {@link SessionHandler}. + */ + void endClose() { + handler.close(); + } + + /** + * Obtain the underlying QPid Proton-j {@link Session} atomically. + * + * @param resourceType the type of the resource (e.g., sender, receiver, channel) that the call site want to host + * on the obtained session. The provided string value is used only to form error message for any exception. + * + * @return the QPid Proton-j session. + * @throws IllegalStateException if the attempt to obtain the session was made before opening via {@link #open()}. + * @throws ProtonSessionClosedException a retriable {@link AmqpException}) if the session was disposed. + */ + private Session getSession(String resourceType) { + final State s = state.get(); + if (s == State.EMPTY) { + throw logger.logExceptionAsError(new IllegalStateException(SESSION_NOT_OPENED)); + } + if (s == State.DISPOSED) { + throw logger.logExceptionAsWarning( + new ProtonSessionClosedException(String.format(DISPOSED_MESSAGE_FORMAT, resourceType))); + } + return s.get(); + } + + /** + * Creates a retriable AMQP exception. + *

+ * The call sites uses this method to translate a session unavailability "event" (session disposed, session operation + * timed-out or connection being closed) to a retriable error. While the "event" is transient, recovering from it + * by creating a new session on current connection or on a new connection needs to be done not by this class or + * the parent 'ReactorConnection' owning this ProtonSession but by the downstream. E.g., the downstream Consumer, + * Producer Client that has access to the chain to propagate retry request to top level V2 ReactorConnectionCache. + *

+ * @param condition the error condition. + * @param message the error message. + * @param cause the actual cause of error. + * @return the retriable AMQP exception. + */ + private AmqpException retriableAmqpError(AmqpErrorCondition condition, String message, Throwable cause) { + return new AmqpException(true, condition, message, cause, handler.getErrorContext()); + } + + /** + * Type representing a bidirectional channel hosted on the QPid Proton-j session. + *

+ * The {@link RequestResponseChannel} underneath uses an instance of {@link ProtonChannel} for the bi-directional + * communication with the broker. + *

+ */ + static final class ProtonChannel { + private final String name; + private final Sender sender; + private final Receiver receiver; + + /** + * Creates a ProtonChannel. + * + * @param name the channel name. + * @param sender the sender endpoint of the channel. + * @param receiver the receiver endpoint of the channel. + */ + ProtonChannel(String name, Sender sender, Receiver receiver) { + this.name = name; + this.sender = sender; + this.receiver = receiver; + } + + /** + * Gets the channel name. + * + * @return the channel name. + */ + String getName() { + return name; + } + + /** + * Gets the sender endpoint of the channel. + * + * @return the channel's sender endpoint. + */ + Sender getSender() { + return sender; + } + + /** + * Gets the receiver endpoint of the channel. + * + * @return the channel's receiver endpoint. + */ + Receiver getReceiver() { + return receiver; + } + } + + /** + * A retriable {@link AmqpException} indicating session is disposed. + */ + static final class ProtonSessionClosedException extends AmqpException { + private ProtonSessionClosedException(String message) { + super(true, message, null); + } + } + + /** + * A type to atomically access the underlying QPid Proton-j {@link Session} that {@link ProtonSession} manages. + */ + private static final class State { + private static final State EMPTY = new State(); + private static final State DISPOSED = new State(); + private final Session session; + + private State() { + // Ctr for the EMPTY and DISPOSED state. + this.session = null; + } + + private State(Session session) { + this.session = Objects.requireNonNull(session, "'session' cannot be null."); + } + + private Session get() { + assert this != EMPTY; + return this.session; + } + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ProtonSessionWrapper.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ProtonSessionWrapper.java new file mode 100644 index 0000000000000..38dc5da2c5239 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ProtonSessionWrapper.java @@ -0,0 +1,238 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.exception.AmqpErrorContext; +import com.azure.core.amqp.implementation.ProtonSession.ProtonChannel; +import com.azure.core.amqp.implementation.handler.SessionHandler; +import com.azure.core.util.logging.ClientLogger; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.Objects; + +/** + * A temporary type to support {@link ProtonSession} in v2 or to support direct use of low-level {@link Session} + * in v1 or v2. + *
    + *
  • In v2 mode without "com.azure.core.amqp.internal.session-channel-cache.v2" opt-in or in v1 mode, + * operations are directly performed on Qpid Proton-j low-level {@link Session} instance.
  • + *
  • In v2 mode with "com.azure.core.amqp.internal.session-channel-cache.v2" explicitly opted-in, operations are + * performed on ({@link ProtonSession}) instance, that internally delegates to Qpid Proton-j low-level + * {@link Session}, but with safety measures.
  • + *
+ *

+ * TODO (anu): remove this temporary type when removing v1 and 'ProtonSession' is no longer opt-in for v2. + *

+ */ +public final class ProtonSessionWrapper { + // Const defined only for log and error message purposes, actual configuration used in ClientBuilder. + private static final String SESSION_CHANNEL_CACHE_KEY = "com.azure.core.amqp.internal.session-channel-cache.v2"; + private final Session sessionUnsafe; + private final ProtonSession session; + private final SessionHandler handler; + private final ReactorProvider provider; + + /** + * Creates session wrapper for v2 client with {@link #SESSION_CHANNEL_CACHE_KEY} opted-in. + * + * @param session session to wrap. + */ + ProtonSessionWrapper(ProtonSession session) { + this.session = Objects.requireNonNull(session, "'session' cannot be null."); + this.handler = null; + this.provider = null; + this.sessionUnsafe = null; + } + + /** + * Creates session wrapper for v2 client without {@link #SESSION_CHANNEL_CACHE_KEY} opted-in or v1 client. + * + * @param sessionUnsafe session to wrap. + * @param handler handler for the session. + * @param provider the reactor dispatcher provider. + */ + public ProtonSessionWrapper(Session sessionUnsafe, SessionHandler handler, ReactorProvider provider) { + this.sessionUnsafe = Objects.requireNonNull(sessionUnsafe, "'sessionUnsafe' cannot be null."); + this.handler = Objects.requireNonNull(handler, "'handler' cannot be null."); + this.provider = Objects.requireNonNull(provider, "'provider' cannot be null."); + this.session = null; + } + + /** + * Check if the client is in v2 with {@link #SESSION_CHANNEL_CACHE_KEY} opted-in (hence uses {@link ProtonSession}). + * + * @return true if the client is in v2 mode and opted-in for {@link #SESSION_CHANNEL_CACHE_KEY}. + */ + boolean isV2ClientOnSessionCache() { + return session != null; + } + + String getName() { + if (isV2ClientOnSessionCache()) { + return session.getName(); + } else { + return handler.getSessionName(); + } + } + + String getConnectionId() { + if (isV2ClientOnSessionCache()) { + return session.getConnectionId(); + } else { + return handler.getConnectionId(); + } + } + + String getHostname() { + if (isV2ClientOnSessionCache()) { + return session.getHostname(); + } else { + return handler.getHostname(); + } + } + + Flux getEndpointStates() { + if (isV2ClientOnSessionCache()) { + return session.getEndpointStates(); + } else { + return handler.getEndpointStates(); + } + } + + ReactorProvider getReactorProvider() { + if (isV2ClientOnSessionCache()) { + return session.getReactorProvider(); + } else { + return provider; + } + } + + AmqpErrorContext getErrorContext() { + if (isV2ClientOnSessionCache()) { + return session.getErrorContext(); + } else { + return handler.getErrorContext(); + } + } + + void openUnsafe(ClientLogger logger) { + if (isV2ClientOnSessionCache()) { + throw logger.logExceptionAsError(new UnsupportedOperationException( + "Requires v2 client without " + SESSION_CHANNEL_CACHE_KEY + " or v1 client.")); + } + sessionUnsafe.open(); + } + + Mono open() { + if (!isV2ClientOnSessionCache()) { + return Mono.error( + new UnsupportedOperationException("open() requires v2 client with " + SESSION_CHANNEL_CACHE_KEY)); + } + return session.open(); + } + + Mono channel(String name, Duration timeout) { + if (isV2ClientOnSessionCache()) { + return session.channel(name, timeout).map(ProtonChannelWrapper::new); + } else { + return Mono.just(new ProtonChannelWrapper(name, sessionUnsafe)); + } + } + + Sender senderUnsafe(String name) { + if (isV2ClientOnSessionCache()) { + return session.senderUnsafe(name); + } else { + return sessionUnsafe.sender(name); + } + } + + Receiver receiverUnsafe(String name) { + if (isV2ClientOnSessionCache()) { + return session.receiverUnsafe(name); + } else { + return sessionUnsafe.receiver(name); + } + } + + void beginClose(ErrorCondition condition) { + if (isV2ClientOnSessionCache()) { + session.beginClose(condition); + } else { + if (sessionUnsafe.getLocalState() != EndpointState.CLOSED) { + sessionUnsafe.close(); + if (condition != null && sessionUnsafe.getCondition() == null) { + sessionUnsafe.setCondition(condition); + } + } + } + } + + void endClose() { + if (isV2ClientOnSessionCache()) { + session.endClose(); + } else { + handler.close(); + } + } + + /** + * A temporary type to represent a channel obtained from the {@link ProtonSessionWrapper}. + *

+ * TODO (anu): remove this temporary type when removing parent ProtonSessionWrapper type. + *

+ */ + static final class ProtonChannelWrapper { + private final String name; + private final Sender sender; + private final Receiver receiver; + + /** + * Creates channel wrapper for v2 client with {@link #SESSION_CHANNEL_CACHE_KEY} opted-in + * (hence uses {@link ProtonChannel} in {@link ProtonSession}). + * + * @param channel the channel to wrap. + */ + ProtonChannelWrapper(ProtonChannel channel) { + Objects.requireNonNull(channel, "'channel' cannot be null."); + this.name = channel.getName(); + this.sender = channel.getSender(); + this.receiver = channel.getReceiver(); + } + + /** + * Creates channel wrapper for v2 client without {@link #SESSION_CHANNEL_CACHE_KEY} opted-in or V1 client. + * + * @param name the name of the channel. + * @param sessionUnsafe the session to host the sender and receiver in the channel + */ + ProtonChannelWrapper(String name, Session sessionUnsafe) { + this.name = Objects.requireNonNull(name, "'name' cannot be null."); + Objects.requireNonNull(sessionUnsafe, "'sessionUnsafe' cannot be null."); + // In current V2 (that doesn't have ProtonSession) or V1, the RequestResponseChannel's sender and receiver + // gets created in the calling thread. Continue to do the same here in wrapper (i.e, no behavioral change). + this.sender = sessionUnsafe.sender(name + ":sender"); + this.receiver = sessionUnsafe.receiver(name + ":receiver"); + } + + String getName() { + return name; + } + + Sender sender() { + return sender; + } + + Receiver receiver() { + return receiver; + } + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index 09766e2ddfc4b..d4eb18a1b139b 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -64,6 +64,7 @@ public class ReactorConnection implements AmqpConnection { private static final String MANAGEMENT_LINK_NAME = "mgmt"; private final ClientLogger logger; + private final ReactorSessionCache sessionCache; private final ConcurrentMap sessionMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap managementNodes = new ConcurrentHashMap<>(); @@ -91,8 +92,10 @@ public class ReactorConnection implements AmqpConnection { private volatile ClaimsBasedSecurityChannel cbsChannel; private volatile AmqpChannelProcessor cbsChannelProcessor; + private volatile RequestResponseChannelCache cbsChannelCache; private volatile Connection connection; private final boolean isV2; + private final boolean useSessionChannelCache; /** * Creates a new AMQP connection that uses proton-j. @@ -106,12 +109,15 @@ public class ReactorConnection implements AmqpConnection { * @param messageSerializer Serializer to translate objects to and from proton-j {@link Message messages}. * @param senderSettleMode to set as {@link SenderSettleMode} on sender. * @param receiverSettleMode to set as {@link ReceiverSettleMode} on receiver. - * @param isV2 (temporary) flag to use either v1 or v2 receiver. + * @param isV2 (temporary) flag to use either v1 or v2 stack. + * @param useSessionChannelCache indicates if {@link ReactorSessionCache} and {@link RequestResponseChannelCache} + * should be used when in v2 mode. */ public ReactorConnection(String connectionId, ConnectionOptions connectionOptions, ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider, AmqpLinkProvider linkProvider, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, - SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode, boolean isV2) { + SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode, boolean isV2, + boolean useSessionChannelCache) { this.connectionOptions = connectionOptions; this.reactorProvider = reactorProvider; @@ -129,6 +135,16 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption this.senderSettleMode = senderSettleMode; this.receiverSettleMode = receiverSettleMode; this.isV2 = isV2; + if (!isV2 && useSessionChannelCache) { + // Internal-Error: When client is in V1 mode, the builder should have ignored the attempt to opt in + // the "com.azure.core.amqp.internal.session-channel-cache.v2" configuration. + logger.atError().log("Internal-Error: Unexpected attempt to use SessionCache and ChannelCache in V1."); + this.useSessionChannelCache = false; + } else { + this.useSessionChannelCache = useSessionChannelCache; + } + this.sessionCache = new ReactorSessionCache(connectionId, handler.getHostname(), handlerProvider, + reactorProvider, operationTimeout, logger); this.connectionMono = Mono.fromCallable(this::getOrCreateConnection).flatMap(reactorConnection -> { final Mono activeEndpoint @@ -149,11 +165,11 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption })); return activeEndpoint.thenReturn(reactorConnection); }).doOnError(error -> { - if (isDisposed.getAndSet(true)) { - logger.verbose("Connection was already disposed: Error occurred while connection was starting.", error); - } else { + if (setDisposed()) { closeAsync(new AmqpShutdownSignal(false, false, "Error occurred while connection was starting. Error: " + error)).subscribe(); + } else { + logger.verbose("Connection was already disposed: Error occurred while connection was starting.", error); } }); @@ -162,14 +178,14 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption logger.atVerbose().addKeyValue("state", state).log("getConnectionState"); return AmqpEndpointStateUtil.getConnectionState(state); }).onErrorResume(error -> { - if (!isDisposed.getAndSet(true)) { + if (setDisposed()) { logger.verbose("Disposing of active sessions due to error."); return closeAsync(new AmqpShutdownSignal(false, false, error.getMessage())).then(Mono.error(error)); } else { return Mono.error(error); } }).doOnComplete(() -> { - if (!isDisposed.getAndSet(true)) { + if (setDisposed()) { logger.verbose("Disposing of active sessions due to connection close."); closeAsync(new AmqpShutdownSignal(false, false, "Connection handler closed.")).subscribe(); } @@ -254,10 +270,19 @@ public Mono getManagementNode(String entityPath) { .addKeyValue("address", address) .log("Creating management node."); - final AmqpChannelProcessor requestResponseChannel - = createRequestResponseChannel(sessionName, linkName, address); - return new ManagementChannel(requestResponseChannel, getFullyQualifiedNamespace(), entityPath, - tokenManager); + final ChannelCacheWrapper channelCache; + if (useSessionChannelCache) { + // V2 with 'SessionCache,RequestResponseChannelCache' opted-in. + final RequestResponseChannelCache cache + = new RequestResponseChannelCache(this, sessionName, linkName, address, retryPolicy); + channelCache = new ChannelCacheWrapper(cache); + } else { + // V2 without 'SessionCache,RequestResponseChannelCache' opt-in or V1. + final AmqpChannelProcessor cache + = createRequestResponseChannel(sessionName, linkName, address); + channelCache = new ChannelCacheWrapper(cache); + } + return new ManagementChannel(channelCache, getFullyQualifiedNamespace(), entityPath, tokenManager); })); }); } @@ -304,6 +329,12 @@ public Map getConnectionProperties() { */ @Override public Mono createSession(String sessionName) { + if (useSessionChannelCache) { + // V2 with 'SessionCache,RequestResponseChannelCache' opted-in. + final ReactorSessionCache.Loader loader = this::createSession; + return sessionCache.getOrLoad(connectionMono, sessionName, loader).cast(AmqpSession.class); + } + // V2 without 'SessionCache,RequestResponseChannelCache' opt-in or V1. return connectionMono.map(connection -> { return sessionMap.computeIfAbsent(sessionName, key -> { final SessionHandler sessionHandler = handlerProvider.createSessionHandler(connectionId, @@ -311,7 +342,9 @@ public Mono createSession(String sessionName) { final Session session = connection.session(); BaseHandler.setHandler(session, sessionHandler); - final AmqpSession amqpSession = createSession(key, session, sessionHandler); + final ProtonSessionWrapper sessionWrapper + = new ProtonSessionWrapper(session, sessionHandler, reactorProvider); + final AmqpSession amqpSession = createSession(sessionWrapper); final Disposable subscription = amqpSession.getEndpointStates().subscribe(state -> { }, error -> { // If we were already disposing of the connection, the session would be removed. @@ -366,17 +399,18 @@ public Mono createSession(String sessionName) { } /** - * Creates a new AMQP session with the given parameters. - * - * @param sessionName Name of the AMQP session. - * @param session The reactor session associated with this session. - * @param handler Session handler for the reactor session. + * Creates a new ReactorSession that uses the given low-level session. + *

+ * TODO (anu): Use 'ProtonSession' as the arg when removing v1 and 'SessionCache' (hence 'ProtonSession') is no + * longer opt-in for v2. + *

+ * @param session the QPid Proton-j session. * - * @return A new instance of AMQP session. + * @return A new instance of ReactorSession. */ - protected AmqpSession createSession(String sessionName, Session session, SessionHandler handler) { - return new ReactorSession(this, session, handler, sessionName, reactorProvider, handlerProvider, linkProvider, - getClaimsBasedSecurityNode(), tokenManagerProvider, messageSerializer, connectionOptions.getRetry()); + protected ReactorSession createSession(ProtonSessionWrapper session) { + return new ReactorSession(this, session, handlerProvider, linkProvider, getClaimsBasedSecurityNode(), + tokenManagerProvider, messageSerializer, connectionOptions.getRetry()); } /** @@ -384,6 +418,12 @@ protected AmqpSession createSession(String sessionName, Session session, Session */ @Override public boolean removeSession(String sessionName) { + if (useSessionChannelCache) { + // V2 with 'SessionCache,RequestResponseChannelCache' opted-in. + return sessionCache.evict(sessionName); + } + + // V2 without 'SessionCache,RequestResponseChannelCache' opt-in or V1. if (sessionName == null) { return false; } @@ -435,14 +475,14 @@ protected Mono getReactorConnection() { */ protected AmqpChannelProcessor createRequestResponseChannel(String sessionName, String linkName, String entityPath) { - + assert !isV2 || !useSessionChannelCache; Objects.requireNonNull(entityPath, "'entityPath' cannot be null."); final Flux createChannel = createSession(sessionName).cast(ReactorSession.class) - .map(reactorSession -> new RequestResponseChannel(this, getId(), getFullyQualifiedNamespace(), linkName, - entityPath, reactorSession.session(), connectionOptions.getRetry(), handlerProvider, reactorProvider, - messageSerializer, senderSettleMode, receiverSettleMode, - handlerProvider.getMetricProvider(getFullyQualifiedNamespace(), entityPath), isV2)) + .flatMap(reactorSession -> reactorSession.channel(linkName)) + .map(channel -> new RequestResponseChannel(this, getId(), getFullyQualifiedNamespace(), entityPath, channel, + connectionOptions.getRetry(), handlerProvider, reactorProvider, messageSerializer, senderSettleMode, + receiverSettleMode, handlerProvider.getMetricProvider(getFullyQualifiedNamespace(), entityPath), isV2)) .doOnNext(e -> { logger.atInfo() .addKeyValue(ENTITY_PATH_KEY, entityPath) @@ -461,26 +501,25 @@ protected AmqpChannelProcessor createRequestResponseChan } // Note: The V1 'createRequestResponseChannel(...)' internal API will be removed once entirely on the V2 stack. - Mono newRequestResponseChannel(String sessionName, String linksNamePrefix, - String entityPath) { - assert isV2; + Mono newRequestResponseChannel(String sessionName, String name, String entityPath) { + assert isV2 && useSessionChannelCache; Objects.requireNonNull(entityPath, "'entityPath' cannot be null."); return createSession(sessionName).cast(ReactorSession.class) - .map(reactorSession -> new RequestResponseChannel(this, getId(), getFullyQualifiedNamespace(), - linksNamePrefix, entityPath, reactorSession.session(), connectionOptions.getRetry(), handlerProvider, - reactorProvider, messageSerializer, senderSettleMode, receiverSettleMode, - handlerProvider.getMetricProvider(getFullyQualifiedNamespace(), entityPath), isV2)); + .flatMap(reactorSession -> reactorSession.channel(name)) + .map(channel -> new RequestResponseChannel(this, getId(), getFullyQualifiedNamespace(), entityPath, channel, + connectionOptions.getRetry(), handlerProvider, reactorProvider, messageSerializer, senderSettleMode, + receiverSettleMode, handlerProvider.getMetricProvider(getFullyQualifiedNamespace(), entityPath), isV2)); } @Override public Mono closeAsync() { - if (isDisposed.getAndSet(true)) { + if (setDisposed()) { + return closeAsync(new AmqpShutdownSignal(false, true, "Disposed by client.")); + } else { logger.verbose("Connection was already closed. Not disposing again."); return isClosedMono.asMono(); } - - return closeAsync(new AmqpShutdownSignal(false, true, "Disposed by client.")); } /** @@ -500,10 +539,20 @@ public Mono closeAsync(AmqpShutdownSignal shutdownSignal) { } final Mono cbsCloseOperation; - if (cbsChannelProcessor != null) { - cbsCloseOperation = cbsChannelProcessor.flatMap(channel -> channel.closeAsync()); + if (useSessionChannelCache) { + // V2 with 'SessionCache,RequestResponseChannelCache' opted-in. + if (cbsChannelCache != null) { + cbsCloseOperation = cbsChannelCache.closeAsync(); + } else { + cbsCloseOperation = Mono.empty(); + } } else { - cbsCloseOperation = Mono.empty(); + // V2 without 'SessionCache,RequestResponseChannelCache' opt-in or V1. + if (cbsChannelProcessor != null) { + cbsCloseOperation = cbsChannelProcessor.flatMap(channel -> channel.closeAsync()); + } else { + cbsCloseOperation = Mono.empty(); + } } final Mono managementNodeCloseOperations @@ -558,8 +607,16 @@ private synchronized void closeConnectionWork() { connection.close(); handler.close(); - final ArrayList> closingSessions = new ArrayList<>(); - sessionMap.values().forEach(link -> closingSessions.add(link.isClosed())); + final Mono awaitSessionsClose; + if (useSessionChannelCache) { + // V2 with 'SessionCache,RequestResponseChannelCache' opted-in. + awaitSessionsClose = sessionCache.awaitClose(); + } else { + // V2 without 'SessionCache,RequestResponseChannelCache' opt-in or V1. + final ArrayList> closingSessions = new ArrayList<>(); + sessionMap.values().forEach(link -> closingSessions.add(link.isClosed())); + awaitSessionsClose = Mono.when(closingSessions); + } // We shouldn't need to add a timeout to this operation because executorCloseMono schedules its last // remaining work after OperationTimeout has elapsed and closes afterwards. @@ -572,7 +629,7 @@ private synchronized void closeConnectionWork() { // Close all the children and the ReactorExecutor. final Mono closeSessionAndExecutorMono - = Mono.when(closingSessions).timeout(operationTimeout).onErrorResume(error -> { + = awaitSessionsClose.timeout(operationTimeout).onErrorResume(error -> { logger.info("Timed out waiting for all sessions to close."); return Mono.empty(); }).then(closedExecutor).then(Mono.fromRunnable(() -> { @@ -591,11 +648,20 @@ private synchronized void closeConnectionWork() { private synchronized ClaimsBasedSecurityNode getOrCreateCBSNode() { if (cbsChannel == null) { logger.info("Setting CBS channel."); - cbsChannelProcessor = createRequestResponseChannel(CBS_SESSION_NAME, CBS_LINK_NAME, CBS_ADDRESS); - cbsChannel = new ClaimsBasedSecurityChannel(cbsChannelProcessor, connectionOptions.getTokenCredential(), - connectionOptions.getAuthorizationType(), connectionOptions.getRetry()); + if (useSessionChannelCache) { + // V2 with 'SessionCache,RequestResponseChannelCache' opted-in. + cbsChannelCache + = new RequestResponseChannelCache(this, CBS_ADDRESS, CBS_SESSION_NAME, CBS_LINK_NAME, retryPolicy); + cbsChannel + = new ClaimsBasedSecurityChannel(cbsChannelCache.get(), connectionOptions.getTokenCredential(), + connectionOptions.getAuthorizationType(), connectionOptions.getRetry()); + } else { + // V2 without 'SessionCache,RequestResponseChannelCache' opt-in or V1. + cbsChannelProcessor = createRequestResponseChannel(CBS_SESSION_NAME, CBS_LINK_NAME, CBS_ADDRESS); + cbsChannel = new ClaimsBasedSecurityChannel(cbsChannelProcessor, connectionOptions.getTokenCredential(), + connectionOptions.getAuthorizationType(), connectionOptions.getRetry()); + } } - return cbsChannel; } @@ -638,6 +704,21 @@ private synchronized Connection getOrCreateConnection() throws IOException { return connection; } + /** + * Sets the atomic flag indicating that this connection is disposed of. + * + * @return true if the flag is set for the first time, false if it was already set. + */ + private boolean setDisposed() { + final boolean firstDisposal = !isDisposed.getAndSet(true); + if (firstDisposal) { + sessionCache.setOwnerDisposed(); + return true; + } else { + return false; + } + } + /** * ReactorExceptionHandler handles exceptions that occur in the reactor. */ @@ -652,7 +733,7 @@ public void onConnectionError(Throwable exception) { .addKeyValue(FULLY_QUALIFIED_NAMESPACE_KEY, getFullyQualifiedNamespace()) .log("onConnectionError, Starting new reactor", exception); - if (!isDisposed.getAndSet(true)) { + if (setDisposed()) { logger.atVerbose() .addKeyValue(FULLY_QUALIFIED_NAMESPACE_KEY, getFullyQualifiedNamespace()) .log("onReactorError: Disposing."); @@ -667,7 +748,7 @@ void onConnectionShutdown(AmqpShutdownSignal shutdownSignal) { .addKeyValue(FULLY_QUALIFIED_NAMESPACE_KEY, getFullyQualifiedNamespace()) .log("onConnectionShutdown. Shutting down."); - if (!isDisposed.getAndSet(true)) { + if (setDisposed()) { logger.atVerbose() .addKeyValue(FULLY_QUALIFIED_NAMESPACE_KEY, getFullyQualifiedNamespace()) .log("onConnectionShutdown: disposing."); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java index 86317730c41dd..8f9d30322fe29 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java @@ -13,9 +13,12 @@ import com.azure.core.amqp.AmqpTransaction; import com.azure.core.amqp.AmqpTransactionCoordinator; import com.azure.core.amqp.ClaimsBasedSecurityNode; +import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.implementation.handler.SendLinkHandler; -import com.azure.core.amqp.implementation.handler.SessionHandler; +import com.azure.core.amqp.implementation.ProtonSession.ProtonChannel; +import com.azure.core.amqp.implementation.ProtonSession.ProtonSessionClosedException; +import com.azure.core.amqp.implementation.ProtonSessionWrapper.ProtonChannelWrapper; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; import com.azure.core.util.logging.LoggingEventBuilder; @@ -27,10 +30,8 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.BaseHandler; -import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.engine.Session; import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.publisher.Flux; @@ -44,6 +45,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.RejectedExecutionException; @@ -51,6 +53,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static com.azure.core.amqp.exception.AmqpErrorCondition.TIMEOUT_ERROR; import static com.azure.core.amqp.implementation.AmqpConstants.CLIENT_IDENTIFIER; import static com.azure.core.amqp.implementation.AmqpConstants.CLIENT_RECEIVER_IDENTIFIER; import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addErrorCondition; @@ -63,10 +66,14 @@ import static com.azure.core.util.FluxUtil.monoError; /** - * Represents an AMQP session using proton-j reactor. + * Represents an AMQP session using proton-j session {@link ProtonSession}. */ public class ReactorSession implements AmqpSession { private static final String TRANSACTION_LINK_NAME = "coordinator"; + private static final String ACTIVE_WAIT_TIMED_OUT + = "connectionId[%s] sessionName[%s] Timeout waiting for session to be active."; + private static final String COMPLETED_WITHOUT_ACTIVE + = "connectionId[%s] sessionName[%s] Session completed without being active."; private final ConcurrentMap> openSendLinks = new ConcurrentHashMap<>(); private final ConcurrentMap> openReceiveLinks = new ConcurrentHashMap<>(); @@ -83,8 +90,9 @@ public class ReactorSession implements AmqpSession { private final Flux endpointStates; private final AmqpConnection amqpConnection; - private final Session session; - private final SessionHandler sessionHandler; + // TODO (anu): When removing v1, use 'ProtonSession' directly instead of wrapper. + private final ProtonSessionWrapper protonSession; + private final Mono activeAwaiter; private final String sessionName; private final ReactorProvider provider; private final TokenManagerProvider tokenManagerProvider; @@ -104,10 +112,7 @@ public class ReactorSession implements AmqpSession { * Creates a new AMQP session using proton-j. * * @param amqpConnection AMQP connection associated with this session. - * @param session Proton-j session for this AMQP session. - * @param sessionHandler Handler for events that occur in the session. - * @param sessionName Name of the session. - * @param provider Provides reactor instances for messages to sent with. + * @param protonSession Proton-j session for this AMQP session. * @param handlerProvider Providers reactor handlers for listening to proton-j reactor events. * @param linkProvider Provides AMQP links that are created from proton-j links. * @param cbsNodeSupplier Mono that returns a reference to the {@link ClaimsBasedSecurityNode}. @@ -116,16 +121,15 @@ public class ReactorSession implements AmqpSession { * @param messageSerializer Serializes and deserializes proton-j messages. * @param retryOptions for the session operations. */ - public ReactorSession(AmqpConnection amqpConnection, Session session, SessionHandler sessionHandler, - String sessionName, ReactorProvider provider, ReactorHandlerProvider handlerProvider, - AmqpLinkProvider linkProvider, Mono cbsNodeSupplier, - TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, AmqpRetryOptions retryOptions) { + public ReactorSession(AmqpConnection amqpConnection, ProtonSessionWrapper protonSession, + ReactorHandlerProvider handlerProvider, AmqpLinkProvider linkProvider, + Mono cbsNodeSupplier, TokenManagerProvider tokenManagerProvider, + MessageSerializer messageSerializer, AmqpRetryOptions retryOptions) { this.amqpConnection = amqpConnection; - this.session = session; - this.sessionHandler = sessionHandler; + this.protonSession = protonSession; this.handlerProvider = handlerProvider; - this.sessionName = sessionName; - this.provider = provider; + this.sessionName = protonSession.getName(); + this.provider = protonSession.getReactorProvider(); this.linkProvider = linkProvider; this.cbsNodeSupplier = cbsNodeSupplier; this.tokenManagerProvider = tokenManagerProvider; @@ -133,12 +137,12 @@ public ReactorSession(AmqpConnection amqpConnection, Session session, SessionHan this.retryOptions = retryOptions; this.activeTimeoutMessage = String.format( "ReactorSession connectionId[%s], session[%s]: Retries exhausted waiting for ACTIVE endpoint state.", - sessionHandler.getConnectionId(), sessionName); + protonSession.getConnectionId(), sessionName); - this.logger = new ClientLogger(ReactorSession.class, - createContextWithConnectionId(this.sessionHandler.getConnectionId())); + this.logger + = new ClientLogger(ReactorSession.class, createContextWithConnectionId(protonSession.getConnectionId())); - this.endpointStates = sessionHandler.getEndpointStates().map(state -> { + this.endpointStates = protonSession.getEndpointStates().map(state -> { logger.atVerbose() .addKeyValue(SESSION_NAME_KEY, sessionName) .addKeyValue("state", state) @@ -152,11 +156,44 @@ public ReactorSession(AmqpConnection amqpConnection, Session session, SessionHan subscriptions .add(shutdownSignals.flatMap(signal -> closeAsync("Shutdown signal received", null, false)).subscribe()); - session.open(); + final boolean isV1OrV2WithoutSessionCache = !protonSession.isV2ClientOnSessionCache(); + if (isV1OrV2WithoutSessionCache) { + // TODO (anu): delete openUnsafe() when removing v1 and 'SessionCache' (hence 'ProtonSession') is no longer + // opt-in for v2. + protonSession.openUnsafe(logger); + } + this.activeAwaiter = activeAwaiter(protonSession, retryOptions.getTryTimeout(), endpointStates); } - Session session() { - return this.session; + /** + * Open the session. + *

+ * The session open attempt is made upon the first subscription, i.e. there is an open-only-once semantics. + * Later subscriptions only trigger the session active check (i.e., checks if the session is still connected), + * if not, an error will be returned. + *

+ * + * @return the Mono that completes once the session is opened and active. + */ + final Mono open() { + return Mono.when(protonSession.open(), activeAwaiter).thenReturn(this); + } + + /** + * Create a channel on the session for sending and receiving messages. + * + * @param name the channel name. + * @return the Mono that completes with created {@link ProtonChannel}. + */ + final Mono channel(String name) { + // TODO (anu): return Mono of 'ProtonChannel' when removing v1 and 'SessionCache' (hence 'ProtonSession') is + // no longer opt-in for v2. + return protonSession.channel(name, retryOptions.getTryTimeout()); + } + + final ProtonSessionWrapper session() { + // Exposed only for testing. + return protonSession; } @Override @@ -299,7 +336,7 @@ public Mono getOrCreateTransactionCoordina if (isDisposed()) { return monoError(logger.atWarning().addKeyValue(SESSION_NAME_KEY, sessionName), new AmqpException(true, String.format("Cannot create coordinator send link %s from a closed session.", TRANSACTION_LINK_NAME), - sessionHandler.getErrorContext())); + getErrorContext())); } final TransactionCoordinator existing = transactionCoordinator.get(); @@ -357,12 +394,16 @@ protected Mono createConsumer(String linkName, String entityPat .addKeyValue(ENTITY_PATH_KEY, entityPath) .addKeyValue(LINK_NAME_KEY, linkName); - return monoError(logBuilder, new AmqpException(true, "Cannot create receive link from a closed session.", - sessionHandler.getErrorContext())); + return monoError(logBuilder, + new AmqpException(true, "Cannot create receive link from a closed session.", getErrorContext())); } final LinkSubscription existingLink = openReceiveLinks.get(linkName); if (existingLink != null) { + final ProtonSessionClosedException error = existingLink.getError(); + if (error != null) { + return Mono.error(error); + } logger.atInfo() .addKeyValue(LINK_NAME_KEY, linkName) .addKeyValue(ENTITY_PATH_KEY, entityPath) @@ -398,7 +439,12 @@ protected Mono createConsumer(String linkName, String entityPat consumerFactory); }); - sink.success(computed.getLink()); + final ProtonSessionClosedException error = computed.getError(); + if (error != null) { + sink.error(error); + } else { + sink.success(computed.getLink()); + } }); } catch (IOException | RejectedExecutionException e) { sink.error(e); @@ -447,12 +493,16 @@ private Mono createProducer(String linkName, String entityPath, .addKeyValue(ENTITY_PATH_KEY, entityPath) .addKeyValue(LINK_NAME_KEY, linkName); - return monoError(logBuilder, new AmqpException(true, "Cannot create send link from a closed session.", - sessionHandler.getErrorContext())); + return monoError(logBuilder, + new AmqpException(true, "Cannot create send link from a closed session.", getErrorContext())); } final LinkSubscription existing = openSendLinks.get(linkName); if (existing != null) { + final ProtonSessionClosedException error = existing.getError(); + if (error != null) { + return Mono.error(error); + } logger.atVerbose().addKeyValue(LINK_NAME_KEY, linkName).log("Returning existing send link."); return Mono.just(existing.getLink()); } @@ -493,7 +543,12 @@ private Mono createProducer(String linkName, String entityPath, return getSubscription(linkName, entityPath, target, linkProperties, options, tokenManager); }); - sink.success(computed.getLink()); + final ProtonSessionClosedException error = computed.getError(); + if (error != null) { + sink.error(error); + } else { + sink.success(computed.getLink()); + } }); } catch (IOException | RejectedExecutionException e) { sink.error(e); @@ -508,7 +563,13 @@ private LinkSubscription getSubscription(String linkName, String e org.apache.qpid.proton.amqp.transport.Target target, Map linkProperties, AmqpRetryOptions options, TokenManager tokenManager) { - final Sender sender = session.sender(linkName); + final Sender sender; + try { + sender = protonSession.senderUnsafe(linkName); + } catch (ProtonSessionClosedException e) { + // The only time Exception can be thrown is in v2 mode with ProtonSession opted-in. + return new LinkSubscription<>(e); + } sender.setTarget(target); sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); @@ -523,8 +584,8 @@ private LinkSubscription getSubscription(String linkName, String e } sender.setSource(source); - final SendLinkHandler sendLinkHandler = handlerProvider.createSendLinkHandler(sessionHandler.getConnectionId(), - sessionHandler.getHostname(), linkName, entityPath); + final SendLinkHandler sendLinkHandler = handlerProvider.createSendLinkHandler(protonSession.getConnectionId(), + protonSession.getHostname(), linkName, entityPath); BaseHandler.setHandler(sender, sendLinkHandler); sender.open(); @@ -536,6 +597,9 @@ private LinkSubscription getSubscription(String linkName, String e //@formatter:off final Disposable subscription = reactorSender.getEndpointStates().subscribe(state -> { }, error -> { + // If the session is already disposing of, all links would be discarded. In this case, don't remove + // the link from the local map, this helps to prevent downstream link recreation attempts while + // session cleanup is running, if (!isDisposed.get()) { removeLink(openSendLinks, linkName); } @@ -552,7 +616,7 @@ private LinkSubscription getSubscription(String linkName, String e return new LinkSubscription<>(reactorSender, subscription, String.format("connectionId[%s] session[%s]: Setting error on receive link.", - sessionHandler.getConnectionId(), sessionName)); + protonSession.getConnectionId(), sessionName)); } /** @@ -563,7 +627,13 @@ private LinkSubscription getSubscription(String linkName, Strin SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode, TokenManager tokenManager, ConsumerFactory consumerFactory) { - final Receiver receiver = session.receiver(linkName); + final Receiver receiver; + try { + receiver = protonSession.receiverUnsafe(linkName); + } catch (ProtonSessionClosedException e) { + // The only time Exception can be thrown is in v2 mode with ProtonSession opted-in. + return new LinkSubscription<>(e); + } final Source source = new Source(); source.setAddress(entityPath); @@ -612,8 +682,9 @@ private LinkSubscription getSubscription(String linkName, Strin } }); - return new LinkSubscription<>(reactorReceiver, subscription, String.format( - "connectionId[%s] sessionName[%s]: Setting error on receive link.", amqpConnection.getId(), sessionName)); + return new LinkSubscription<>(reactorReceiver, subscription, + String.format("connectionId[%s] sessionName[%s]: Setting error on receive link.", + protonSession.getConnectionId(), sessionName)); } /** @@ -626,8 +697,8 @@ private Mono onClosedError(String message, String linkName, String entity return Mono.firstWithSignal(isClosedMono.asMono(), shutdownSignals.next()) .then(Mono.error(new AmqpException(false, String.format("connectionId[%s] entityPath[%s] linkName[%s] Connection closed. %s", - sessionHandler.getConnectionId(), entityPath, linkName, message), - sessionHandler.getErrorContext()))); + protonSession.getConnectionId(), entityPath, linkName, message), + getErrorContext()))); } /** @@ -677,13 +748,7 @@ private void handleError(Throwable error) { * when the {@link AmqpConnection} passes a shutdown signal. */ private void disposeWork(ErrorCondition errorCondition, boolean disposeLinks) { - if (session.getLocalState() != EndpointState.CLOSED) { - session.close(); - - if (errorCondition != null && session.getCondition() == null) { - session.setCondition(errorCondition); - } - } + protonSession.beginClose(errorCondition); final ArrayList> closingLinks = new ArrayList<>(); if (disposeLinks) { @@ -721,7 +786,7 @@ private void disposeWork(ErrorCondition errorCondition, boolean disposeLinks) { return false; }); - sessionHandler.close(); + protonSession.endClose(); subscriptions.dispose(); })); @@ -743,24 +808,71 @@ private boolean removeLink(ConcurrentMap + * When a subscription is made to the mono, it verifies the most recent state reported by the {@code endpointStates} + * flux. The state can transition from none to active to error or completed Or it can also change from none to error + * or completed directly. If the state is not active yet, then the subscription waits for {@code tryTimeout} before + * timing out with a retriable {@link AmqpException}. If the state is error, then the error is returned, if the state + * transition to completed then a {@link AmqpException} that can be retried is returned. + *

+ * + * @param protonSession the underlying {@link ProtonSession}. + * @param tryTimeout the duration to wait for the session to be active. + * @param endpointStates the flux streaming session endpoint states. + * @return a mono that completes when the session is active. + */ + private static Mono activeAwaiter(ProtonSessionWrapper protonSession, Duration tryTimeout, + Flux endpointStates) { + final String connectionId = protonSession.getConnectionId(); + final String sessionName = protonSession.getName(); + // session-active-timeout = session-open-timeout + 2 seconds buffer. (anu - double check buffer need?) + final Duration activeTimeout = tryTimeout.plusSeconds(2); + return endpointStates.filter(state -> state == AmqpEndpointState.ACTIVE).next().switchIfEmpty(Mono.defer(() -> { + final String message = String.format(COMPLETED_WITHOUT_ACTIVE, connectionId, sessionName); + return Mono.error(new AmqpException(true, message, protonSession.getErrorContext())); + })).timeout(activeTimeout, Mono.error(() -> { + final String message = String.format(ACTIVE_WAIT_TIMED_OUT, connectionId, sessionName); + return new AmqpException(true, TIMEOUT_ERROR, message, protonSession.getErrorContext()); + })).then(); + } + private static final class LinkSubscription { private final AtomicBoolean isDisposed = new AtomicBoolean(); private final T link; private final Disposable subscription; private final String errorMessage; + private final ProtonSessionClosedException error; private LinkSubscription(T link, Disposable subscription, String errorMessage) { this.link = link; this.subscription = subscription; this.errorMessage = errorMessage; + this.error = null; + } + + private LinkSubscription(ProtonSessionClosedException error) { + this.link = null; + this.subscription = null; + this.errorMessage = null; + this.error = Objects.requireNonNull(error, "'error' cannot be null."); } public T getLink() { return link; } + ProtonSessionClosedException getError() { + return error; + } + Mono closeAsync(ErrorCondition errorCondition) { - if (isDisposed.getAndSet(true)) { + if (isDisposed.getAndSet(true) || error != null) { return Mono.empty(); } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSessionCache.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSessionCache.java new file mode 100644 index 0000000000000..4b3e93534d7b0 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSessionCache.java @@ -0,0 +1,264 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.util.logging.ClientLogger; +import org.apache.qpid.proton.engine.Connection; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.azure.core.amqp.implementation.ClientConstants.SESSION_NAME_KEY; + +/** + * A cache of {@link ReactorSession} instances owned by a {@link ReactorConnection}. + */ +final class ReactorSessionCache { + private final ConcurrentMap entries = new ConcurrentHashMap<>(); + private final String fullyQualifiedNamespace; + private final String connectionId; + private final ReactorHandlerProvider handlerProvider; + private final ReactorProvider reactorProvider; + private final Duration openTimeout; + private final AtomicBoolean isOwnerDisposed; + private final ClientLogger logger; + + /** + * Creates the cache. + * + * @param connectionId the id of the {@link ReactorConnection} owning the cache. + * @param fullyQualifiedNamespace the host name of the broker that the owner is connected to. + * @param handlerProvider the handler provider for various type of endpoints (session, link). + * @param reactorProvider the provider for reactor dispatcher to dispatch work to QPid Reactor thread. + * @param openTimeout the session open timeout. + * @param logger the client logger. + */ + ReactorSessionCache(String connectionId, String fullyQualifiedNamespace, ReactorHandlerProvider handlerProvider, + ReactorProvider reactorProvider, Duration openTimeout, ClientLogger logger) { + this.fullyQualifiedNamespace = fullyQualifiedNamespace; + this.connectionId = connectionId; + this.handlerProvider = handlerProvider; + this.reactorProvider = reactorProvider; + this.openTimeout = openTimeout; + this.isOwnerDisposed = new AtomicBoolean(false); + this.logger = logger; + } + + /** + * Obtain the session with the given name from the cache, first loading and opening the session if necessary. + *

+ * The session returned from the cache will be already connected to the broker and ready to use. + *

+ *

+ * A session will be evicted from the cache if it terminates (e.g., broker disconnected the session). + *

+ * + * @param connectionMono the Mono that emits QPid Proton-j {@link Connection} that host the session. + * @param name the session name. + * @param loader to load the session on cache miss, cache miss can happen if session is requested + * for the first time or previously loaded one was evicted. + * + * @return the session, that is active and connected to the broker. + */ + Mono getOrLoad(Mono connectionMono, String name, Loader loader) { + final Mono entryMono = connectionMono.map(connection -> { + return entries.computeIfAbsent(name, sessionName -> { + final ReactorSession session = load(connection, sessionName, loader); + final Disposable disposable = setupAutoEviction(session); + return new Entry(session, disposable); + }); + }); + return entryMono.flatMap(entry -> { + final ReactorSession session = entry.getSession(); + return session.open() + .doOnError(error -> evict(session, "Evicting failed to open or in-active session.", error)); + // + // Notes on session.open(): + // + // 'ReactorSession::open()' has open-only-once semantics, where the open attempt (i.e., the internal call + // to org.apache.qpid.proton.engine.Session::open()) is triggered upon the first subscription that loads + // session into this cache. The one time internal open call will be executed on the QPid Reactor thread. + // + // Later subscriptions only trigger the session active check (i.e., checks if the session is still + // connected to the broker). + // + // For both first and later subscriptions, the 'ReactorSession::open()' will return only after the session + // is active, if the session is not active within the timeout configured via AmqpRetryOptions::tryTimeout, + // then the API will fail with timeout error. + }); + // + // Notes on eviction: + // + // 1. If the session is disconnected after the successful open, the auto-eviction that was set up + // (via 'setupAutoEviction') when the session was loaded will take care of the eviction. + // 2. If the initial open attempt itself fails or if the session transition to connected (active) state + // fails with time out, then 'doOnError' (via 'evict') will take care of the eviction. + } + + /** + * Evicts the session from the cache. + * + * @param name the name of the session to evict. + * @return true if the session was evicted, false if no session found with the given name. + */ + boolean evict(String name) { + if (name == null) { + return false; + } + final Entry removed = entries.remove(name); + if (removed != null) { + removed.dispose(); + } + return removed != null; + } + + /** + * Signal that the owner ({@link ReactorConnection}) of the cache is disposed of. + */ + void setOwnerDisposed() { + isOwnerDisposed.set(true); + } + + /** + * When the owner {@link ReactorConnection} is being disposed of, all {@link ReactorSession} loaded into the cache + * will receive shutdown signal through the channel established at ReactorSession's construction time, the owner + * may use this method to waits for sessions to complete it closing. + * + * @return a Mono that completes when all sessions are closed via owner shutdown signaling. + */ + Mono awaitClose() { + final ArrayList> closing = new ArrayList<>(entries.size()); + for (Entry entry : entries.values()) { + closing.add(entry.awaitSessionClose()); + } + return Mono.when(closing); + } + + /** + * Load a new {@link ReactorSession} to be cached. + * + * @param connection the QPid Proton-j connection to host the session. + * @param name the session name. + * @param loader the function to load the session. + * + * @return the session to cache. + */ + private ReactorSession load(Connection connection, String name, Loader loader) { + final ProtonSession protonSession = new ProtonSession(connectionId, fullyQualifiedNamespace, connection, + handlerProvider, reactorProvider, name, openTimeout, logger); + // TODO (anu): Update loader signature to use 'ProtonSession' instead of 'ProtonSessionWrapper' when removing v1. + return loader.load(new ProtonSessionWrapper(protonSession)); + } + + /** + * Register to evict the session from the cache when the session terminates. + * + * @param session the session to register for cache eviction. + * @return the registration disposable. + */ + private Disposable setupAutoEviction(ReactorSession session) { + return session.getEndpointStates().subscribe(__ -> { + }, error -> { + evict(session, "Evicting session terminated with error.", error); + }, () -> { + evict(session, "Evicting terminated session.", null); + }); + } + + /** + * Attempt to evict the session from the cache. + * + * @param session the session to evict. + * @param message the message to log on eviction. + * @param error the error triggered the eviction. + */ + private void evict(ReactorSession session, String message, Throwable error) { + if (isOwnerDisposed.get()) { + // If (owner) connection is already disposing of, all session(s) would be discarded. Which means the whole + // cache itself would be discarded. In this case, don't evict the individual sessions from the cache, this + // helps to prevent session recreation attempts by downstream while connection cleanup is running. + return; + } + final String name = session.getSessionName(); + if (error != null) { + logger.atInfo().addKeyValue(SESSION_NAME_KEY, name).log(message, error); + } else { + logger.atInfo().addKeyValue(SESSION_NAME_KEY, name).log(message); + } + evict(name); + } + + /** + * Type to load a {@link ReactorSession} for caching it. + */ + @FunctionalInterface + interface Loader { + /** + * Load a {@link ReactorSession} for caching. + * + * @param protonSession the {@link ProtonSession} to back the loaded {@link ReactorSession}. + *

+ * TODO (anu): When removing v1, update signature to use 'ProtonSession' instead of wrapper. + *

+ * + * @return the session to cache. + */ + ReactorSession load(ProtonSessionWrapper protonSession); + } + + /** + * An entry in the cache holding {@link ReactorSession} and {@link Disposable} for the task to evict the entry + * from the cache. + */ + private static final class Entry extends AtomicBoolean { + private final ReactorSession session; + private final Disposable disposable; + + /** + * Creates a cache entry. + * + * @param session the session to cache. + * @param disposable the disposable to evict the session from the cache. + */ + private Entry(ReactorSession session, Disposable disposable) { + super(false); + this.session = session; + this.disposable = disposable; + } + + /** + * Gets the session cached in the entry. + * + * @return the session. + */ + private ReactorSession getSession() { + return session; + } + + /** + * Await for the cached session to close. + * + * @return a Mono that completes when the session is closed. + */ + private Mono awaitSessionClose() { + return session.isClosed(); + } + + /** + * Dispose of the cached session and the eviction disposable. + */ + private void dispose() { + if (super.getAndSet(true)) { + return; + } + session.closeAsync("closing session.", null, true).subscribe(); + disposable.dispose(); + } + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java index e5daac8c17b05..3cefba230f0d9 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java @@ -13,6 +13,7 @@ import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler; import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2; import com.azure.core.amqp.implementation.handler.SendLinkHandler; +import com.azure.core.amqp.implementation.ProtonSessionWrapper.ProtonChannelWrapper; import com.azure.core.util.AsyncCloseable; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.Proton; @@ -28,7 +29,6 @@ import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.impl.DeliveryImpl; import org.apache.qpid.proton.message.Message; import reactor.core.Disposable; @@ -125,9 +125,8 @@ public class RequestResponseChannel implements AsyncCloseable { * @param amqpConnection AMQP connection associated with this request response channel. * @param connectionId Identifier of the connection. * @param fullyQualifiedNamespace Fully qualified namespace for the the host. - * @param linkName Name of the link. * @param entityPath Address in the message broker to send message to. - * @param session Reactor session associated with this link. + * @param channel Channel composing QPid Proton-j sender link and receiver link. * @param retryOptions Retry options to use for sending the request response. * @param handlerProvider Provides handlers that interact with proton-j's reactor. * @param provider The reactor provider that the request will be sent with. @@ -140,11 +139,12 @@ public class RequestResponseChannel implements AsyncCloseable { * @throws RuntimeException if the send/receive links could not be locally scheduled to open. */ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectionId, String fullyQualifiedNamespace, - String linkName, String entityPath, Session session, AmqpRetryOptions retryOptions, + String entityPath, ProtonChannelWrapper channel, AmqpRetryOptions retryOptions, ReactorHandlerProvider handlerProvider, ReactorProvider provider, MessageSerializer messageSerializer, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode, AmqpMetricsProvider metricsProvider, boolean isV2) { + final String linkName = channel.getName(); Map loggingContext = createContextWithConnectionId(connectionId); loggingContext.put(LINK_NAME_KEY, linkName); this.logger = new ClientLogger(RequestResponseChannel.class, loggingContext); @@ -161,7 +161,7 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio this.messageSerializer = messageSerializer; // Setup send (request) link. - this.sendLink = session.sender(linkName + ":sender"); + this.sendLink = channel.sender(); final Target senderTarget = new Target(); senderTarget.setAddress(entityPath); this.sendLink.setTarget(senderTarget); @@ -173,7 +173,7 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio BaseHandler.setHandler(sendLink, sendLinkHandler); // Setup receive (response) link. - this.receiveLink = session.receiver(linkName + ":receiver"); + this.receiveLink = channel.receiver(); final Source receiverSource = new Source(); receiverSource.setAddress(entityPath); this.receiveLink.setSource(receiverSource); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannelCache.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannelCache.java index 465ce20fbb02b..08adfc6546d12 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannelCache.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannelCache.java @@ -9,6 +9,7 @@ import com.azure.core.util.logging.ClientLogger; import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import reactor.util.retry.Retry; import java.time.Duration; @@ -18,6 +19,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeoutException; +import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST; import static com.azure.core.amqp.implementation.ClientConstants.CALL_SITE_KEY; import static com.azure.core.amqp.implementation.ClientConstants.CONNECTION_ID_KEY; import static com.azure.core.amqp.implementation.ClientConstants.INTERVAL_KEY; @@ -38,17 +40,16 @@ public final class RequestResponseChannelCache implements Disposable { private static final String IS_CACHE_TERMINATED_KEY = "isCacheTerminated"; private static final String IS_CONNECTION_TERMINATED_KEY = "isConnectionTerminated"; private static final String TRY_COUNT_KEY = "tryCount"; - + private final Sinks.Empty isClosedMono = Sinks.empty(); private final ClientLogger logger; private final ReactorConnection connection; private final Duration activationTimeout; private final Mono createOrGetCachedChannel; - private final Object lock = new Object(); private volatile boolean terminated; // Note: The only reason to have below 'currentChannel' is to close the cached RequestResponseChannel internally - // upon 'RequestResponseChannelCache' termination (via dispose()). We must never expose 'currentChannel' variable to - // any dependent type; instead, the dependent type must acquire RequestResponseChannel only through the cache route, + // upon 'RequestResponseChannelCache' termination (via dispose()). Type must never expose 'currentChannel' variable + // to any dependent type; instead, the dependent type must acquire RequestResponseChannel through the cache route, // i.e., by subscribing to 'createOrGetCachedChannel' via 'get()' getter. private volatile RequestResponseChannel currentChannel; @@ -60,13 +61,9 @@ public final class RequestResponseChannelCache implements Disposable { Objects.requireNonNull(linksName, "'linksName' cannot be null."); Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null."); - // for correlation purpose, the cache and any RequestResponseChannel it caches uses the same loggingContext. - // E.g, - // { "connectionId": "MF_0f4c2e_1680070221023" "linkName": 'cbs' or '{entityPath}-mgmt' (e.g. 'q0-mgmt', - // 'tx-mgmt') } final Map loggingContext = new HashMap<>(2); - loggingContext.put(CONNECTION_ID_KEY, connection.getId()); - loggingContext.put(LINK_NAME_KEY, linksName); + loggingContext.put(CONNECTION_ID_KEY, connection.getId()); // E.g., 'MF_0f4c2e_1680070221023' + loggingContext.put(LINK_NAME_KEY, linksName); // E.g., 'cbs', '{entityPath}-mgmt' (E.g., 'q0-mgmt') this.logger = new ClientLogger(RequestResponseChannelCache.class, loggingContext); this.connection = connection; @@ -75,51 +72,32 @@ public final class RequestResponseChannelCache implements Disposable { final Mono newChannel = Mono.defer(() -> { final RecoveryTerminatedException terminatedError = checkRecoveryTerminated("new-channel"); if (terminatedError != null) { + // 'retryWhenSpec' function inspects 'RecoveryTerminatedException' and propagated to downstream as + // 'RequestResponseChannelClosedException' return Mono.error(terminatedError); } return connection.newRequestResponseChannel(sessionName, linksName, entityPath); }); this.createOrGetCachedChannel = newChannel.flatMap(c -> { - logger.atInfo().log("Waiting for channel to active."); - - final Mono awaitToActive = c.getEndpointStates() - .filter(s -> s == AmqpEndpointState.ACTIVE) - .next() - .switchIfEmpty( - Mono.error(() -> new AmqpException(true, "Channel completed without being active.", null))) - .then(Mono.just(c)) - .timeout(activationTimeout, Mono.defer(() -> { - final String timeoutMessage - = String.format("The channel activation wait timed-out (%s).", activationTimeout); - logger.atInfo().log(timeoutMessage + " Closing channel."); - return c.closeAsync().then(Mono.error(new AmqpException(true, timeoutMessage, null))); - })); - - return awaitToActive.doOnCancel(() -> { - logger.atInfo().log("The channel request was canceled while waiting to active."); - if (!c.isDisposed()) { - c.closeAsync().subscribe(); - } - }); + return awaitToActive(c, activationTimeout, logger); }).retryWhen(retryWhenSpec(retryPolicy)).handle((c, sink) -> { - final RequestResponseChannel channel = c; final RecoveryTerminatedException terminatedError; synchronized (lock) { - // Synchronize this {terminated-read, currentChannel-write} block in cache-refresh route with - // {terminated-write, currentChannel-read} block in dispose() route, to guard against channel leak - // (i.e. missing close) if the cache-refresh and dispose routes runs concurrently. + // Here in 'cache-refresh route', the {terminated-read, currentChannel-write} block is synchronized with + // {terminated-write, currentChannel-read} block in 'close route'. This synchronization ensure channel + // is not leaked (i.e. missing close) if the 'cache-refresh route' and 'close route' runs concurrently. terminatedError = checkRecoveryTerminated("cache-refresh"); - this.currentChannel = channel; + this.currentChannel = c; } if (terminatedError != null) { - if (!channel.isDisposed()) { - channel.closeAsync().subscribe(); + if (!c.isDisposed()) { + c.closeAsync().subscribe(); } sink.error(terminatedError.propagate()); } else { logger.atInfo().log("Emitting the new active channel."); - sink.next(channel); + sink.next(c); } }).cacheInvalidateIf(c -> { if (c.isDisposedOrDisposalInInProgress()) { @@ -144,26 +122,42 @@ public Mono get() { } /** - * Terminate the cache such that it is no longer possible to obtain RequestResponseChannel using {@link this#get()}. + * Terminate the cache such that it is no longer possible to obtain RequestResponseChannel using {@link #get()}. * If there is a current (cached) RequestResponseChannel then it will be closed. */ @Override public void dispose() { - final RequestResponseChannel channel; + closeAsync().subscribe(); + } + + /** + * Terminate the cache such that it is no longer possible to obtain RequestResponseChannel using {@link this#get()}. + * If there is a current (cached) RequestResponseChannel then it will be closed. + * + * @return a Mono that completes when the cache is terminated. + */ + Mono closeAsync() { + final RequestResponseChannel cached; synchronized (lock) { if (terminated) { - return; + return isClosedMono.asMono(); } terminated = true; - channel = currentChannel; + cached = currentChannel; } - if (channel != null && !channel.isDisposed()) { - logger.atInfo().log("Closing the cached channel and Terminating the channel recovery support."); - channel.closeAsync().subscribe(); + if (cached == null || cached.isDisposed()) { + logger.atInfo().log("closing the channel-cache."); + isClosedMono.emitEmpty(FAIL_FAST); } else { - logger.atInfo().log("Terminating the channel recovery support."); + cached.closeAsync().doOnEach(signal -> { + if (signal.isOnError() || signal.isOnComplete()) { + logger.atInfo().log("closing the cached channel and the channel-cache."); + isClosedMono.emitEmpty(FAIL_FAST); + } + }); } + return isClosedMono.asMono(); } @Override @@ -231,11 +225,11 @@ private Retry retryWhenSpec(AmqpRetryPolicy retryPolicy) { * Check if this cache is in a state where the cache refresh (i.e. recovery of RequestResponseChannel) is no longer * possible. *

- * The recovery mechanism is terminated once the cache is terminated due to {@link RequestResponseChannelCache#dispose()} - * call or the parent {@link ReactorConnection} is in terminated state. - * Since the parent {@link ReactorConnection} hosts any RequestResponseChannel object that RequestResponseChannelCache + * The recovery mechanism is terminated once the cache is terminated due to {@link #dispose()} or + * {@link #closeAsync()} call or the parent {@link ReactorConnection} is in terminated state. + * Since the parent {@link ReactorConnection} hosts any RequestResponseChannel that RequestResponseChannelCache * caches, recovery (scoped to the Connection) is impossible once the Connection is terminated - * (i.e. connection.isDisposed() == true). Which also means RequestResponseChannelCache cannot outlive the Connection. + * (i.e. connection.isDisposed() == true). This also means RequestResponseChannelCache cannot outlive the Connection. * * @param callSite the call site checking the recovery termination (for logging). * @return {@link RecoveryTerminatedException} if the recovery is terminated, {@code null} otherwise. @@ -254,9 +248,48 @@ private RecoveryTerminatedException checkRecoveryTerminated(String callSite) { return null; } + /** + * Wait for the channel to be active with a timeout. + *

+ * If the activation timeout or if the channel state transition to completed without being active, then an error + * will be raised. + *

+ *

+ * This API will close the channel if it times out or downstream cancels before becoming active. If the channel + * state completes without being active, this API will not try to close the channel, since completion signal means + * it is already closed (See self-close call in RequestResponseChannel endpointStates error and completion handler). + *

+ * @param channel the channel to await to be active. + * @param timeout the activation timeout. + * @param logger the logger. + * @return the channel that is active. + */ + private static Mono awaitToActive(RequestResponseChannel channel, Duration timeout, + ClientLogger logger) { + logger.atInfo().log("Waiting for channel to active."); + return channel.getEndpointStates() + .filter(s -> s == AmqpEndpointState.ACTIVE) + .next() + .switchIfEmpty(Mono.error(() -> new AmqpException(true, "Channel completed without being active.", null))) + .timeout(timeout, Mono.defer(() -> { + final String timeoutMessage = "Timeout waiting for channel to be active"; + logger.atInfo().addKeyValue("timeout", timeout).log(timeoutMessage); + final AmqpException timeoutError = new AmqpException(true, timeoutMessage + " (" + timeout + ")", null); + return channel.closeAsync().then(Mono.error(timeoutError)); + })) + .doOnCancel(() -> { + logger.atInfo().log("The channel request was canceled while waiting to active."); + if (!channel.isDisposed()) { + channel.closeAsync().subscribe(); + } + }) + .thenReturn(channel); + } + /** * The error type (internal to the cache) representing the termination of recovery support, which means cache cannot * be refreshed any longer. + * * @See {@link RequestResponseChannelCache#checkRecoveryTerminated(String)}. */ private static final class RecoveryTerminatedException extends RuntimeException { diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java index eee9eaee0feda..36ed115a13f95 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java @@ -51,6 +51,15 @@ public SessionHandler(String connectionId, String hostname, String sessionName, this.metricsProvider = metricProvider; } + /** + * Gets the name of the session. + * + * @return the session name. + */ + public String getSessionName() { + return sessionName; + } + /** * Gets the error context of the session. * diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ManagementChannelTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ManagementChannelTest.java index 13395d1007de8..42a447d1e8938 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ManagementChannelTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ManagementChannelTest.java @@ -104,7 +104,8 @@ public void setup(TestInfo testInfo) { when(requestResponseChannel.getErrorContext()).thenReturn(errorContext); when(requestResponseChannel.getEndpointStates()).thenReturn(Flux.never()); - managementChannel = new ManagementChannel(requestResponseMono, NAMESPACE, ENTITY_PATH, tokenManager); + ChannelCacheWrapper channelCache = new ChannelCacheWrapper(requestResponseMono); + managementChannel = new ManagementChannel(channelCache, NAMESPACE, ENTITY_PATH, tokenManager); } @AfterEach diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ProtonSessionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ProtonSessionTest.java new file mode 100644 index 0000000000000..4f14061fa9441 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ProtonSessionTest.java @@ -0,0 +1,232 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.amqp.implementation.handler.SessionHandler; +import com.azure.core.amqp.implementation.ProtonSession.ProtonSessionClosedException; +import com.azure.core.util.logging.ClientLogger; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Record; +import org.apache.qpid.proton.engine.Session; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.RejectedExecutionException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public final class ProtonSessionTest { + private AutoCloseable mocksCloseable; + + @BeforeEach + void session() { + mocksCloseable = MockitoAnnotations.openMocks(this); + } + + @AfterEach + void teardown() throws Exception { + Mockito.framework().clearInlineMock(this); + + if (mocksCloseable != null) { + mocksCloseable.close(); + } + } + + @Test + void shouldOpenQPidSession() { + final Session qpidSession = mock(Session.class); + doNothing().when(qpidSession).open(); + + final Setup setup = Setup.create(qpidSession); + final ProtonSession session = setup.getSession(); + + StepVerifier.create(session.open()).verifyComplete(); + + StepVerifier.create(session.open()).verifyComplete(); + + // assert the open-only-once semantics. + verify(qpidSession, times(1)).open(); + Assertions.assertEquals(1, setup.getDispatchCount()); + } + + @Test + void shouldThrowQPidOpenError() { + final Session qpidSession = mock(Session.class); + final RejectedExecutionException dispatchError = new RejectedExecutionException("QPid Reactor was disposed."); + final Map dispatchErrors = new HashMap<>(1); + dispatchErrors.put(0, dispatchError); + + final Setup setup = Setup.create(qpidSession, dispatchErrors); + final ProtonSession session = setup.getSession(); + + // assert the dispatch error on initial 'open' attempt. + StepVerifier.create(session.open()).verifyErrorSatisfies(e -> { + Assertions.assertInstanceOf(AmqpException.class, e); + final AmqpException ae = (AmqpException) e; + Assertions.assertNotNull(ae.getCause()); + Assertions.assertEquals(dispatchError, ae.getCause()); + }); + + // assert the second call to 'open' replays the same dispatch error. + StepVerifier.create(session.open()).verifyErrorSatisfies(e -> { + Assertions.assertInstanceOf(AmqpException.class, e); + final AmqpException ae = (AmqpException) e; + Assertions.assertNotNull(ae.getCause()); + Assertions.assertEquals(dispatchError, ae.getCause()); + }); + Assertions.assertEquals(1, setup.getDispatchCount()); + } + + @Test + void shouldThrowIfOpenAttemptedAfterDisposal() { + final Session qpidSession = mock(Session.class); + + final Setup setup = Setup.create(qpidSession); + final ProtonSession session = setup.getSession(); + + session.beginClose(null); + StepVerifier.create(session.open()).verifyErrorSatisfies(e -> { + Assertions.assertInstanceOf(ProtonSessionClosedException.class, e); + }); + } + + @Test + void shouldThrowIfChannelRequestedBeforeOpen() { + final Session qpidSession = mock(Session.class); + + final Setup setup = Setup.create(qpidSession); + final ProtonSession session = setup.getSession(); + + StepVerifier.create(session.channel("cbs", Duration.ZERO)).verifyErrorSatisfies(e -> { + Assertions.assertInstanceOf(IllegalStateException.class, e); + }); + } + + @Test + void shouldThrowIfChannelRequestedAfterDisposal() { + final Session qpidSession = mock(Session.class); + + final Setup setup = Setup.create(qpidSession); + final ProtonSession session = setup.getSession(); + + StepVerifier.create(session.open()).verifyComplete(); + session.beginClose(null); + StepVerifier.create(session.channel("cbs", Duration.ZERO)).verifyErrorSatisfies(e -> { + Assertions.assertInstanceOf(ProtonSessionClosedException.class, e); + }); + } + + @Test + void shouldCreateChannel() { + final String channelName = "cbs"; + final String channelSenderName = channelName + ":sender"; + final String channelReceiverName = channelName + ":receiver"; + final Session qpidSession = mock(Session.class); + final ArgumentCaptor captor0 = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor captor1 = ArgumentCaptor.forClass(String.class); + + final Setup setup = Setup.create(qpidSession); + final ProtonSession session = setup.getSession(); + + StepVerifier.create(session.open()).verifyComplete(); + + StepVerifier.create(session.channel(channelName, Duration.ZERO)).expectNextCount(1).verifyComplete(); + + verify(qpidSession).sender(captor0.capture()); + Assertions.assertEquals(channelSenderName, captor0.getValue()); + verify(qpidSession).receiver(captor1.capture()); + Assertions.assertEquals(channelReceiverName, captor1.getValue()); + } + + @Test + void shouldBeginCloseClosesQPidSession() { + final Session qpidSession = mock(Session.class); + doNothing().when(qpidSession).open(); + doNothing().when(qpidSession).close(); + + final Setup setup = Setup.create(qpidSession); + final ProtonSession session = setup.getSession(); + + StepVerifier.create(session.open()).verifyComplete(); + + session.beginClose(null); + + verify(qpidSession, times(1)).close(); + Assertions.assertEquals(1, setup.getDispatchCount()); + } + + private static final class Setup { + private static final String CONNECTION_ID = "contoso-connection-id"; + private static final String NAMESPACE = "contoso.servicebus.windows.net"; + private static final Duration OPEN_TIMEOUT = Duration.ZERO; + private final int[] dispatchCount = new int[1]; + private final ProtonSession protonSession; + + static Setup create(Session qpidSession) { + return new Setup(qpidSession, null); + } + + static Setup create(Session qpidSession, Map dispatchErrors) { + return new Setup(qpidSession, dispatchErrors); + } + + private Setup(Session qpidSession, Map dispatchErrors) { + final ReactorProvider reactorProvider = mock(ReactorProvider.class); + final ReactorHandlerProvider handlerProvider = mock(ReactorHandlerProvider.class); + final ReactorDispatcher reactorDispatcher = mock(ReactorDispatcher.class); + final Connection connection = mock(Connection.class); + final Record record = mock(Record.class); + final String sessionName = "session0"; + try { + doAnswer(invocation -> { + final int callCount = dispatchCount[0]++; + if (dispatchErrors != null && dispatchErrors.containsKey(callCount)) { + throw dispatchErrors.get(callCount); + } else { + final Runnable work = invocation.getArgument(0); + work.run(); + return null; + } + }).when(reactorDispatcher).invoke(any(Runnable.class)); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + when(reactorProvider.getReactorDispatcher()).thenReturn(reactorDispatcher); + when(handlerProvider.createSessionHandler(anyString(), anyString(), anyString(), any(Duration.class))) + .thenReturn(new SessionHandler(CONNECTION_ID, NAMESPACE, sessionName, reactorDispatcher, OPEN_TIMEOUT, + AmqpMetricsProvider.noop())); + when(connection.session()).thenReturn(qpidSession); + when(qpidSession.attachments()).thenReturn(record); + this.protonSession = new ProtonSession(CONNECTION_ID, NAMESPACE, connection, handlerProvider, + reactorProvider, sessionName, OPEN_TIMEOUT, new ClientLogger(Setup.class)); + } + + ProtonSession getSession() { + return protonSession; + } + + int getDispatchCount() { + return dispatchCount[0]; + } + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionCacheTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionCacheTest.java index 93acc7dd98711..4b51308f8e569 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionCacheTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionCacheTest.java @@ -433,7 +433,7 @@ private ReactorConnection createMockConnection(String id, Flux en final ReactorConnection connection = new ReactorConnection(id, connectionOptions, reactorProvider, handlerProvider, mock(AmqpLinkProvider.class), mock(TokenManagerProvider.class), - mock(MessageSerializer.class), SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, true); + mock(MessageSerializer.class), SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, true, false); return connection; } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java index 367abc1ff6e6e..ece28525b2a9e 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java @@ -64,6 +64,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static com.azure.core.amqp.exception.AmqpErrorCondition.TIMEOUT_ERROR; @@ -172,7 +173,8 @@ void setup() throws IOException { .thenReturn(sessionHandler, sessionHandler2, null); connection = new ReactorConnection(CONNECTION_ID, connectionOptions, reactorProvider, reactorHandlerProvider, - linkProvider, tokenManager, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, true); + linkProvider, tokenManager, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, true, + false); // Setting up onConnectionRemoteOpen. when(connectionEvent.getConnection()).thenReturn(connectionProtonJ); @@ -244,20 +246,23 @@ void createSession() { sessionHandler.onSessionRemoteOpen(sessionEvent); + final AtomicReference sessionEmitted = new AtomicReference<>(null); // Act & Assert StepVerifier.create(connection.createSession(SESSION_NAME)).assertNext(s -> { assertNotNull(s); assertEquals(SESSION_NAME, s.getSessionName()); assertTrue(s instanceof ReactorSession); - assertSame(session, ((ReactorSession) s).session()); + final ReactorSession rs = (ReactorSession) s; + sessionEmitted.set(rs.session()); }).expectComplete().verify(VERIFY_TIMEOUT); - // Assert that the same instance is obtained and we don't get a new session with the same name. + // Assert that the same instance is obtained, and we don't get a new session with the same name. StepVerifier.create(connection.createSession(SESSION_NAME)).assertNext(s -> { assertNotNull(s); assertEquals(SESSION_NAME, s.getSessionName()); assertTrue(s instanceof ReactorSession); - assertSame(session, ((ReactorSession) s).session()); + final ReactorSession rs = (ReactorSession) s; + assertSame(sessionEmitted.get(), rs.session()); // TODO (anu): do better testing in SessionCacheTests. }).expectComplete().verify(VERIFY_TIMEOUT); verify(record).set(Handler.class, Handler.class, sessionHandler); @@ -340,7 +345,8 @@ void createSessionFailureWorksWithRetry() { assertNotNull(s); assertEquals(SESSION_NAME, s.getSessionName()); assertTrue(s instanceof ReactorSession); - assertSame(session2, ((ReactorSession) s).session()); + // TODO (anu): do better testing in SessionCacheTests. + // assertSame(session2, ((ReactorSession) s).session()); }).expectComplete().verify(); verify(record).set(Handler.class, Handler.class, sessionHandler); @@ -500,7 +506,7 @@ void createCBSNodeTimeoutException(boolean isV2) throws IOException { // Act and Assert final ReactorConnection connectionBad = new ReactorConnection(CONNECTION_ID, connectionOptions, reactorProvider, handlerProvider, linkProvider, - tokenManager, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, isV2); + tokenManager, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, isV2, false); StepVerifier.create(connectionBad.getClaimsBasedSecurityNode()).expectErrorSatisfies(error -> { assertTrue(error instanceof AmqpException); @@ -735,7 +741,8 @@ void setsPropertiesUsingCustomEndpoint() throws IOException { .thenReturn(sessionHandler); connection = new ReactorConnection(CONNECTION_ID, connectionOptions, reactorProvider, reactorHandlerProvider, - linkProvider, tokenManager, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, true); + linkProvider, tokenManager, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, true, + false); } @Test @@ -745,7 +752,7 @@ void disposeAsync() throws IOException { final ReactorDispatcher dispatcher = mock(ReactorDispatcher.class); final ReactorConnection connection2 = new ReactorConnection(CONNECTION_ID, connectionOptions, provider, reactorHandlerProvider, linkProvider, - tokenManager, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, true); + tokenManager, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, true, false); final AmqpShutdownSignal signal = new AmqpShutdownSignal(false, false, "Remove"); when(provider.getReactorDispatcher()).thenReturn(dispatcher); @@ -773,7 +780,7 @@ void dispose() throws IOException { final ReactorDispatcher dispatcher = mock(ReactorDispatcher.class); final ReactorConnection connection2 = new ReactorConnection(CONNECTION_ID, connectionOptions, provider, reactorHandlerProvider, linkProvider, - tokenManager, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, true); + tokenManager, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, true, false); when(provider.getReactorDispatcher()).thenReturn(dispatcher); diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionCacheTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionCacheTest.java new file mode 100644 index 0000000000000..331e3cbb3bd17 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionCacheTest.java @@ -0,0 +1,273 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.AmqpEndpointState; +import com.azure.core.util.logging.ClientLogger; +import org.apache.qpid.proton.engine.Connection; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public final class ReactorSessionCacheTest { + private static final String CONNECTION_ID = "contoso-connection-id"; + private static final String NAMESPACE = "contoso.servicebus.windows.net"; + private static final Duration OPEN_TIMEOUT = Duration.ZERO; + private final ReactorProvider reactorProvider = new ReactorProvider(); + private final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(reactorProvider, null); + private final ClientLogger logger = new ClientLogger(ReactorSessionCacheTest.class); + + private AutoCloseable mocksCloseable; + + @BeforeEach + void session() { + mocksCloseable = MockitoAnnotations.openMocks(this); + } + + @AfterEach + void teardown() throws Exception { + Mockito.framework().clearInlineMock(this); + + if (mocksCloseable != null) { + mocksCloseable.close(); + } + } + + @Test + void shouldCacheSession() { + final ReactorSessionCache cache = createSessionCache(); + final Connection connection = mock(Connection.class); + + final String session0Name = "session-0"; + final ReactorSession session0 = session(session0Name, endpointStatesSink(), null); + + final HashMap> cacheLoaderLookup = new HashMap<>(); + cacheLoaderLookup.put(session0Name, sessions(session0)); + final CacheLoader cacheLoader = new CacheLoader(cacheLoaderLookup); + + StepVerifier.create(cache.getOrLoad(Mono.just(connection), session0Name, cacheLoader)) + .expectNext(session0) + .verifyComplete(); + + // Since the session0 is never terminated (i.e., it's endpointStates never errors or completes), the below + // cache lookup should get the same session0. + StepVerifier.create(cache.getOrLoad(Mono.just(connection), session0Name, cacheLoader)) + .expectNext(session0) + .verifyComplete(); + } + + @Test + void shouldAutoEvictCompletedSession() { + final ReactorSessionCache cache = createSessionCache(); + final Connection connection = mock(Connection.class); + + final String session0Name = "session-0"; + final Sinks.Many session0aEndpointStates = endpointStatesSink(); + final ReactorSession session0a = session(session0Name, session0aEndpointStates, null); + final ReactorSession session0b = session(session0Name, endpointStatesSink(), null); + + final HashMap> cacheLoaderLookup = new HashMap<>(); + cacheLoaderLookup.put(session0Name, sessions(session0a, session0b)); + final CacheLoader cacheLoader = new CacheLoader(cacheLoaderLookup); + + StepVerifier.create(cache.getOrLoad(Mono.just(connection), session0Name, cacheLoader)) + .expectNext(session0a) + .verifyComplete(); + + // Signal that session0a is terminated by completion. + session0aEndpointStates.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); + + // Since the session0a is completed, the cache should return the new session, session0b. + StepVerifier.create(cache.getOrLoad(Mono.just(connection), session0Name, cacheLoader)) + .expectNext(session0b) + .verifyComplete(); + } + + @Test + void shouldAutoEvictErroredSession() { + final ReactorSessionCache cache = createSessionCache(); + final Connection connection = mock(Connection.class); + + final String session0Name = "session-0"; + final Sinks.Many session0aEndpointStates = endpointStatesSink(); + final ReactorSession session0a = session(session0Name, session0aEndpointStates, null); + final ReactorSession session0b = session(session0Name, endpointStatesSink(), null); + + final HashMap> cacheLoaderLookup = new HashMap<>(); + cacheLoaderLookup.put(session0Name, sessions(session0a, session0b)); + final CacheLoader cacheLoader = new CacheLoader(cacheLoaderLookup); + + StepVerifier.create(cache.getOrLoad(Mono.just(connection), session0Name, cacheLoader)) + .expectNext(session0a) + .verifyComplete(); + + // Signal that session0a is terminated by error. + session0aEndpointStates.emitError(new RuntimeException("session detached"), Sinks.EmitFailureHandler.FAIL_FAST); + + // Since the session0a is errored, the cache should return the new session, session0b. + StepVerifier.create(cache.getOrLoad(Mono.just(connection), session0Name, cacheLoader)) + .expectNext(session0b) + .verifyComplete(); + } + + @Test + void shouldEvictOnSessionOpenError() { + final ReactorSessionCache cache = createSessionCache(); + final Connection connection = mock(Connection.class); + + final String session0Name = "session-0"; + final Sinks.Many session0aEndpointStates = endpointStatesSink(); + final Throwable session0aOpenError = new RuntimeException("session0a open failed"); + final ReactorSession session0a = session(session0Name, session0aEndpointStates, session0aOpenError); + final ReactorSession session0b = session(session0Name, endpointStatesSink(), null); + + final HashMap> cacheLoaderLookup = new HashMap<>(); + cacheLoaderLookup.put(session0Name, sessions(session0a, session0b)); + final CacheLoader cacheLoader = new CacheLoader(cacheLoaderLookup); + + StepVerifier.create(cache.getOrLoad(Mono.just(connection), session0Name, cacheLoader)) + .verifyErrorMatches(e -> e == session0aOpenError); + + // Since the session0a open attempt is errored, the cache should return the new session, session0b. + StepVerifier.create(cache.getOrLoad(Mono.just(connection), session0Name, cacheLoader)) + .expectNext(session0b) + .verifyComplete(); + } + + @Test + void shouldExplicitEvictRemoveSession() { + final ReactorSessionCache cache = createSessionCache(); + final Connection connection = mock(Connection.class); + + final String session0Name = "session-0"; + final Sinks.Many session0aEndpointStates = endpointStatesSink(); + final ReactorSession session0a = session(session0Name, session0aEndpointStates, null); + final ReactorSession session0b = session(session0Name, endpointStatesSink(), null); + + final HashMap> cacheLoaderLookup = new HashMap<>(); + cacheLoaderLookup.put(session0Name, sessions(session0a, session0b)); + final CacheLoader cacheLoader = new CacheLoader(cacheLoaderLookup); + + StepVerifier.create(cache.getOrLoad(Mono.just(connection), session0Name, cacheLoader)) + .expectNext(session0a) + .verifyComplete(); + + // explicitly evict session0a. + Assertions.assertTrue(cache.evict(session0Name)); + + // Since the session0a was evicted, the cache should return the new session, session0b. + StepVerifier.create(cache.getOrLoad(Mono.just(connection), session0Name, cacheLoader)) + .expectNext(session0b) + .verifyComplete(); + } + + @Test + void shouldNotEvictSessionIfOwnerDisposed() { + final ReactorSessionCache cache = createSessionCache(); + final Connection connection = mock(Connection.class); + + final String session0Name = "session-0"; + final Sinks.Many session0EndpointStates = endpointStatesSink(); + final ReactorSession session0 = session(session0Name, endpointStatesSink(), null); + + final HashMap> cacheLoaderLookup = new HashMap<>(); + cacheLoaderLookup.put(session0Name, sessions(session0)); + final CacheLoader cacheLoader = new CacheLoader(cacheLoaderLookup); + + StepVerifier.create(cache.getOrLoad(Mono.just(connection), session0Name, cacheLoader)) + .expectNext(session0) + .verifyComplete(); + + // Signal that the cache owner (Connection) is disposed. + cache.setOwnerDisposed(); + // Signal that session0a is terminated after owner disposal. + session0EndpointStates.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); + + // Since the owner is disposed, the cache should not evict the session0 even if it's terminated. + StepVerifier.create(cache.getOrLoad(Mono.just(connection), session0Name, cacheLoader)) + .expectNext(session0) + .verifyComplete(); + } + + private ReactorSessionCache createSessionCache() { + return new ReactorSessionCache(CONNECTION_ID, NAMESPACE, handlerProvider, reactorProvider, OPEN_TIMEOUT, + logger); + } + + private Sinks.Many endpointStatesSink() { + return Sinks.many().replay().latestOrDefault(AmqpEndpointState.UNINITIALIZED); + } + + private static Deque sessions(ReactorSession... sessions) { + final Deque queue = new ArrayDeque<>(sessions.length); + Collections.addAll(queue, sessions); + return queue; + } + + private static ReactorSession session(String sessionName, Sinks.Many sink, Throwable openError) { + final ReactorSession session = mock(ReactorSession.class); + when(session.getSessionName()).thenReturn(sessionName); + when(session.getEndpointStates()).thenReturn(sink.asFlux()); + if (openError != null) { + when(session.open()).thenReturn(Mono.error(openError)); + } else { + when(session.open()).thenReturn(Mono.just(session)); + } + when(session.closeAsync(anyString(), any(), eq(true))).thenReturn(Mono.empty()); + return session; + } + + private static final class CacheLoader implements ReactorSessionCache.Loader { + private final HashMap> lookup; + + CacheLoader(HashMap> lookup) { + Objects.requireNonNull(lookup, "'lookup' cannot be null."); + this.lookup = new HashMap<>(lookup.size()); + for (Map.Entry> e : lookup.entrySet()) { + final String name = Objects.requireNonNull(e.getKey(), "'name' cannot be null."); + final Deque sessions + = Objects.requireNonNull(e.getValue(), "'sessions' cannot be null."); + if (sessions.isEmpty()) { + throw new IllegalArgumentException("lookup cannot have empty 'sessions'"); + } + this.lookup.put(name, new ArrayDeque<>(sessions)); + } + } + + @Override + public ReactorSession load(ProtonSessionWrapper protonSession) { + // TODO (anu): When removing v1, use 'ProtonSession' instead of ProtonSessionWrapper. + final String name = protonSession.getName(); + final Deque sessions = lookup.get(name); + if (sessions == null) { + throw new IllegalStateException("lookup has no session mapping defined for the name " + name); + } + final ReactorSession session = sessions.poll(); + if (session == null) { + throw new IllegalStateException("lookup has no more sessions left for the name " + name); + } + return session; + } + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java index c4a92d8be7ddd..6c3f8f4a132e7 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java @@ -104,8 +104,8 @@ public class ReactorSessionTest { public void setup() throws IOException { mocksCloseable = MockitoAnnotations.openMocks(this); - this.handler = new SessionHandler(ID, HOST, ENTITY_PATH, reactorDispatcher, Duration.ofSeconds(60), - AmqpMetricsProvider.noop()); + this.handler + = new SessionHandler(ID, HOST, NAME, reactorDispatcher, Duration.ofSeconds(60), AmqpMetricsProvider.noop()); this.cbsNodeSupplier = Mono.just(cbsNode); when(reactorProvider.getReactor()).thenReturn(reactor); @@ -124,8 +124,9 @@ public void setup() throws IOException { when(amqpConnection.getShutdownSignals()).thenReturn(connectionShutdown.flux()); final AmqpRetryOptions options = new AmqpRetryOptions().setTryTimeout(TIMEOUT); - this.reactorSession = new ReactorSession(amqpConnection, session, handler, NAME, reactorProvider, - reactorHandlerProvider, new AmqpLinkProvider(), cbsNodeSupplier, tokenManagerProvider, serializer, options); + final ProtonSessionWrapper protonSession = new ProtonSessionWrapper(session, handler, reactorProvider); + this.reactorSession = new ReactorSession(amqpConnection, protonSession, reactorHandlerProvider, + new AmqpLinkProvider(), cbsNodeSupplier, tokenManagerProvider, serializer, options); } @AfterEach @@ -142,7 +143,7 @@ public void verifyConstructor() { // Assert verify(session, times(1)).open(); - Assertions.assertSame(session, reactorSession.session()); + // Assertions.assertSame(session, reactorSession.session()); Assertions.assertEquals(NAME, reactorSession.getSessionName()); Assertions.assertEquals(TIMEOUT, reactorSession.getOperationTimeout()); } @@ -317,7 +318,7 @@ void onSessionRemoteCloseWithErrorReportsMetrics() { when(session.getLocalState()).thenReturn(EndpointState.CLOSED); TestMeter meter = new TestMeter(); - SessionHandler handlerWithMetrics = new SessionHandler(ID, HOST, ENTITY_PATH, reactorDispatcher, + SessionHandler handlerWithMetrics = new SessionHandler(ID, HOST, NAME, reactorDispatcher, Duration.ofSeconds(60), new AmqpMetricsProvider(meter, HOST, ENTITY_PATH)); handlerWithMetrics.onSessionRemoteClose(event); @@ -344,7 +345,7 @@ void onSessionRemoteCloseNoErrorNoMetrics() { when(session.getLocalState()).thenReturn(EndpointState.CLOSED); TestMeter meter = new TestMeter(); - SessionHandler handlerWithMetrics = new SessionHandler(ID, HOST, ENTITY_PATH, reactorDispatcher, + SessionHandler handlerWithMetrics = new SessionHandler(ID, HOST, NAME, reactorDispatcher, Duration.ofSeconds(60), new AmqpMetricsProvider(meter, HOST, null)); handlerWithMetrics.onSessionRemoteClose(event); diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelCacheTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelCacheTest.java index cfdcc20bacb9b..7e67655184248 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelCacheTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelCacheTest.java @@ -9,6 +9,8 @@ import com.azure.core.amqp.implementation.handler.DeliverySettleMode; import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2; import com.azure.core.amqp.implementation.handler.SendLinkHandler; +import com.azure.core.amqp.implementation.ProtonSessionWrapper.ProtonChannelWrapper; +import com.azure.core.amqp.implementation.ProtonSession.ProtonChannel; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.EndpointState; @@ -470,7 +472,9 @@ void arrange(ReactorConnection connection, String fqdn, String chLinkName, Strin final SenderSettleMode settleMode = SenderSettleMode.SETTLED; final ReceiverSettleMode receiverSettleMode = ReceiverSettleMode.SECOND; - channel = new RequestResponseChannel(connection, connectionId, fqdn, chLinkName, chEntityPath, session, + final ProtonChannelWrapper protonChannel + = new ProtonChannelWrapper(new ProtonChannel(chLinkName, sender, receiver)); + channel = new RequestResponseChannel(connection, connectionId, fqdn, chEntityPath, protonChannel, retryOptions, handlerProvider, reactorProvider, serializer, settleMode, receiverSettleMode, AmqpMetricsProvider.noop(), true); } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java index b4fa7f5a668c9..5f49233de21a9 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java @@ -143,6 +143,10 @@ void afterEach() throws Exception { } } + private static ProtonSessionWrapper.ProtonChannelWrapper sessionWrapper(Session session) { + return new ProtonSessionWrapper.ProtonChannelWrapper(LINK_NAME, session); + } + /** * Validate that this gets and sets properties correctly. */ @@ -159,8 +163,8 @@ void getsProperties() { // Act final RequestResponseChannel channel = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, - LINK_NAME, ENTITY_PATH, session, retryOptions, handlerProvider, reactorProvider, serializer, settleMode, - receiverSettleMode, AmqpMetricsProvider.noop(), isV2); + ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, + settleMode, receiverSettleMode, AmqpMetricsProvider.noop(), isV2); final AmqpErrorContext errorContext = channel.getErrorContext(); StepVerifier.create(channel.closeAsync()).then(() -> { @@ -183,7 +187,7 @@ void getsProperties() { void disposeAsync() { // Arrange final RequestResponseChannel channel = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, - LINK_NAME, ENTITY_PATH, session, retryOptions, handlerProvider, reactorProvider, serializer, + ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, AmqpMetricsProvider.noop(), isV2); receiveEndpoints.next(EndpointState.ACTIVE); @@ -206,7 +210,7 @@ void disposeAsync() { void dispose() throws IOException { // Arrange final RequestResponseChannel channel = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, - LINK_NAME, ENTITY_PATH, session, retryOptions, handlerProvider, reactorProvider, serializer, + ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, AmqpMetricsProvider.noop(), isV2); receiveEndpoints.next(EndpointState.ACTIVE); @@ -245,7 +249,7 @@ void dispose() throws IOException { void sendNull() { // Arrange final RequestResponseChannel channel = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, - LINK_NAME, ENTITY_PATH, session, retryOptions, handlerProvider, reactorProvider, serializer, + ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, AmqpMetricsProvider.noop(), isV2); receiveEndpoints.next(EndpointState.ACTIVE); @@ -262,7 +266,7 @@ void sendNull() { void sendReplyToSet() { // Arrange final RequestResponseChannel channel = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, - LINK_NAME, ENTITY_PATH, session, retryOptions, handlerProvider, reactorProvider, serializer, + ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, AmqpMetricsProvider.noop(), isV2); final Message message = mock(Message.class); when(message.getReplyTo()).thenReturn("test-reply-to"); @@ -280,7 +284,7 @@ void sendReplyToSet() { void sendMessageIdSet() { // Arrange final RequestResponseChannel channel = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, - LINK_NAME, ENTITY_PATH, session, retryOptions, handlerProvider, reactorProvider, serializer, + ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, AmqpMetricsProvider.noop(), isV2); final Message message = mock(Message.class); when(message.getMessageId()).thenReturn(10L); @@ -380,7 +384,7 @@ void sendMessageWithTransaction() { 101, 100 }; final RequestResponseChannel channel = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, - LINK_NAME, ENTITY_PATH, session, retryOptions, handlerProvider, reactorProvider, serializer, + ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, AmqpMetricsProvider.noop(), isV2); final UnsignedLong messageId = UnsignedLong.valueOf(1); final Message message = mock(Message.class); @@ -508,7 +512,7 @@ void sendMessage() { 101, 100 }; final RequestResponseChannel channel = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, - LINK_NAME, ENTITY_PATH, session, retryOptions, handlerProvider, reactorProvider, serializer, + ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, AmqpMetricsProvider.noop(), isV2); final UnsignedLong messageId = UnsignedLong.valueOf(1); final Message message = mock(Message.class); @@ -636,7 +640,7 @@ void sendMessageWithMetrics() { TestMeter meter = new TestMeter(); final RequestResponseChannel channel - = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, LINK_NAME, ENTITY_PATH, session, + = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, new AmqpMetricsProvider(meter, NAMESPACE, ENTITY_PATH), isV2); @@ -694,7 +698,7 @@ void sendMessageSendErrorWithMetrics() { // Arrange TestMeter meter = new TestMeter(); final RequestResponseChannel channel - = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, LINK_NAME, ENTITY_PATH, session, + = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, new AmqpMetricsProvider(meter, NAMESPACE, ENTITY_PATH), isV2); @@ -732,7 +736,7 @@ void sendMessageEndpointErrorWithMetrics() { // Arrange TestMeter meter = new TestMeter(); final RequestResponseChannel channel - = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, LINK_NAME, ENTITY_PATH, session, + = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, new AmqpMetricsProvider(meter, NAMESPACE, ENTITY_PATH), isV2); @@ -775,7 +779,7 @@ void sendMessageEndpointErrorWithMetrics() { void clearMessagesOnError() { // Arrange final RequestResponseChannel channel = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, - LINK_NAME, ENTITY_PATH, session, retryOptions, handlerProvider, reactorProvider, serializer, + ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, AmqpMetricsProvider.noop(), isV2); final AmqpException error = new AmqpException(true, "Message", new AmqpErrorContext("some-context")); final Message message = mock(Message.class); @@ -802,7 +806,7 @@ void clearMessagesOnError() { void parentDisposesConnection() { // Arrange final RequestResponseChannel channel = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, - LINK_NAME, ENTITY_PATH, session, retryOptions, handlerProvider, reactorProvider, serializer, + ENTITY_PATH, sessionWrapper(session), retryOptions, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, AmqpMetricsProvider.noop(), isV2); final AmqpShutdownSignal shutdownSignal = new AmqpShutdownSignal(false, false, "Test-shutdown-signal"); @@ -842,7 +846,7 @@ public void closeAsyncTimeout() { // Arrange final AmqpRetryOptions retry = new AmqpRetryOptions().setTryTimeout(Duration.ofSeconds(1)).setMaxRetries(0); final RequestResponseChannel channel = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, - LINK_NAME, ENTITY_PATH, session, retry, handlerProvider, reactorProvider, serializer, + ENTITY_PATH, sessionWrapper(session), retry, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, AmqpMetricsProvider.noop(), isV2); // Act & Assert @@ -868,7 +872,7 @@ public void closeAsync() { // Arrange final AmqpRetryOptions retry = new AmqpRetryOptions().setTryTimeout(Duration.ofSeconds(1)).setMaxRetries(0); final RequestResponseChannel channel = new RequestResponseChannel(amqpConnection, CONNECTION_ID, NAMESPACE, - LINK_NAME, ENTITY_PATH, session, retry, handlerProvider, reactorProvider, serializer, + ENTITY_PATH, sessionWrapper(session), retry, handlerProvider, reactorProvider, serializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, AmqpMetricsProvider.noop(), isV2); sendEndpoints.next(EndpointState.ACTIVE); diff --git a/sdk/eventhubs/azure-messaging-eventhubs-track2-perf/src/main/java/com/azure/messaging/eventhubs/perf/ReactorReceiveEventsTest.java b/sdk/eventhubs/azure-messaging-eventhubs-track2-perf/src/main/java/com/azure/messaging/eventhubs/perf/ReactorReceiveEventsTest.java index 49a82961dc8fe..8af60cf42c311 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-track2-perf/src/main/java/com/azure/messaging/eventhubs/perf/ReactorReceiveEventsTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-track2-perf/src/main/java/com/azure/messaging/eventhubs/perf/ReactorReceiveEventsTest.java @@ -95,7 +95,7 @@ public Mono setupAsync() { final PerfMessageSerializer messageSerializer = new PerfMessageSerializer(); connection = new ReactorConnection(connectionId, connectionOptions, provider, handlerProvider, linkProvider, tokenManagerProvider, - messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, false); + messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, false, false); return Mono.empty(); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml index 069efa026ac6c..6890593de003a 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml +++ b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml @@ -44,12 +44,12 @@ com.azure azure-core - 1.49.0 + 1.50.0-beta.1 com.azure azure-core-amqp - 2.9.4 + 2.10.0-beta.1 diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java index 2bfb90fc29cae..30bf4f066a3e0 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java @@ -5,18 +5,18 @@ import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpRetryPolicy; -import com.azure.core.amqp.AmqpSession; import com.azure.core.amqp.implementation.AmqpLinkProvider; import com.azure.core.amqp.implementation.AmqpReceiveLink; import com.azure.core.amqp.implementation.AmqpSendLink; import com.azure.core.amqp.implementation.ConnectionOptions; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.ProtonSessionWrapper; import com.azure.core.amqp.implementation.ReactorConnection; import com.azure.core.amqp.implementation.ReactorHandlerProvider; import com.azure.core.amqp.implementation.ReactorProvider; +import com.azure.core.amqp.implementation.ReactorSession; import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.amqp.implementation.TokenManagerProvider; -import com.azure.core.amqp.implementation.handler.SessionHandler; import com.azure.core.credential.TokenCredential; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.models.EventPosition; @@ -24,7 +24,6 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.BaseHandler; -import org.apache.qpid.proton.engine.Session; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -72,7 +71,7 @@ public EventHubReactorAmqpConnection(String connectionId, ConnectionOptions conn ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider, AmqpLinkProvider linkProvider, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer) { super(connectionId, connectionOptions, reactorProvider, handlerProvider, linkProvider, tokenManagerProvider, - messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, true); + messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, false, false); this.connectionId = connectionId; this.reactorProvider = reactorProvider; this.handlerProvider = handlerProvider; @@ -163,9 +162,9 @@ public void dispose() { } @Override - protected AmqpSession createSession(String sessionName, Session session, SessionHandler handler) { - return new EventHubReactorSession(this, session, handler, sessionName, reactorProvider, - handlerProvider, linkProvider, getClaimsBasedSecurityNode(), tokenManagerProvider, retryOptions, messageSerializer); + protected ReactorSession createSession(ProtonSessionWrapper session) { + return new EventHubReactorSession(this, session, handlerProvider, linkProvider, + getClaimsBasedSecurityNode(), tokenManagerProvider, retryOptions, messageSerializer); } private synchronized ManagementChannel getOrCreateManagementChannel() { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorSession.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorSession.java index cb2af9101227d..8397514591806 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorSession.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorSession.java @@ -13,12 +13,11 @@ import com.azure.core.amqp.implementation.AmqpSendLink; import com.azure.core.amqp.implementation.ConsumerFactory; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.ProtonSessionWrapper; import com.azure.core.amqp.implementation.ReactorHandlerProvider; -import com.azure.core.amqp.implementation.ReactorProvider; import com.azure.core.amqp.implementation.ReactorSession; import com.azure.core.amqp.implementation.TokenManager; import com.azure.core.amqp.implementation.TokenManagerProvider; -import com.azure.core.amqp.implementation.handler.SessionHandler; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.models.EventPosition; import com.azure.messaging.eventhubs.models.ReceiveOptions; @@ -26,7 +25,6 @@ import org.apache.qpid.proton.amqp.UnknownDescribedType; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.engine.Session; import reactor.core.publisher.Mono; import java.time.Duration; @@ -56,9 +54,6 @@ class EventHubReactorSession extends ReactorSession implements EventHubSession { * Creates a new AMQP session using proton-j. * * @param session Proton-j session for this AMQP session. - * @param sessionHandler Handler for events that occur in the session. - * @param sessionName Name of the session. - * @param provider Provides reactor instances for messages to sent with. * @param handlerProvider Providers reactor handlers for listening to proton-j reactor events. * @param linkProvider Provides amqp links for send and receive. * @param cbsNodeSupplier Mono that returns a reference to the {@link ClaimsBasedSecurityNode}. @@ -67,12 +62,12 @@ class EventHubReactorSession extends ReactorSession implements EventHubSession { * @param retryOptions to be used for this session. * @param messageSerializer to be used. */ - EventHubReactorSession(AmqpConnection amqpConnection, Session session, SessionHandler sessionHandler, - String sessionName, ReactorProvider provider, ReactorHandlerProvider handlerProvider, AmqpLinkProvider linkProvider, + EventHubReactorSession(AmqpConnection amqpConnection, ProtonSessionWrapper session, + ReactorHandlerProvider handlerProvider, AmqpLinkProvider linkProvider, Mono cbsNodeSupplier, TokenManagerProvider tokenManagerProvider, AmqpRetryOptions retryOptions, MessageSerializer messageSerializer) { - super(amqpConnection, session, sessionHandler, sessionName, provider, handlerProvider, linkProvider, cbsNodeSupplier, - tokenManagerProvider, messageSerializer, retryOptions); + super(amqpConnection, session, handlerProvider, linkProvider, cbsNodeSupplier, tokenManagerProvider, + messageSerializer, retryOptions); } @Override diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java index b22d7a11fc416..c16930fade88d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java @@ -169,7 +169,7 @@ private TestReactorConnection(String connectionId, ConnectionOptions connectionO ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider, AmqpLinkProvider linkProvider, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer) { super(connectionId, connectionOptions, reactorProvider, handlerProvider, linkProvider, tokenManagerProvider, - messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, true); + messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND, false, false); } private Mono getCBSChannel(String linkName) { diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 30989b3ba029f..5767f1f281390 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -57,7 +57,7 @@ com.azure azure-core - 1.49.0 + 1.50.0-beta.1 com.azure @@ -67,7 +67,7 @@ com.azure azure-core-amqp - 2.9.4 + 2.10.0-beta.1 com.azure diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index c5d00055f999a..38a899e9bc91f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -970,9 +970,10 @@ private ServiceBusConnectionProcessor getOrCreateConnectionProcessor(MessageSeri // For the V1-Stack, tell the connection to continue creating receivers on v1 stack. final boolean isV2 = false; + final boolean useSessionChannelCache = false; return (ServiceBusAmqpConnection) new ServiceBusReactorAmqpConnection(connectionId, connectionOptions, provider, handlerProvider, linkProvider, tokenManagerProvider, serializer, - crossEntityTransactions, isV2); + crossEntityTransactions, isV2, useSessionChannelCache); }).repeat(); sharedConnection = connectionFlux.subscribeWith(new ServiceBusConnectionProcessor( @@ -1037,8 +1038,8 @@ private ConnectionOptions getConnectionOptions() { } // Connection-caching for the V2-Stack. - private ReactorConnectionCache getOrCreateConnectionCache(MessageSerializer serializer, Meter meter) { - return v2StackSupport.getOrCreateConnectionCache(getConnectionOptions(), serializer, crossEntityTransactions, meter); + private ReactorConnectionCache getOrCreateConnectionCache(MessageSerializer serializer, Meter meter, boolean useSessionChannelCache) { + return v2StackSupport.getOrCreateConnectionCache(getConnectionOptions(), serializer, crossEntityTransactions, meter, useSessionChannelCache); } private static boolean isNullOrEmpty(String item) { @@ -1177,6 +1178,14 @@ private static final class V2StackSupport { .build(); private final AtomicReference sessionSyncReceiveFlag = new AtomicReference<>(); + private static final String SESSION_CHANNEL_CACHE_KEY = "com.azure.core.amqp.internal.session-channel-cache.v2"; + private static final ConfigurationProperty SESSION_CHANNEL_CACHE_PROPERTY = ConfigurationPropertyBuilder.ofBoolean(SESSION_CHANNEL_CACHE_KEY) + .environmentVariableName(SESSION_CHANNEL_CACHE_KEY) + .defaultValue(false) // "SessionCache" and "RequestResponseChannelCache" requires explicit opt in along with v2 stack opt in. + .shared(true) + .build(); + private final AtomicReference sessionChannelCacheFlag = new AtomicReference<>(); + private final Object connectionLock = new Object(); private ReactorConnectionCache sharedConnectionCache; private final AtomicInteger openClients = new AtomicInteger(); @@ -1242,12 +1251,22 @@ boolean isSessionSyncReceiveEnabled(Configuration configuration) { return isOptedIn(configuration, SESSION_SYNC_RECEIVE_PROPERTY, sessionSyncReceiveFlag); } + /** + * SessionCache and RequestResponseChannelCache not opted-in default, but the application may opt in. + * + * @param configuration the client configuration. + * @return true if SessionCache and RequestResponseChannelCache is opted-in. + */ + boolean isSessionChannelCacheEnabled(Configuration configuration) { + return isOptedIn(configuration, SESSION_CHANNEL_CACHE_PROPERTY, sessionChannelCacheFlag); + } + // Obtain the shared connection-cache based on the V2-Stack. ReactorConnectionCache getOrCreateConnectionCache(ConnectionOptions connectionOptions, - MessageSerializer serializer, boolean crossEntityTransactions, Meter meter) { + MessageSerializer serializer, boolean crossEntityTransactions, Meter meter, boolean useSessionChannelCache) { synchronized (connectionLock) { if (sharedConnectionCache == null) { - sharedConnectionCache = createConnectionCache(connectionOptions, serializer, crossEntityTransactions, meter); + sharedConnectionCache = createConnectionCache(connectionOptions, serializer, crossEntityTransactions, meter, useSessionChannelCache); } } @@ -1344,8 +1363,9 @@ private boolean isOptedIn(Configuration configuration, ConfigurationProperty createConnectionCache(ConnectionOptions connectionOptions, - MessageSerializer serializer, boolean crossEntityTransactions, Meter meter) { + MessageSerializer serializer, boolean crossEntityTransactions, Meter meter, boolean useSessionChannelCache) { final Supplier connectionSupplier = () -> { final String connectionId = StringUtil.getRandomString("MF"); final ReactorProvider provider = new ReactorProvider(); @@ -1358,7 +1378,7 @@ private static ReactorConnectionCache createCon //For the v2 stack, tell the connection to create receivers using the v2 stack. final boolean isV2 = true; return new ServiceBusReactorAmqpConnection(connectionId, connectionOptions, provider, handlerProvider, - linkProvider, tokenManagerProvider, serializer, crossEntityTransactions, isV2); + linkProvider, tokenManagerProvider, serializer, crossEntityTransactions, isV2, useSessionChannelCache); }; final String fullyQualifiedNamespace = connectionOptions.getFullyQualifiedNamespace(); @@ -1428,7 +1448,8 @@ public ServiceBusSenderAsyncClient buildAsyncClient() { final Meter meter = createMeter(clientOptions); if (isSenderOnV2) { // Sender Client (async|sync) on the V2-Stack. - connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionCache(messageSerializer, meter)); + final boolean useSessionChannelCache = v2StackSupport.isSessionChannelCacheEnabled(configuration); + connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionCache(messageSerializer, meter, useSessionChannelCache)); onClientClose = ServiceBusClientBuilder.this.v2StackSupport::onClientClose; } else { connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionProcessor(messageSerializer, meter)); @@ -2044,7 +2065,8 @@ SessionsMessagePump buildPumpForProcessor(ClientLogger logger, clientIdentifier = UUID.randomUUID().toString(); } final Meter meter = createMeter(clientOptions); - final ConnectionCacheWrapper connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionCache(messageSerializer, meter)); + final boolean useSessionChannelCache = v2StackSupport.isSessionChannelCacheEnabled(configuration); + final ConnectionCacheWrapper connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionCache(messageSerializer, meter, useSessionChannelCache)); final ServiceBusSessionAcquirer sessionAcquirer = new ServiceBusSessionAcquirer(logger, clientIdentifier, entityPath, entityType, receiveMode, retryOptions.getTryTimeout(), connectionCacheWrapper); @@ -2077,7 +2099,7 @@ SessionsMessagePump buildPumpForProcessor(ClientLogger logger, */ public ServiceBusSessionReceiverAsyncClient buildAsyncClient() { final boolean isSessionReactorReceiveOnV2 = v2StackSupport.isSessionReactorAsyncReceiveEnabled(configuration); - return buildAsyncClient(true, false, isSessionReactorReceiveOnV2); + return buildAsyncClient(true, isSessionReactorReceiveOnV2); } /** @@ -2096,13 +2118,13 @@ public ServiceBusSessionReceiverAsyncClient buildAsyncClient() { public ServiceBusSessionReceiverClient buildClient() { final boolean isSessionSyncReceiveOnV2 = v2StackSupport.isSessionSyncReceiveEnabled(configuration); final boolean isPrefetchDisabled = prefetchCount == 0; - return new ServiceBusSessionReceiverClient(buildAsyncClient(false, true, isSessionSyncReceiveOnV2), + return new ServiceBusSessionReceiverClient(buildAsyncClient(false, isSessionSyncReceiveOnV2), isPrefetchDisabled, MessageUtils.getTotalTimeout(retryOptions)); } // Common function to build Session-Enabled Receiver-Client - For Async[Reactor]Client Or to back SyncClient. - private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, boolean syncConsumer, boolean isV2) { + private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, boolean isV2) { final MessagingEntityType entityType = validateEntityPaths(connectionStringEntityName, topicName, queueName); final String entityPath = getEntityPath(entityType, queueName, topicName, subscriptionName, @@ -2125,7 +2147,8 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp final ConnectionCacheWrapper connectionCacheWrapper; final Runnable onClientClose; if (isV2) { - connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionCache(messageSerializer, meter)); + final boolean useSessionChannelCache = v2StackSupport.isSessionChannelCacheEnabled(configuration); + connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionCache(messageSerializer, meter, useSessionChannelCache)); onClientClose = ServiceBusClientBuilder.this.v2StackSupport::onClientClose; } else { connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionProcessor(messageSerializer, meter)); @@ -2676,7 +2699,8 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, Re final boolean syncReceiveOnV2 = v2StackSupport.isNonSessionSyncReceiveEnabled(configuration); if (syncReceiveOnV2) { // "Non-Session" Sync Receiver-Client on the V2-Stack. - connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionCache(messageSerializer, meter)); + final boolean useSessionChannelCache = v2StackSupport.isSessionChannelCacheEnabled(configuration); + connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionCache(messageSerializer, meter, useSessionChannelCache)); onClientClose = ServiceBusClientBuilder.this.v2StackSupport::onClientClose; } else { connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionProcessor(messageSerializer, meter)); @@ -2686,7 +2710,8 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, Re final boolean asyncReceiveOnV2 = v2StackSupport.isNonSessionAsyncReceiveEnabled(configuration); if (asyncReceiveOnV2) { // "Non-Session" Async[Reactor|Processor] Receiver-Client on the V2-Stack. - connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionCache(messageSerializer, meter)); + final boolean useSessionChannelCache = v2StackSupport.isSessionChannelCacheEnabled(configuration); + connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionCache(messageSerializer, meter, useSessionChannelCache)); onClientClose = ServiceBusClientBuilder.this.v2StackSupport::onClientClose; } else { connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionProcessor(messageSerializer, meter)); @@ -2771,7 +2796,8 @@ public ServiceBusRuleManagerAsyncClient buildAsyncClient() { final boolean isManageRulesOnV2 = v2StackSupport.isSenderAndManageRulesEnabled(configuration); if (isManageRulesOnV2) { // RuleManager Client (async|sync) on the V2-Stack. - connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionCache(messageSerializer, meter)); + final boolean useSessionChannelCache = v2StackSupport.isSessionChannelCacheEnabled(configuration); + connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionCache(messageSerializer, meter, useSessionChannelCache)); onClientClose = ServiceBusClientBuilder.this.v2StackSupport::onClientClose; } else { connectionCacheWrapper = new ConnectionCacheWrapper(getOrCreateConnectionProcessor(messageSerializer, meter)); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 8c3dacc16198c..9cf216d1e5121 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -1735,10 +1735,13 @@ private ServiceBusAsyncConsumer getOrCreateConsumer() { final Mono retryableReceiveLinkMono = RetryUtil.withRetry(receiveLinkMono.onErrorMap( RequestResponseChannelClosedException.class, e -> { - // When the current connection is being disposed, the connectionProcessor can produce - // a new connection if downstream request. - // In this context, treat RequestResponseChannelClosedException from the RequestResponseChannel scoped - // to the current connection being disposed as retry-able so that retry can obtain new connection. + // When the current connection is being disposed, the V1 ConnectionProcessor or V2 ReactorConnectionCache + // can produce a new connection if downstream request. In this context, treat + // RequestResponseChannelClosedException error from the following two sources as retry-able so that + // retry can obtain a new connection - + // 1. error from the RequestResponseChannel scoped to the current connection being disposed, + // 2. error from the V2 RequestResponseChannelCache scoped to the current connection being disposed. + // return new AmqpException(true, e.getMessage(), e, null); }), connectionCacheWrapper.getRetryOptions(), diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index cc893267fe52d..e46b398bbcdde 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -856,10 +856,13 @@ private Mono sendInternal(ServiceBusMessageBatch batch, ServiceBusTransact } }).onErrorMap(RequestResponseChannelClosedException.class, e -> { - // When the current connection is being disposed, the connectionProcessor can produce a new connection - // if downstream request. In this context, treat RequestResponseChannelClosedException from - // the RequestResponseChannel scoped to the current connection being disposed as retry-able so that retry - // can obtain new connection. + // When the current connection is being disposed, the V1 ConnectionProcessor or V2 ReactorConnectionCache + // can produce a new connection if downstream request. In this context, treat + // RequestResponseChannelClosedException error from the following two sources as retry-able so that + // retry can obtain a new connection - + // 1. error from the RequestResponseChannel scoped to the current connection being disposed, + // 2. error from the V2 RequestResponseChannelCache scoped to the current connection being disposed. + // return new AmqpException(true, e.getMessage(), e, null); }); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java index 8c800c80203c3..007fef1573b64 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java @@ -10,22 +10,21 @@ import com.azure.core.amqp.implementation.AzureTokenManagerProvider; import com.azure.core.amqp.implementation.ConnectionOptions; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.ProtonSessionWrapper; import com.azure.core.amqp.implementation.ReactorConnection; import com.azure.core.amqp.implementation.ReactorHandlerProvider; import com.azure.core.amqp.implementation.ReactorProvider; +import com.azure.core.amqp.implementation.ReactorSession; import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.amqp.implementation.TokenManager; import com.azure.core.amqp.implementation.TokenManagerProvider; -import com.azure.core.amqp.implementation.handler.SessionHandler; import com.azure.core.amqp.models.CbsAuthorizationType; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.BaseHandler; -import org.apache.qpid.proton.engine.Session; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; import java.util.concurrent.ConcurrentHashMap; @@ -46,13 +45,11 @@ public class ServiceBusReactorAmqpConnection extends ReactorConnection implement private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReactorAmqpConnection.class); private final ConcurrentHashMap managementNodes = new ConcurrentHashMap<>(); private final String connectionId; - private final ReactorProvider reactorProvider; private final ReactorHandlerProvider handlerProvider; private final ServiceBusAmqpLinkProvider linkProvider; private final TokenManagerProvider tokenManagerProvider; private final AmqpRetryOptions retryOptions; private final MessageSerializer messageSerializer; - private final Scheduler scheduler; private final String fullyQualifiedNamespace; private final CbsAuthorizationType authorizationType; private final boolean distributedTransactionsSupport; @@ -71,23 +68,23 @@ public class ServiceBusReactorAmqpConnection extends ReactorConnection implement * @param distributedTransactionsSupport indicate if distributed transaction across different entities is required * for this connection. * @param isV2 (temporary) flag to use either v1 or v2 receiver. + * @param useSessionChannelCache indicates if ReactorSessionCache and RequestResponseChannelCache should be used + * when in v2 mode. */ public ServiceBusReactorAmqpConnection(String connectionId, ConnectionOptions connectionOptions, ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider, ServiceBusAmqpLinkProvider linkProvider, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, - boolean distributedTransactionsSupport, boolean isV2) { + boolean distributedTransactionsSupport, boolean isV2, boolean useSessionChannelCache) { super(connectionId, connectionOptions, reactorProvider, handlerProvider, linkProvider, tokenManagerProvider, - messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, isV2); + messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, isV2, useSessionChannelCache); this.connectionId = connectionId; - this.reactorProvider = reactorProvider; this.handlerProvider = handlerProvider; this.linkProvider = linkProvider; this.tokenManagerProvider = tokenManagerProvider; this.authorizationType = connectionOptions.getAuthorizationType(); this.retryOptions = connectionOptions.getRetry(); this.messageSerializer = messageSerializer; - this.scheduler = connectionOptions.getScheduler(); this.fullyQualifiedNamespace = connectionOptions.getFullyQualifiedNamespace(); this.distributedTransactionsSupport = distributedTransactionsSupport; this.isV2 = isV2; @@ -225,9 +222,9 @@ public Mono createReceiveLink(String linkName, String ent } @Override - protected AmqpSession createSession(String sessionName, Session session, SessionHandler handler) { - return new ServiceBusReactorSession(this, session, handler, sessionName, reactorProvider, - handlerProvider, linkProvider, getClaimsBasedSecurityNode(), tokenManagerProvider, messageSerializer, retryOptions, + protected ReactorSession createSession(ProtonSessionWrapper session) { + return new ServiceBusReactorSession(this, session, handlerProvider, linkProvider, getClaimsBasedSecurityNode(), + tokenManagerProvider, messageSerializer, retryOptions, new ServiceBusCreateSessionOptions(distributedTransactionsSupport), isV2); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java index ceedba564d4df..29c2828054010 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java @@ -11,14 +11,13 @@ import com.azure.core.amqp.implementation.AmqpConstants; import com.azure.core.amqp.implementation.ConsumerFactory; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.ProtonSessionWrapper; import com.azure.core.amqp.implementation.ReactorHandlerProvider; -import com.azure.core.amqp.implementation.ReactorProvider; import com.azure.core.amqp.implementation.ReactorSession; import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.amqp.implementation.TokenManager; import com.azure.core.amqp.implementation.TokenManagerProvider; import com.azure.core.amqp.implementation.handler.DeliverySettleMode; -import com.azure.core.amqp.implementation.handler.SessionHandler; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; @@ -26,7 +25,6 @@ import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.engine.Session; import reactor.core.publisher.Mono; import java.time.Duration; @@ -66,9 +64,6 @@ class ServiceBusReactorSession extends ReactorSession implements ServiceBusSessi * Creates a new AMQP session using proton-j. * * @param session Proton-j session for this AMQP session. - * @param sessionHandler Handler for events that occur in the session. - * @param sessionName Name of the session. - * @param provider Provides reactor instances for messages to sent with. * @param handlerProvider Providers reactor handlers for listening to proton-j reactor events. * @param linkProvider Provides amqp links for send and receive. * @param cbsNodeSupplier Mono that returns a reference to the {@link ClaimsBasedSecurityNode}. @@ -78,13 +73,13 @@ class ServiceBusReactorSession extends ReactorSession implements ServiceBusSessi * @param createOptions the options to create {@link ServiceBusReactorSession}. * @param isV2 (temporary) flag indicating which receiver, v1 or v2, to create. */ - ServiceBusReactorSession(AmqpConnection amqpConnection, Session session, SessionHandler sessionHandler, - String sessionName, ReactorProvider provider, ReactorHandlerProvider handlerProvider, - ServiceBusAmqpLinkProvider linkProvider, Mono cbsNodeSupplier, TokenManagerProvider tokenManagerProvider, + ServiceBusReactorSession(AmqpConnection amqpConnection, ProtonSessionWrapper session, + ReactorHandlerProvider handlerProvider, ServiceBusAmqpLinkProvider linkProvider, + Mono cbsNodeSupplier, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, AmqpRetryOptions retryOptions, ServiceBusCreateSessionOptions createOptions, boolean isV2) { - super(amqpConnection, session, sessionHandler, sessionName, provider, handlerProvider, linkProvider, cbsNodeSupplier, - tokenManagerProvider, messageSerializer, retryOptions); + super(amqpConnection, session, handlerProvider, linkProvider, cbsNodeSupplier, tokenManagerProvider, + messageSerializer, retryOptions); this.amqpConnection = amqpConnection; this.retryOptions = retryOptions; this.linkProvider = linkProvider; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientRecoveryIsolatedTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientRecoveryIsolatedTest.java index e68177a06c2cd..22d9983b55596 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientRecoveryIsolatedTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientRecoveryIsolatedTest.java @@ -853,7 +853,7 @@ ServiceBusReactorAmqpConnection arrange() { // New tests only for ReactorConnectionCache introduced in v2. final boolean isV2 = true; return new ServiceBusReactorAmqpConnection(connectionId, connectionOptions, - reactorProvider, handlerProvider, linkProvider, tokenManagerProvider, messageSerializer, false, isV2); + reactorProvider, handlerProvider, linkProvider, tokenManagerProvider, messageSerializer, false, isV2, false); } AmqpSendLink getAmqpSendLink(int sessionIdx, int linkIdx) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java index f5d4c62bd1c98..9c316095f68e5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java @@ -12,6 +12,7 @@ import com.azure.core.amqp.exception.AmqpResponseCode; import com.azure.core.amqp.implementation.AmqpConstants; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.ProtonSessionWrapper; import com.azure.core.amqp.implementation.ReactorDispatcher; import com.azure.core.amqp.implementation.ReactorHandlerProvider; import com.azure.core.amqp.implementation.ReactorProvider; @@ -143,6 +144,7 @@ void setup(TestInfo testInfo) { when(handler.getEndpointStates()).thenReturn(endpointStateReplayProcessor); FluxSink sink1 = endpointStateReplayProcessor.sink(); sink1.next(EndpointState.ACTIVE); + when(handler.getSessionName()).thenReturn(SESSION_NAME); when(handler.getHostname()).thenReturn(HOSTNAME); when(handler.getConnectionId()).thenReturn(CONNECTION_ID); @@ -187,7 +189,10 @@ void setup(TestInfo testInfo) { when(reactorProvider.getReactorDispatcher()).thenReturn(dispatcher); when(connection.getShutdownSignals()).thenReturn(Flux.empty()); - serviceBusReactorSession = new ServiceBusReactorSession(connection, session, handler, SESSION_NAME, reactorProvider, + // TODO (anu): use 'ProtonSession' instead of 'ProtonSessionWrapper' and update the test. + final ProtonSessionWrapper sessionWrapper = new ProtonSessionWrapper(session, handler, reactorProvider); + + serviceBusReactorSession = new ServiceBusReactorSession(connection, sessionWrapper, handlerProvider, linkProvider, cbsNodeSupplier, tokenManagerProvider, messageSerializer, retryOptions, new ServiceBusCreateSessionOptions(false), true); when(connection.getShutdownSignals()).thenReturn(Flux.never()); @@ -290,9 +295,11 @@ void createCoordinatorLink() throws IOException { doNothing().when(coordinatorSenderEntity).setTarget(any(Target.class)); when(coordinatorSenderEntity.attachments()).thenReturn(record); when(session.sender(transactionLinkName)).thenReturn(coordinatorSenderEntity); + // TODO (anu): use 'ProtonSession' instead of 'ProtonSessionWrapper' and update the test. + final ProtonSessionWrapper sessionWrapper = new ProtonSessionWrapper(session, handler, reactorProvider); - final ServiceBusReactorSession serviceBusReactorSession = new ServiceBusReactorSession(connection, session, handler, - SESSION_NAME, reactorProvider, handlerProvider, linkProvider, cbsNodeSupplier, tokenManagerProvider, messageSerializer, + final ServiceBusReactorSession serviceBusReactorSession = new ServiceBusReactorSession(connection, sessionWrapper, + handlerProvider, linkProvider, cbsNodeSupplier, tokenManagerProvider, messageSerializer, retryOptions, new ServiceBusCreateSessionOptions(true), true); when(handlerProvider.createSendLinkHandler(CONNECTION_ID, HOSTNAME, transactionLinkName, transactionLinkName))