Skip to content

Commit

Permalink
EventHub Management Node to use new cache (#41805)
Browse files Browse the repository at this point in the history
  • Loading branch information
anuchandy authored Sep 11, 2024
1 parent 5faff56 commit adbd2f7
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 21 deletions.
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

### Features Added

- Integrated RequestResponseChannelCache (CBS, Management channel cache) and ReactorSessionCache, these caches are activated when V2 stack is opted-in using the configuration `com.azure.messaging.eventhubs.v2`. ([39107](https://github.com/Azure/azure-sdk-for-java/pull/39107))
- Integrated RequestResponseChannelCache (CBS, Management channel cache) and ReactorSessionCache, these caches are activated when V2 stack is opted-in using the configuration `com.azure.messaging.eventhubs.v2`. ([39107](https://github.com/Azure/azure-sdk-for-java/pull/39107)), ([41805](https://github.com/Azure/azure-sdk-for-java/pull/41805))

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -995,8 +995,8 @@ EventHubAsyncClient buildAsyncClient() {
if (isSharedConnection.get()) {
synchronized (connectionLock) {
if (eventHubConnectionProcessor == null) {
final boolean useSessionChannelCache = true; // v2StackSupport.isSessionChannelCacheEnabled(configuration);
if (v2StackSupport.isV2StackEnabled(configuration)) {
final boolean useSessionChannelCache = v2StackSupport.isSessionChannelCacheEnabled(configuration);
eventHubConnectionProcessor = new ConnectionCacheWrapper(buildConnectionCache(messageSerializer, meter, useSessionChannelCache));
} else {
eventHubConnectionProcessor = new ConnectionCacheWrapper(buildConnectionProcessor(messageSerializer, meter));
Expand All @@ -1010,7 +1010,7 @@ EventHubAsyncClient buildAsyncClient() {
LOGGER.info("# of open clients with shared connection: {}", numberOfOpenClients);
} else {
if (v2StackSupport.isV2StackEnabled(configuration)) {
final boolean useSessionChannelCache = true; // v2StackSupport.isSessionChannelCacheEnabled(configuration);
final boolean useSessionChannelCache = v2StackSupport.isSessionChannelCacheEnabled(configuration);
processor = new ConnectionCacheWrapper(buildConnectionCache(messageSerializer, meter, useSessionChannelCache));
} else {
processor = new ConnectionCacheWrapper(buildConnectionProcessor(messageSerializer, meter));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ final class V2StackSupport {
private static final String SESSION_CHANNEL_CACHE_KEY = "com.azure.core.amqp.cache";
private static final ConfigurationProperty<Boolean> SESSION_CHANNEL_CACHE_PROPERTY = ConfigurationPropertyBuilder.ofBoolean(SESSION_CHANNEL_CACHE_KEY)
.environmentVariableName(SESSION_CHANNEL_CACHE_KEY)
.defaultValue(false) // "SessionCache" and "RequestResponseChannelCache" requires explicit opt in along with v2 stack opt in.
.defaultValue(true) // "SessionCache" and "RequestResponseChannelCache" are enabled by default if v2 stack is opted in.
.shared(true)
.build();
private final AtomicReference<Boolean> sessionChannelCacheFlag = new AtomicReference<>();
Expand All @@ -64,17 +64,18 @@ boolean isV2StackEnabled(Configuration configuration) {
}

/**
* SessionCache and RequestResponseChannelCache not opted-in default, the application may opt in but only when
* v2 stack is also enabled via 'com.azure.messaging.eventhubs.v2'.
* SessionCache and RequestResponseChannelCache are enabled by default if the v2 stack is opted in via
* 'com.azure.messaging.eventhubs.v2', but application may opt out these two caches by setting
* 'com.azure.core.amqp.cache' to false.
*
* @param configuration the client configuration.
* @return true if SessionCache and RequestResponseChannelCache is opted-in.
* @return true if SessionCache and RequestResponseChannelCache are enabled.
*/
boolean isSessionChannelCacheEnabled(Configuration configuration) {
if (!isV2StackEnabled(configuration)) {
return false;
}
return isOptedIn(configuration, SESSION_CHANNEL_CACHE_PROPERTY, sessionChannelCacheFlag);
return !isOptedOut(configuration, SESSION_CHANNEL_CACHE_PROPERTY, sessionChannelCacheFlag);
}

private boolean isOptedOut(Configuration configuration, ConfigurationProperty<Boolean> configProperty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.AmqpChannelProcessor;
import com.azure.core.amqp.implementation.AmqpLinkProvider;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ChannelCacheWrapper;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ProtonSessionWrapper;
import com.azure.core.amqp.implementation.ReactorConnection;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RequestResponseChannelCache;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.credential.TokenCredential;
Expand Down Expand Up @@ -54,6 +58,7 @@ public class EventHubReactorAmqpConnection extends ReactorConnection implements
private final Scheduler scheduler;
private final String eventHubName;
private final boolean isV2;
private final boolean useSessionChannelCache;

private volatile ManagementChannel managementChannel;

Expand Down Expand Up @@ -81,6 +86,7 @@ public EventHubReactorAmqpConnection(String connectionId, ConnectionOptions conn
this.messageSerializer = messageSerializer;
this.eventHubName = eventHubName;
this.isV2 = isV2;
this.useSessionChannelCache = useSessionChannelCache;
this.retryOptions = connectionOptions.getRetry();
this.tokenCredential = connectionOptions.getTokenCredential();
this.scheduler = connectionOptions.getScheduler();
Expand Down Expand Up @@ -171,11 +177,20 @@ protected ReactorSession createSession(ProtonSessionWrapper session) {

private synchronized ManagementChannel getOrCreateManagementChannel() {
if (managementChannel == null) {
managementChannel = new ManagementChannel(
createRequestResponseChannel(MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME, MANAGEMENT_ADDRESS),
eventHubName, tokenCredential, tokenManagerProvider, this.messageSerializer, scheduler);
final ChannelCacheWrapper channelCache;
if (useSessionChannelCache) {
final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
final RequestResponseChannelCache cache
= new RequestResponseChannelCache(this, MANAGEMENT_ADDRESS, MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME, retryPolicy);
channelCache = new ChannelCacheWrapper(cache);
} else {
final AmqpChannelProcessor<RequestResponseChannel> cache
= createRequestResponseChannel(MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME, MANAGEMENT_ADDRESS);
channelCache = new ChannelCacheWrapper(cache);
}
managementChannel = new ManagementChannel(channelCache, eventHubName, tokenCredential, tokenManagerProvider,
this.messageSerializer, scheduler);
}

return managementChannel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.ChannelCacheWrapper;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseChannel;
Expand Down Expand Up @@ -60,7 +61,7 @@ public class ManagementChannel implements EventHubManagementNode {

private static final ClientLogger LOGGER = new ClientLogger(ManagementChannel.class);
private final TokenCredential tokenProvider;
private final Mono<RequestResponseChannel> channelMono;
private final ChannelCacheWrapper channelCache;
private final Scheduler scheduler;
private final String eventHubName;
private final MessageSerializer messageSerializer;
Expand All @@ -75,13 +76,13 @@ public class ManagementChannel implements EventHubManagementNode {
/**
* Creates an instance that is connected to the {@code eventHubName}'s management node.
*
* @param responseChannelMono Mono that completes with a new {@link RequestResponseChannel}.
* @param channelCache a cache that if needed obtain and cache the {@link RequestResponseChannel}.
* @param eventHubName The name of the Event Hub.
* @param credential Credential to authorize user for access to the Event Hub.
* @param tokenManagerProvider Provides a token manager that will keep track and maintain tokens.
* @param messageSerializer Maps responses from the management channel.
*/
ManagementChannel(Mono<RequestResponseChannel> responseChannelMono, String eventHubName, TokenCredential credential,
ManagementChannel(ChannelCacheWrapper channelCache, String eventHubName, TokenCredential credential,
TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer,
Scheduler scheduler) {

Expand All @@ -90,11 +91,11 @@ public class ManagementChannel implements EventHubManagementNode {
this.tokenProvider = Objects.requireNonNull(credential, "'credential' cannot be null.");
this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null.");
this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
this.channelMono = Objects.requireNonNull(responseChannelMono, "'responseChannelMono' cannot be null.");
this.channelCache = Objects.requireNonNull(channelCache, "'channelCache' cannot be null.");
this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");

//@formatter:off
this.subscription = responseChannelMono
this.subscription = channelCache.get()
.flatMapMany(e -> e.getEndpointStates().distinctUntilChanged())
.subscribe(e -> {
LOGGER.info("Management endpoint state: {}", e);
Expand Down Expand Up @@ -158,7 +159,7 @@ private <T> Mono<T> getProperties(Map<String, Object> properties, Class<T> respo
final ApplicationProperties applicationProperties = new ApplicationProperties(properties);
request.setApplicationProperties(applicationProperties);

return channelMono.flatMap(channel -> channel.sendWithAck(request)
return channelCache.get().flatMap(channel -> channel.sendWithAck(request)
.handle((message, sink) -> {
if (RequestResponseUtils.isSuccessful(message)) {
sink.next(messageSerializer.deserialize(message, responseType));
Expand Down Expand Up @@ -186,8 +187,6 @@ public void close() {
isDisposed = true;
subscription.dispose();

if (channelMono instanceof Disposable) {
((Disposable) channelMono).dispose();
}
channelCache.dispose();
}
}

0 comments on commit adbd2f7

Please sign in to comment.