From adbd2f71663668c0ca59625f4cc6903949ae572b Mon Sep 17 00:00:00 2001 From: Anu Thomas Chandy Date: Wed, 11 Sep 2024 09:43:04 -0700 Subject: [PATCH] EventHub Management Node to use new cache (#41805) --- .../azure-messaging-eventhubs/CHANGELOG.md | 2 +- .../eventhubs/EventHubClientBuilder.java | 4 ++-- .../messaging/eventhubs/V2StackSupport.java | 11 +++++---- .../EventHubReactorAmqpConnection.java | 23 +++++++++++++++---- .../implementation/ManagementChannel.java | 17 +++++++------- 5 files changed, 36 insertions(+), 21 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md index 49a731a35c046..1d9503365c6a3 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md @@ -4,7 +4,7 @@ ### Features Added -- Integrated RequestResponseChannelCache (CBS, Management channel cache) and ReactorSessionCache, these caches are activated when V2 stack is opted-in using the configuration `com.azure.messaging.eventhubs.v2`. ([39107](https://github.com/Azure/azure-sdk-for-java/pull/39107)) +- Integrated RequestResponseChannelCache (CBS, Management channel cache) and ReactorSessionCache, these caches are activated when V2 stack is opted-in using the configuration `com.azure.messaging.eventhubs.v2`. ([39107](https://github.com/Azure/azure-sdk-for-java/pull/39107)), ([41805](https://github.com/Azure/azure-sdk-for-java/pull/41805)) ### Breaking Changes diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index a63b7e1674231..e0a18c90d89df 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -995,8 +995,8 @@ EventHubAsyncClient buildAsyncClient() { if (isSharedConnection.get()) { synchronized (connectionLock) { if (eventHubConnectionProcessor == null) { - final boolean useSessionChannelCache = true; // v2StackSupport.isSessionChannelCacheEnabled(configuration); if (v2StackSupport.isV2StackEnabled(configuration)) { + final boolean useSessionChannelCache = v2StackSupport.isSessionChannelCacheEnabled(configuration); eventHubConnectionProcessor = new ConnectionCacheWrapper(buildConnectionCache(messageSerializer, meter, useSessionChannelCache)); } else { eventHubConnectionProcessor = new ConnectionCacheWrapper(buildConnectionProcessor(messageSerializer, meter)); @@ -1010,7 +1010,7 @@ EventHubAsyncClient buildAsyncClient() { LOGGER.info("# of open clients with shared connection: {}", numberOfOpenClients); } else { if (v2StackSupport.isV2StackEnabled(configuration)) { - final boolean useSessionChannelCache = true; // v2StackSupport.isSessionChannelCacheEnabled(configuration); + final boolean useSessionChannelCache = v2StackSupport.isSessionChannelCacheEnabled(configuration); processor = new ConnectionCacheWrapper(buildConnectionCache(messageSerializer, meter, useSessionChannelCache)); } else { processor = new ConnectionCacheWrapper(buildConnectionProcessor(messageSerializer, meter)); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/V2StackSupport.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/V2StackSupport.java index fb0642de93291..372bef8ac8736 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/V2StackSupport.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/V2StackSupport.java @@ -42,7 +42,7 @@ final class V2StackSupport { private static final String SESSION_CHANNEL_CACHE_KEY = "com.azure.core.amqp.cache"; private static final ConfigurationProperty SESSION_CHANNEL_CACHE_PROPERTY = ConfigurationPropertyBuilder.ofBoolean(SESSION_CHANNEL_CACHE_KEY) .environmentVariableName(SESSION_CHANNEL_CACHE_KEY) - .defaultValue(false) // "SessionCache" and "RequestResponseChannelCache" requires explicit opt in along with v2 stack opt in. + .defaultValue(true) // "SessionCache" and "RequestResponseChannelCache" are enabled by default if v2 stack is opted in. .shared(true) .build(); private final AtomicReference sessionChannelCacheFlag = new AtomicReference<>(); @@ -64,17 +64,18 @@ boolean isV2StackEnabled(Configuration configuration) { } /** - * SessionCache and RequestResponseChannelCache not opted-in default, the application may opt in but only when - * v2 stack is also enabled via 'com.azure.messaging.eventhubs.v2'. + * SessionCache and RequestResponseChannelCache are enabled by default if the v2 stack is opted in via + * 'com.azure.messaging.eventhubs.v2', but application may opt out these two caches by setting + * 'com.azure.core.amqp.cache' to false. * * @param configuration the client configuration. - * @return true if SessionCache and RequestResponseChannelCache is opted-in. + * @return true if SessionCache and RequestResponseChannelCache are enabled. */ boolean isSessionChannelCacheEnabled(Configuration configuration) { if (!isV2StackEnabled(configuration)) { return false; } - return isOptedIn(configuration, SESSION_CHANNEL_CACHE_PROPERTY, sessionChannelCacheFlag); + return !isOptedOut(configuration, SESSION_CHANNEL_CACHE_PROPERTY, sessionChannelCacheFlag); } private boolean isOptedOut(Configuration configuration, ConfigurationProperty configProperty, diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java index d7baa79745f89..3b3e8e34af4f9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java @@ -5,9 +5,11 @@ import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpRetryPolicy; +import com.azure.core.amqp.implementation.AmqpChannelProcessor; import com.azure.core.amqp.implementation.AmqpLinkProvider; import com.azure.core.amqp.implementation.AmqpReceiveLink; import com.azure.core.amqp.implementation.AmqpSendLink; +import com.azure.core.amqp.implementation.ChannelCacheWrapper; import com.azure.core.amqp.implementation.ConnectionOptions; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.ProtonSessionWrapper; @@ -15,6 +17,8 @@ import com.azure.core.amqp.implementation.ReactorHandlerProvider; import com.azure.core.amqp.implementation.ReactorProvider; import com.azure.core.amqp.implementation.ReactorSession; +import com.azure.core.amqp.implementation.RequestResponseChannel; +import com.azure.core.amqp.implementation.RequestResponseChannelCache; import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.amqp.implementation.TokenManagerProvider; import com.azure.core.credential.TokenCredential; @@ -54,6 +58,7 @@ public class EventHubReactorAmqpConnection extends ReactorConnection implements private final Scheduler scheduler; private final String eventHubName; private final boolean isV2; + private final boolean useSessionChannelCache; private volatile ManagementChannel managementChannel; @@ -81,6 +86,7 @@ public EventHubReactorAmqpConnection(String connectionId, ConnectionOptions conn this.messageSerializer = messageSerializer; this.eventHubName = eventHubName; this.isV2 = isV2; + this.useSessionChannelCache = useSessionChannelCache; this.retryOptions = connectionOptions.getRetry(); this.tokenCredential = connectionOptions.getTokenCredential(); this.scheduler = connectionOptions.getScheduler(); @@ -171,11 +177,20 @@ protected ReactorSession createSession(ProtonSessionWrapper session) { private synchronized ManagementChannel getOrCreateManagementChannel() { if (managementChannel == null) { - managementChannel = new ManagementChannel( - createRequestResponseChannel(MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME, MANAGEMENT_ADDRESS), - eventHubName, tokenCredential, tokenManagerProvider, this.messageSerializer, scheduler); + final ChannelCacheWrapper channelCache; + if (useSessionChannelCache) { + final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions); + final RequestResponseChannelCache cache + = new RequestResponseChannelCache(this, MANAGEMENT_ADDRESS, MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME, retryPolicy); + channelCache = new ChannelCacheWrapper(cache); + } else { + final AmqpChannelProcessor cache + = createRequestResponseChannel(MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME, MANAGEMENT_ADDRESS); + channelCache = new ChannelCacheWrapper(cache); + } + managementChannel = new ManagementChannel(channelCache, eventHubName, tokenCredential, tokenManagerProvider, + this.messageSerializer, scheduler); } - return managementChannel; } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java index 10fbadb26cb70..10e103918d975 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java @@ -6,6 +6,7 @@ import com.azure.core.amqp.AmqpEndpointState; import com.azure.core.amqp.exception.AmqpResponseCode; import com.azure.core.amqp.implementation.AmqpConstants; +import com.azure.core.amqp.implementation.ChannelCacheWrapper; import com.azure.core.amqp.implementation.ExceptionUtil; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.RequestResponseChannel; @@ -60,7 +61,7 @@ public class ManagementChannel implements EventHubManagementNode { private static final ClientLogger LOGGER = new ClientLogger(ManagementChannel.class); private final TokenCredential tokenProvider; - private final Mono channelMono; + private final ChannelCacheWrapper channelCache; private final Scheduler scheduler; private final String eventHubName; private final MessageSerializer messageSerializer; @@ -75,13 +76,13 @@ public class ManagementChannel implements EventHubManagementNode { /** * Creates an instance that is connected to the {@code eventHubName}'s management node. * - * @param responseChannelMono Mono that completes with a new {@link RequestResponseChannel}. + * @param channelCache a cache that if needed obtain and cache the {@link RequestResponseChannel}. * @param eventHubName The name of the Event Hub. * @param credential Credential to authorize user for access to the Event Hub. * @param tokenManagerProvider Provides a token manager that will keep track and maintain tokens. * @param messageSerializer Maps responses from the management channel. */ - ManagementChannel(Mono responseChannelMono, String eventHubName, TokenCredential credential, + ManagementChannel(ChannelCacheWrapper channelCache, String eventHubName, TokenCredential credential, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, Scheduler scheduler) { @@ -90,11 +91,11 @@ public class ManagementChannel implements EventHubManagementNode { this.tokenProvider = Objects.requireNonNull(credential, "'credential' cannot be null."); this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null."); this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null."); - this.channelMono = Objects.requireNonNull(responseChannelMono, "'responseChannelMono' cannot be null."); + this.channelCache = Objects.requireNonNull(channelCache, "'channelCache' cannot be null."); this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null."); //@formatter:off - this.subscription = responseChannelMono + this.subscription = channelCache.get() .flatMapMany(e -> e.getEndpointStates().distinctUntilChanged()) .subscribe(e -> { LOGGER.info("Management endpoint state: {}", e); @@ -158,7 +159,7 @@ private Mono getProperties(Map properties, Class respo final ApplicationProperties applicationProperties = new ApplicationProperties(properties); request.setApplicationProperties(applicationProperties); - return channelMono.flatMap(channel -> channel.sendWithAck(request) + return channelCache.get().flatMap(channel -> channel.sendWithAck(request) .handle((message, sink) -> { if (RequestResponseUtils.isSuccessful(message)) { sink.next(messageSerializer.deserialize(message, responseType)); @@ -186,8 +187,6 @@ public void close() { isDisposed = true; subscription.dispose(); - if (channelMono instanceof Disposable) { - ((Disposable) channelMono).dispose(); - } + channelCache.dispose(); } }