Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable EventHub Management Node to use RequestResponseChannelCache #41805

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

### Features Added

- Integrated RequestResponseChannelCache (CBS, Management channel cache) and ReactorSessionCache, these caches are activated when V2 stack is opted-in using the configuration `com.azure.messaging.eventhubs.v2`. ([39107](https://github.com/Azure/azure-sdk-for-java/pull/39107))
- Integrated RequestResponseChannelCache (CBS, Management channel cache) and ReactorSessionCache, these caches are activated when V2 stack is opted-in using the configuration `com.azure.messaging.eventhubs.v2`. ([39107](https://github.com/Azure/azure-sdk-for-java/pull/39107)), ([41805](https://github.com/Azure/azure-sdk-for-java/pull/41805))

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -995,8 +995,8 @@ EventHubAsyncClient buildAsyncClient() {
if (isSharedConnection.get()) {
synchronized (connectionLock) {
if (eventHubConnectionProcessor == null) {
final boolean useSessionChannelCache = true; // v2StackSupport.isSessionChannelCacheEnabled(configuration);
if (v2StackSupport.isV2StackEnabled(configuration)) {
final boolean useSessionChannelCache = v2StackSupport.isSessionChannelCacheEnabled(configuration);
eventHubConnectionProcessor = new ConnectionCacheWrapper(buildConnectionCache(messageSerializer, meter, useSessionChannelCache));
} else {
eventHubConnectionProcessor = new ConnectionCacheWrapper(buildConnectionProcessor(messageSerializer, meter));
Expand All @@ -1010,7 +1010,7 @@ EventHubAsyncClient buildAsyncClient() {
LOGGER.info("# of open clients with shared connection: {}", numberOfOpenClients);
} else {
if (v2StackSupport.isV2StackEnabled(configuration)) {
final boolean useSessionChannelCache = true; // v2StackSupport.isSessionChannelCacheEnabled(configuration);
final boolean useSessionChannelCache = v2StackSupport.isSessionChannelCacheEnabled(configuration);
processor = new ConnectionCacheWrapper(buildConnectionCache(messageSerializer, meter, useSessionChannelCache));
} else {
processor = new ConnectionCacheWrapper(buildConnectionProcessor(messageSerializer, meter));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ final class V2StackSupport {
private static final String SESSION_CHANNEL_CACHE_KEY = "com.azure.core.amqp.cache";
private static final ConfigurationProperty<Boolean> SESSION_CHANNEL_CACHE_PROPERTY = ConfigurationPropertyBuilder.ofBoolean(SESSION_CHANNEL_CACHE_KEY)
.environmentVariableName(SESSION_CHANNEL_CACHE_KEY)
.defaultValue(false) // "SessionCache" and "RequestResponseChannelCache" requires explicit opt in along with v2 stack opt in.
.defaultValue(true) // "SessionCache" and "RequestResponseChannelCache" are enabled by default if v2 stack is opted in.
.shared(true)
.build();
private final AtomicReference<Boolean> sessionChannelCacheFlag = new AtomicReference<>();
Expand All @@ -64,17 +64,18 @@ boolean isV2StackEnabled(Configuration configuration) {
}

/**
* SessionCache and RequestResponseChannelCache not opted-in default, the application may opt in but only when
* v2 stack is also enabled via 'com.azure.messaging.eventhubs.v2'.
* SessionCache and RequestResponseChannelCache are enabled by default if the v2 stack is opted in via
* 'com.azure.messaging.eventhubs.v2', but application may opt out these two caches by setting
* 'com.azure.core.amqp.cache' to false.
*
* @param configuration the client configuration.
* @return true if SessionCache and RequestResponseChannelCache is opted-in.
* @return true if SessionCache and RequestResponseChannelCache are enabled.
*/
boolean isSessionChannelCacheEnabled(Configuration configuration) {
if (!isV2StackEnabled(configuration)) {
return false;
}
return isOptedIn(configuration, SESSION_CHANNEL_CACHE_PROPERTY, sessionChannelCacheFlag);
return !isOptedOut(configuration, SESSION_CHANNEL_CACHE_PROPERTY, sessionChannelCacheFlag);
}

private boolean isOptedOut(Configuration configuration, ConfigurationProperty<Boolean> configProperty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.AmqpChannelProcessor;
import com.azure.core.amqp.implementation.AmqpLinkProvider;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ChannelCacheWrapper;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ProtonSessionWrapper;
import com.azure.core.amqp.implementation.ReactorConnection;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RequestResponseChannelCache;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.credential.TokenCredential;
Expand Down Expand Up @@ -54,6 +58,7 @@ public class EventHubReactorAmqpConnection extends ReactorConnection implements
private final Scheduler scheduler;
private final String eventHubName;
private final boolean isV2;
private final boolean useSessionChannelCache;

private volatile ManagementChannel managementChannel;

Expand Down Expand Up @@ -81,6 +86,7 @@ public EventHubReactorAmqpConnection(String connectionId, ConnectionOptions conn
this.messageSerializer = messageSerializer;
this.eventHubName = eventHubName;
this.isV2 = isV2;
this.useSessionChannelCache = useSessionChannelCache;
this.retryOptions = connectionOptions.getRetry();
this.tokenCredential = connectionOptions.getTokenCredential();
this.scheduler = connectionOptions.getScheduler();
Expand Down Expand Up @@ -171,11 +177,20 @@ protected ReactorSession createSession(ProtonSessionWrapper session) {

private synchronized ManagementChannel getOrCreateManagementChannel() {
if (managementChannel == null) {
managementChannel = new ManagementChannel(
createRequestResponseChannel(MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME, MANAGEMENT_ADDRESS),
eventHubName, tokenCredential, tokenManagerProvider, this.messageSerializer, scheduler);
final ChannelCacheWrapper channelCache;
if (useSessionChannelCache) {
final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
final RequestResponseChannelCache cache
= new RequestResponseChannelCache(this, MANAGEMENT_ADDRESS, MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME, retryPolicy);
channelCache = new ChannelCacheWrapper(cache);
} else {
final AmqpChannelProcessor<RequestResponseChannel> cache
= createRequestResponseChannel(MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME, MANAGEMENT_ADDRESS);
channelCache = new ChannelCacheWrapper(cache);
}
managementChannel = new ManagementChannel(channelCache, eventHubName, tokenCredential, tokenManagerProvider,
this.messageSerializer, scheduler);
}

return managementChannel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.ChannelCacheWrapper;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseChannel;
Expand Down Expand Up @@ -60,7 +61,7 @@ public class ManagementChannel implements EventHubManagementNode {

private static final ClientLogger LOGGER = new ClientLogger(ManagementChannel.class);
private final TokenCredential tokenProvider;
private final Mono<RequestResponseChannel> channelMono;
private final ChannelCacheWrapper channelCache;
private final Scheduler scheduler;
private final String eventHubName;
private final MessageSerializer messageSerializer;
Expand All @@ -75,13 +76,13 @@ public class ManagementChannel implements EventHubManagementNode {
/**
* Creates an instance that is connected to the {@code eventHubName}'s management node.
*
* @param responseChannelMono Mono that completes with a new {@link RequestResponseChannel}.
* @param channelCache a cache that if needed obtain and cache the {@link RequestResponseChannel}.
* @param eventHubName The name of the Event Hub.
* @param credential Credential to authorize user for access to the Event Hub.
* @param tokenManagerProvider Provides a token manager that will keep track and maintain tokens.
* @param messageSerializer Maps responses from the management channel.
*/
ManagementChannel(Mono<RequestResponseChannel> responseChannelMono, String eventHubName, TokenCredential credential,
ManagementChannel(ChannelCacheWrapper channelCache, String eventHubName, TokenCredential credential,
TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer,
Scheduler scheduler) {

Expand All @@ -90,11 +91,11 @@ public class ManagementChannel implements EventHubManagementNode {
this.tokenProvider = Objects.requireNonNull(credential, "'credential' cannot be null.");
this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null.");
this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
this.channelMono = Objects.requireNonNull(responseChannelMono, "'responseChannelMono' cannot be null.");
this.channelCache = Objects.requireNonNull(channelCache, "'channelCache' cannot be null.");
this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");

//@formatter:off
this.subscription = responseChannelMono
this.subscription = channelCache.get()
.flatMapMany(e -> e.getEndpointStates().distinctUntilChanged())
.subscribe(e -> {
LOGGER.info("Management endpoint state: {}", e);
Expand Down Expand Up @@ -158,7 +159,7 @@ private <T> Mono<T> getProperties(Map<String, Object> properties, Class<T> respo
final ApplicationProperties applicationProperties = new ApplicationProperties(properties);
request.setApplicationProperties(applicationProperties);

return channelMono.flatMap(channel -> channel.sendWithAck(request)
return channelCache.get().flatMap(channel -> channel.sendWithAck(request)
.handle((message, sink) -> {
if (RequestResponseUtils.isSuccessful(message)) {
sink.next(messageSerializer.deserialize(message, responseType));
Expand Down Expand Up @@ -186,8 +187,6 @@ public void close() {
isDisposed = true;
subscription.dispose();

if (channelMono instanceof Disposable) {
((Disposable) channelMono).dispose();
}
channelCache.dispose();
}
}