Skip to content

Commit

Permalink
Adding safety to Amqp Session operations, SessionCache to cache React…
Browse files Browse the repository at this point in the history
…orSession instances and integrating RequestResponseChannelCache
  • Loading branch information
anuchandy committed Jul 17, 2024
1 parent d56e1e3 commit da01ad4
Show file tree
Hide file tree
Showing 35 changed files with 2,106 additions and 270 deletions.
2 changes: 2 additions & 0 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ io.clientcore:http-stress;1.0.0-beta.1;1.0.0-beta.1
# In the pom, the version update tag after the version should name the unreleased package and the dependency version:
# <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->

unreleased_com.azure:azure-core-amqp;2.10.0-beta.1

# Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current
# version and set the version to the released beta. Released beta dependencies are only valid
# for dependency versions. These entries are specifically for when we've released a beta for
Expand Down
10 changes: 10 additions & 0 deletions sdk/core/azure-core-amqp/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,14 @@
<Bug pattern="SE_NO_SERIALVERSIONID" />
<Class name="com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries$DispositionWork" />
</Match>

<Match>
<Bug pattern="SE_BAD_FIELD" />
<Class name="com.azure.core.amqp.implementation.ReactorSessionCache$Entry" />
</Match>

<Match>
<Bug pattern="SE_NO_SERIALVERSIONID" />
<Class name="com.azure.core.amqp.implementation.ReactorSessionCache$Entry" />
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import reactor.core.publisher.Mono;

import java.util.Objects;

/**
* A temporary type to side by side support {@link RequestResponseChannel} caching that
* <ul>
* <li>uses {@link AmqpChannelProcessor} in v2 mode without "com.azure.core.amqp.internal.session-channel-cache.v2"
* opt-in or in v1 mode,</li>
* <li>or uses {@link RequestResponseChannelCache} when "com.azure.core.amqp.internal.session-channel-cache.v2"
* is explicitly opted-in in v2 mode.</li>
* </ul>
* <p>
* TODO (anu): remove this temporary type when removing v1 and 'RequestResponseChannelCache' is no longer opt-in for v2.
* </p>
*/
final class ChannelCacheWrapper {
private final AmqpChannelProcessor<RequestResponseChannel> channelProcessor;
private final RequestResponseChannelCache channelCache;

/**
* Creates channel cache for V1 client or V2 client without "com.azure.core.amqp.internal.session-channel-cache.v2"
* opted-in.
*
* @param channelProcessor the channel processor for caching {@link RequestResponseChannel}.
*/
ChannelCacheWrapper(AmqpChannelProcessor<RequestResponseChannel> channelProcessor) {
this.channelProcessor = Objects.requireNonNull(channelProcessor, "'channelProcessor' cannot be null.");
this.channelCache = null;
}

/**
* Creates channel cache for V1 client with "com.azure.core.amqp.internal.session-channel-cache.v2" opted-in.
*
* @param channelCache the cache for {@link RequestResponseChannel}.
*/
ChannelCacheWrapper(RequestResponseChannelCache channelCache) {
this.channelCache = Objects.requireNonNull(channelCache, "'channelCache' cannot be null.");
this.channelProcessor = null;
}

Mono<RequestResponseChannel> get() {
if (channelCache != null) {
return channelCache.get();
} else {
return channelProcessor;
}
}

Mono<Void> closeAsync() {
if (channelCache != null) {
return channelCache.closeAsync();
} else {
return channelProcessor.flatMap(RequestResponseChannel::closeAsync);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@
*/
public class ManagementChannel implements AmqpManagementNode {
private final TokenManager tokenManager;
private final AmqpChannelProcessor<RequestResponseChannel> createChannel;
private final ChannelCacheWrapper channelCache;
private final String fullyQualifiedNamespace;
private final ClientLogger logger;
private final String entityPath;

/**
* Creates a new instance of ManagementChannel.
*
* @param createChannel Creates a new AMQP channel.
* @param channelCache The request response channel cache.
* @param fullyQualifiedNamespace Fully qualified namespace for the message broker.
* @param entityPath The entity path for the message broker.
* @param tokenManager Manages tokens for authorization.
*/
public ManagementChannel(AmqpChannelProcessor<RequestResponseChannel> createChannel, String fullyQualifiedNamespace,
String entityPath, TokenManager tokenManager) {
this.createChannel = Objects.requireNonNull(createChannel, "'createChannel' cannot be null.");
public ManagementChannel(ChannelCacheWrapper channelCache, String fullyQualifiedNamespace, String entityPath,
TokenManager tokenManager) {
this.channelCache = Objects.requireNonNull(channelCache, "'channelCache' cannot be null.");
this.fullyQualifiedNamespace
= Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
Expand All @@ -62,7 +62,7 @@ public ManagementChannel(AmqpChannelProcessor<RequestResponseChannel> createChan

@Override
public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message) {
return isAuthorized().then(createChannel.flatMap(channel -> {
return isAuthorized().then(channelCache.get().flatMap(channel -> {
final Message protonJMessage = MessageUtils.toProtonJMessage(message);

return channel.sendWithAck(protonJMessage)
Expand All @@ -74,7 +74,7 @@ public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message) {

@Override
public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message, DeliveryOutcome deliveryOutcome) {
return isAuthorized().then(createChannel.flatMap(channel -> {
return isAuthorized().then(channelCache.get().flatMap(channel -> {
final Message protonJMessage = MessageUtils.toProtonJMessage(message);
final DeliveryState protonJDeliveryState = MessageUtils.toProtonJDeliveryState(deliveryOutcome);

Expand All @@ -87,7 +87,7 @@ public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message, DeliveryOut

@Override
public Mono<Void> closeAsync() {
return createChannel.flatMap(channel -> channel.closeAsync()).cache();
return channelCache.closeAsync().cache();
}

private void handleResponse(Message response, SynchronousSink<AmqpAnnotatedMessage> sink,
Expand Down
Loading

0 comments on commit da01ad4

Please sign in to comment.