Skip to content

Commit

Permalink
Adding safety to Amqp Session operations, SessionCache to cache React…
Browse files Browse the repository at this point in the history
…orSession instances and integrating RequestResponseChannelCache
  • Loading branch information
anuchandy committed Aug 30, 2024
1 parent 2b40a53 commit 33dd3ee
Show file tree
Hide file tree
Showing 40 changed files with 2,277 additions and 333 deletions.
2 changes: 2 additions & 0 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,8 @@ io.clientcore:http-stress;1.0.0-beta.1;1.0.0-beta.1
# <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
unreleased_com.azure:azure-identity;1.14.0-beta.2

unreleased_com.azure:azure-core-amqp;2.10.0-beta.1

# Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current
# version and set the version to the released beta. Released beta dependencies are only valid
# for dependency versions. These entries are specifically for when we've released a beta for
Expand Down
11 changes: 11 additions & 0 deletions sdk/core/azure-core-amqp/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<Class name="com.azure.core.amqp.implementation.ReactorDispatcher" />
<Class name="com.azure.core.amqp.implementation.ReactorSession" />
<Class name="com.azure.core.amqp.implementation.ReceiveLinkHandlerWrapper" />
<Class name="com.azure.core.amqp.implementation.RequestResponseChannelCache" />
</Or>
<Method name="&lt;init&gt;" />
</And>
Expand Down Expand Up @@ -120,4 +121,14 @@
<Bug pattern="SE_NO_SERIALVERSIONID" />
<Class name="com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries$DispositionWork" />
</Match>

<Match>
<Bug pattern="SE_BAD_FIELD" />
<Class name="com.azure.core.amqp.implementation.ReactorSessionCache$Entry" />
</Match>

<Match>
<Bug pattern="SE_NO_SERIALVERSIONID" />
<Class name="com.azure.core.amqp.implementation.ReactorSessionCache$Entry" />
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import reactor.core.publisher.Mono;

import java.util.Objects;

/**
* A temporary type to side by side support {@link RequestResponseChannel} caching that
* <ul>
* <li>uses {@link AmqpChannelProcessor} in v2 mode without "com.azure.core.amqp.cache"
* opt-in or in v1 mode,</li>
* <li>or uses {@link RequestResponseChannelCache} when "com.azure.core.amqp.cache"
* is explicitly opted-in in v2 mode.</li>
* </ul>
* <p>
* TODO (anu): remove this temporary type when removing v1 and 'RequestResponseChannelCache' is no longer opt-in for v2.
* </p>
*/
public final class ChannelCacheWrapper {
private final Mono<RequestResponseChannel> channelProcessor;
private final RequestResponseChannelCache channelCache;

/**
* Creates channel cache for V1 client or V2 client without "com.azure.core.amqp.cache"
* opted-in.
*
* @param channelProcessor the {@link AmqpChannelProcessor} for caching {@link RequestResponseChannel}.
*/
public ChannelCacheWrapper(Mono<RequestResponseChannel> channelProcessor) {
this.channelProcessor = Objects.requireNonNull(channelProcessor, "'channelProcessor' cannot be null.");
this.channelCache = null;
}

/**
* Creates channel cache for V1 client with "com.azure.core.amqp.cache" opted-in.
*
* @param channelCache the cache for {@link RequestResponseChannel}.
*/
public ChannelCacheWrapper(RequestResponseChannelCache channelCache) {
this.channelCache = Objects.requireNonNull(channelCache, "'channelCache' cannot be null.");
this.channelProcessor = null;
}

/**
* Gets the underlying cache as Mono.
*
* @return the cache as Mono.
*/
public Mono<RequestResponseChannel> get() {
if (channelCache != null) {
return channelCache.get();
} else {
return channelProcessor;
}
}

/**
* Closes the cache.
*
* @return Mono that completes when cache is closed.
*/
public Mono<Void> closeAsync() {
if (channelCache != null) {
return channelCache.closeAsync();
} else {
return channelProcessor.flatMap(RequestResponseChannel::closeAsync);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public final class ClientConstants {
public static final String IS_PARTIAL_DELIVERY_KEY = "delivery.isPartial";
public static final String IS_SETTLED_DELIVERY_KEY = "delivery.isSettled";
public static final String SESSION_NAME_KEY = "sessionName";
public static final String SESSION_ID_KEY = "sessionId";
public static final String FULLY_QUALIFIED_NAMESPACE_KEY = "namespace";
public static final String OPERATION_NAME_KEY = "amqpOperation";
public static final String DELIVERY_KEY = "delivery";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@
*/
public class ManagementChannel implements AmqpManagementNode {
private final TokenManager tokenManager;
private final AmqpChannelProcessor<RequestResponseChannel> createChannel;
private final ChannelCacheWrapper channelCache;
private final String fullyQualifiedNamespace;
private final ClientLogger logger;
private final String entityPath;

/**
* Creates a new instance of ManagementChannel.
*
* @param createChannel Creates a new AMQP channel.
* @param channelCache The request response channel cache.
* @param fullyQualifiedNamespace Fully qualified namespace for the message broker.
* @param entityPath The entity path for the message broker.
* @param tokenManager Manages tokens for authorization.
*/
public ManagementChannel(AmqpChannelProcessor<RequestResponseChannel> createChannel, String fullyQualifiedNamespace,
String entityPath, TokenManager tokenManager) {
this.createChannel = Objects.requireNonNull(createChannel, "'createChannel' cannot be null.");
public ManagementChannel(ChannelCacheWrapper channelCache, String fullyQualifiedNamespace, String entityPath,
TokenManager tokenManager) {
this.channelCache = Objects.requireNonNull(channelCache, "'channelCache' cannot be null.");
this.fullyQualifiedNamespace
= Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
Expand All @@ -62,7 +62,7 @@ public ManagementChannel(AmqpChannelProcessor<RequestResponseChannel> createChan

@Override
public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message) {
return isAuthorized().then(createChannel.flatMap(channel -> {
return isAuthorized().then(channelCache.get().flatMap(channel -> {
final Message protonJMessage = MessageUtils.toProtonJMessage(message);

return channel.sendWithAck(protonJMessage)
Expand All @@ -74,7 +74,7 @@ public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message) {

@Override
public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message, DeliveryOutcome deliveryOutcome) {
return isAuthorized().then(createChannel.flatMap(channel -> {
return isAuthorized().then(channelCache.get().flatMap(channel -> {
final Message protonJMessage = MessageUtils.toProtonJMessage(message);
final DeliveryState protonJDeliveryState = MessageUtils.toProtonJDeliveryState(deliveryOutcome);

Expand All @@ -87,7 +87,7 @@ public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message, DeliveryOut

@Override
public Mono<Void> closeAsync() {
return createChannel.flatMap(channel -> channel.closeAsync()).cache();
return channelCache.closeAsync().cache();
}

private void handleResponse(Message response, SynchronousSink<AmqpAnnotatedMessage> sink,
Expand Down
Loading

0 comments on commit 33dd3ee

Please sign in to comment.