Skip to content

Commit

Permalink
Adding safety to Amqp Session operations, SessionCache to cache React…
Browse files Browse the repository at this point in the history
…orSession instances and integrating RequestResponseChannelCache
  • Loading branch information
anuchandy committed Apr 3, 2024
1 parent 9406f48 commit 28c5ed6
Show file tree
Hide file tree
Showing 31 changed files with 2,037 additions and 274 deletions.
2 changes: 2 additions & 0 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,8 @@ com.azure.tools:azure-sdk-build-tool;1.0.0;1.1.0-beta.1

unreleased_com.azure:azure-xml;1.0.0-beta.4
unreleased_com.azure:azure-core;1.48.0-beta.1
unreleased_com.azure:azure-core-amqp;2.10.0-beta.1
unreleased_com.azure:azure-core-test;1.25.0-beta.1
unreleased_com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.12
unreleased_com.azure:azure-core-http-vertx;1.0.0-beta.17

Expand Down
10 changes: 10 additions & 0 deletions sdk/core/azure-core-amqp/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,14 @@
<Bug pattern="SE_NO_SERIALVERSIONID" />
<Class name="com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries$DispositionWork" />
</Match>

<Match>
<Bug pattern="SE_BAD_FIELD" />
<Class name="com.azure.core.amqp.implementation.ReactorSessionCache$Entry" />
</Match>

<Match>
<Bug pattern="SE_NO_SERIALVERSIONID" />
<Class name="com.azure.core.amqp.implementation.ReactorSessionCache$Entry" />
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import reactor.core.publisher.Mono;

import java.util.Objects;

/**
* A temporary type to side by side support {@link RequestResponseChannel} caching that
* <ul>
* <li>uses {@link AmqpChannelProcessor} in v2 mode without "com.azure.core.amqp.internal.session-channel-cache.v2"
* opt-in or in v1 mode,</li>
* <li>or uses {@link RequestResponseChannelCache} when "com.azure.core.amqp.internal.session-channel-cache.v2"
* is explicitly opted-in in v2 mode.</li>
* </ul>
* <p>
* TODO (anu): remove this temporary type when removing v1 and 'RequestResponseChannelCache' is no longer opt-in for v2.
* </p>
*/
final class ChannelCacheWrapper {
private final AmqpChannelProcessor<RequestResponseChannel> channelProcessor;
private final RequestResponseChannelCache channelCache;

/**
* Creates channel cache for V1 client or V2 client without "com.azure.core.amqp.internal.session-channel-cache.v2"
* opted-in.
*
* @param channelProcessor the channel processor for caching {@link RequestResponseChannel}.
*/
ChannelCacheWrapper(AmqpChannelProcessor<RequestResponseChannel> channelProcessor) {
this.channelProcessor = Objects.requireNonNull(channelProcessor, "'channelProcessor' cannot be null.");
this.channelCache = null;
}

/**
* Creates channel cache for V1 client with "com.azure.core.amqp.internal.session-channel-cache.v2" opted-in.
*
* @param channelCache the cache for {@link RequestResponseChannel}.
*/
ChannelCacheWrapper(RequestResponseChannelCache channelCache) {
this.channelCache = Objects.requireNonNull(channelCache, "'channelCache' cannot be null.");
this.channelProcessor = null;
}

Mono<RequestResponseChannel> get() {
if (channelCache != null) {
return channelCache.get();
} else {
return channelProcessor;
}
}

Mono<Void> closeAsync() {
if (channelCache != null) {
return channelCache.closeAsync();
} else {
return channelProcessor.flatMap(RequestResponseChannel::closeAsync);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@
*/
public class ManagementChannel implements AmqpManagementNode {
private final TokenManager tokenManager;
private final AmqpChannelProcessor<RequestResponseChannel> createChannel;
private final ChannelCacheWrapper channelCache;
private final String fullyQualifiedNamespace;
private final ClientLogger logger;
private final String entityPath;

/**
* Creates a new instance of ManagementChannel.
*
* @param createChannel Creates a new AMQP channel.
* @param channelCache The request response channel cache.
* @param fullyQualifiedNamespace Fully qualified namespace for the message broker.
* @param entityPath The entity path for the message broker.
* @param tokenManager Manages tokens for authorization.
*/
public ManagementChannel(AmqpChannelProcessor<RequestResponseChannel> createChannel, String fullyQualifiedNamespace,
String entityPath, TokenManager tokenManager) {
this.createChannel = Objects.requireNonNull(createChannel, "'createChannel' cannot be null.");
public ManagementChannel(ChannelCacheWrapper channelCache, String fullyQualifiedNamespace, String entityPath,
TokenManager tokenManager) {
this.channelCache = Objects.requireNonNull(channelCache, "'channelCache' cannot be null.");
this.fullyQualifiedNamespace
= Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
Expand All @@ -62,7 +62,7 @@ public ManagementChannel(AmqpChannelProcessor<RequestResponseChannel> createChan

@Override
public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message) {
return isAuthorized().then(createChannel.flatMap(channel -> {
return isAuthorized().then(channelCache.get().flatMap(channel -> {
final Message protonJMessage = MessageUtils.toProtonJMessage(message);

return channel.sendWithAck(protonJMessage)
Expand All @@ -74,7 +74,7 @@ public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message) {

@Override
public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message, DeliveryOutcome deliveryOutcome) {
return isAuthorized().then(createChannel.flatMap(channel -> {
return isAuthorized().then(channelCache.get().flatMap(channel -> {
final Message protonJMessage = MessageUtils.toProtonJMessage(message);
final DeliveryState protonJDeliveryState = MessageUtils.toProtonJDeliveryState(deliveryOutcome);

Expand All @@ -87,7 +87,7 @@ public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message, DeliveryOut

@Override
public Mono<Void> closeAsync() {
return createChannel.flatMap(channel -> channel.closeAsync()).cache();
return channelCache.closeAsync().cache();
}

private void handleResponse(Message response, SynchronousSink<AmqpAnnotatedMessage> sink,
Expand Down
Loading

0 comments on commit 28c5ed6

Please sign in to comment.