From c1f36164fdd6f07eaec9de984a62da00f1dc9140 Mon Sep 17 00:00:00 2001 From: Zejia Jiang <96095733+ZejiaJiang@users.noreply.github.com> Date: Mon, 19 Sep 2022 13:47:33 +0800 Subject: [PATCH] Service Bus client identifier #28904 (#30573) * Service Bus client identifier #28904 * consistent code style & add some more test --- .../azure-messaging-servicebus/CHANGELOG.md | 1 + .../servicebus/ServiceBusClientBuilder.java | 42 +++++++++++++++++-- .../servicebus/ServiceBusProcessorClient.java | 16 +++++++ .../ServiceBusReceiverAsyncClient.java | 19 +++++++-- .../servicebus/ServiceBusReceiverClient.java | 9 ++++ .../ServiceBusSenderAsyncClient.java | 17 ++++++-- .../servicebus/ServiceBusSenderClient.java | 9 ++++ .../servicebus/ServiceBusSessionManager.java | 19 +++++++-- .../ServiceBusSessionReceiverAsyncClient.java | 10 +++-- .../ServiceBusAmqpConnection.java | 18 ++++---- .../ServiceBusReactorAmqpConnection.java | 18 ++++---- .../ServiceBusReactorSession.java | 18 ++++---- .../implementation/ServiceBusSession.java | 12 ++++-- .../ServiceBusClientBuilderTest.java | 13 ++++++ .../ServiceBusReceiverAsyncClientTest.java | 39 ++++++++++------- .../ServiceBusReceiverClientTest.java | 3 ++ .../ServiceBusSenderAsyncClientTest.java | 40 +++++++++--------- .../ServiceBusSenderClientTest.java | 3 ++ .../ServiceBusSessionManagerTest.java | 36 ++++++++++------ ...viceBusSessionReceiverAsyncClientTest.java | 11 ++--- .../ServiceBusReactorSessionTest.java | 5 ++- 21 files changed, 260 insertions(+), 98 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index 484d416f2bfb7..b4ec1a1d0919a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -3,6 +3,7 @@ ## 7.11.0-beta.1 (Unreleased) ### Features Added +- Added identifier to client. ([#28904](https://github.com/Azure/azure-sdk-for-java/issues/28904)) ### Breaking Changes diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index eea607dcddfd4..a8449e77cd008 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -3,6 +3,7 @@ package com.azure.messaging.servicebus; +import com.azure.core.amqp.AmqpClientOptions; import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyAuthenticationType; @@ -59,6 +60,7 @@ import java.util.Map; import java.util.Objects; import java.util.ServiceLoader; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.regex.Pattern; @@ -953,8 +955,16 @@ public ServiceBusSenderAsyncClient buildAsyncClient() { new IllegalArgumentException("Unknown entity type: " + entityType)); } + final String clientIdentifier; + if (clientOptions instanceof AmqpClientOptions) { + String clientOptionIdentifier = ((AmqpClientOptions) clientOptions).getIdentifier(); + clientIdentifier = CoreUtils.isNullOrEmpty(clientOptionIdentifier) ? UUID.randomUUID().toString() : clientOptionIdentifier; + } else { + clientIdentifier = UUID.randomUUID().toString(); + } + return new ServiceBusSenderAsyncClient(entityName, entityType, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null); + tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null, clientIdentifier); } /** @@ -1425,8 +1435,16 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() { maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions); + final String clientIdentifier; + if (clientOptions instanceof AmqpClientOptions) { + String clientOptionIdentifier = ((AmqpClientOptions) clientOptions).getIdentifier(); + clientIdentifier = CoreUtils.isNullOrEmpty(clientOptionIdentifier) ? UUID.randomUUID().toString() : clientOptionIdentifier; + } else { + clientIdentifier = UUID.randomUUID().toString(); + } + final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType, - connectionProcessor, tracerProvider, messageSerializer, receiverOptions); + connectionProcessor, tracerProvider, messageSerializer, receiverOptions, clientIdentifier); return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath, entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT, @@ -1494,9 +1512,17 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions); + final String clientIdentifier; + if (clientOptions instanceof AmqpClientOptions) { + String clientOptionIdentifier = ((AmqpClientOptions) clientOptions).getIdentifier(); + clientIdentifier = CoreUtils.isNullOrEmpty(clientOptionIdentifier) ? UUID.randomUUID().toString() : clientOptionIdentifier; + } else { + clientIdentifier = UUID.randomUUID().toString(); + } + return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath, entityType, receiverOptions, connectionProcessor, tracerProvider, messageSerializer, - ServiceBusClientBuilder.this::onClientClose); + ServiceBusClientBuilder.this::onClientClose, clientIdentifier); } } @@ -1928,9 +1954,17 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) { final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, maxAutoLockRenewDuration, enableAutoComplete); + final String clientIdentifier; + if (clientOptions instanceof AmqpClientOptions) { + String clientOptionIdentifier = ((AmqpClientOptions) clientOptions).getIdentifier(); + clientIdentifier = CoreUtils.isNullOrEmpty(clientOptionIdentifier) ? UUID.randomUUID().toString() : clientOptionIdentifier; + } else { + clientIdentifier = UUID.randomUUID().toString(); + } + return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath, entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT, - tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose); + tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, clientIdentifier); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index f84294ce5c1b4..55d2d83e7f553 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -302,6 +302,22 @@ public String getSubscriptionName() { return this.subscriptionName; } + /** + * Gets the identifier of the instance of {@link ServiceBusProcessorClient}. + * + * @return The identifier that can identify the instance of {@link ServiceBusProcessorClient}. + */ + public synchronized String getIdentifier() { + if (asyncClient.get() == null) { + ServiceBusReceiverAsyncClient newReceiverClient = receiverBuilder == null + ? sessionReceiverBuilder.buildAsyncClientForProcessor() + : receiverBuilder.buildAsyncClient(); + asyncClient.set(newReceiverClient); + } + + return asyncClient.get().getIdentifier(); + } + private synchronized void receiveMessages() { if (receiverSubscriptions.size() > 0) { // For the case of start -> stop -> start again diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 62396daf12dd0..7cc782acb59bd 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -235,6 +235,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable { private final Runnable onClientClose; private final ServiceBusSessionManager sessionManager; private final Semaphore completionLock = new Semaphore(1); + private final String identifier; // Starting at -1 because that is before the beginning of the stream. private final AtomicLong lastPeekedSequenceNumber = new AtomicLong(-1); @@ -254,7 +255,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable { */ ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType, ReceiverOptions receiverOptions, ServiceBusConnectionProcessor connectionProcessor, Duration cleanupInterval, - TracerProvider tracerProvider, MessageSerializer messageSerializer, Runnable onClientClose) { + TracerProvider tracerProvider, MessageSerializer messageSerializer, Runnable onClientClose, String identifier) { this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null."); this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null."); @@ -275,6 +276,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable { }); this.sessionManager = null; + this.identifier = identifier; } ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType, @@ -300,6 +302,8 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable { .log("Closing expired renewal operation.", renewal.getThrowable()); renewal.close(); }); + + this.identifier = sessionManager.getIdentifier(); } /** @@ -330,6 +334,15 @@ public String getSessionId() { return receiverOptions.getSessionId(); } + /** + * Gets the identifier of the instance of {@link ServiceBusReceiverAsyncClient}. + * + * @return The identifier that can identify the instance of {@link ServiceBusReceiverAsyncClient}. + */ + public String getIdentifier() { + return identifier; + } + /** * Abandons a {@link ServiceBusReceivedMessage message}. This will make the message available again for processing. * Abandoning a message will increase the delivery count on the message. @@ -1450,10 +1463,10 @@ private ServiceBusAsyncConsumer getOrCreateConsumer() { final Mono receiveLinkMono = connectionProcessor.flatMap(connection -> { if (receiverOptions.isSessionReceiver()) { return connection.createReceiveLink(linkName, entityPath, receiverOptions.getReceiveMode(), - null, entityType, receiverOptions.getSessionId()); + null, entityType, identifier, receiverOptions.getSessionId()); } else { return connection.createReceiveLink(linkName, entityPath, receiverOptions.getReceiveMode(), - null, entityType); + null, entityType, identifier); } }).doOnNext(next -> { LOGGER.atVerbose() diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index 53c115163708f..80bf6f88450c1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -107,6 +107,15 @@ public String getSessionId() { return asyncClient.getSessionId(); } + /** + * Gets the identifier of the instance of {@link ServiceBusReceiverClient}. + * + * @return The identifier that can identify the instance of {@link ServiceBusReceiverClient}. + */ + public String getIdentifier() { + return asyncClient.getIdentifier(); + } + /** * Abandons a {@link ServiceBusReceivedMessage message}. This will make the message available again for processing. * Abandoning a message will increase the delivery count on the message. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index af77d3dfb3196..71a896dfe0b58 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -179,13 +179,14 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable { private final String entityName; private final ServiceBusConnectionProcessor connectionProcessor; private final String viaEntityName; + private final String identifier; /** * Creates a new instance of this {@link ServiceBusSenderAsyncClient} that sends messages to a Service Bus entity. */ ServiceBusSenderAsyncClient(String entityName, MessagingEntityType entityType, ServiceBusConnectionProcessor connectionProcessor, AmqpRetryOptions retryOptions, TracerProvider tracerProvider, - MessageSerializer messageSerializer, Runnable onClientClose, String viaEntityName) { + MessageSerializer messageSerializer, Runnable onClientClose, String viaEntityName, String identifier) { // Caching the created link so we don't invoke another link creation. this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null."); @@ -198,6 +199,7 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable { this.entityType = entityType; this.viaEntityName = viaEntityName; this.onClientClose = onClientClose; + this.identifier = identifier; } /** @@ -218,6 +220,15 @@ public String getEntityPath() { return entityName; } + /** + * Gets the identifier of the instance of {@link ServiceBusSenderAsyncClient}. + * + * @return The identifier that can identify the instance of {@link ServiceBusSenderAsyncClient}. + */ + public String getIdentifier() { + return identifier; + } + /** * Sends a message to a Service Bus queue or topic. * @@ -824,9 +835,9 @@ private Mono getSendLink() { .flatMap(connection -> { if (!CoreUtils.isNullOrEmpty(viaEntityName)) { return connection.createSendLink("VIA-".concat(viaEntityName), viaEntityName, retryOptions, - entityName); + entityName, identifier); } else { - return connection.createSendLink(entityName, entityName, retryOptions, null); + return connection.createSendLink(entityName, entityName, retryOptions, null, identifier); } }) .doOnNext(next -> linkName.compareAndSet(null, next.getLinkName())); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderClient.java index f1dabfafe7724..0f3c8ecfd740d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderClient.java @@ -168,6 +168,15 @@ public String getFullyQualifiedNamespace() { return asyncClient.getFullyQualifiedNamespace(); } + /** + * Gets the identifier of the instance of {@link ServiceBusSenderClient}. + * + * @return The identifier that can identify the instance of {@link ServiceBusSenderClient}. + */ + public String getIdentifier() { + return asyncClient.getIdentifier(); + } + /** * Sends a message to a Service Bus queue or topic. * diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index 0e716b91a11eb..8ff91d60a0708 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -63,6 +63,7 @@ class ServiceBusSessionManager implements AutoCloseable { private final Duration operationTimeout; private final TracerProvider tracerProvider; private final MessageSerializer messageSerializer; + private final String identifier; private final AtomicBoolean isDisposed = new AtomicBoolean(); private final AtomicBoolean isStarted = new AtomicBoolean(); @@ -81,7 +82,7 @@ class ServiceBusSessionManager implements AutoCloseable { ServiceBusSessionManager(String entityPath, MessagingEntityType entityType, ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider, - MessageSerializer messageSerializer, ReceiverOptions receiverOptions, ServiceBusReceiveLink receiveLink) { + MessageSerializer messageSerializer, ReceiverOptions receiverOptions, ServiceBusReceiveLink receiveLink, String identifier) { this.entityPath = entityPath; this.entityType = entityType; this.receiverOptions = receiverOptions; @@ -90,6 +91,7 @@ class ServiceBusSessionManager implements AutoCloseable { this.tracerProvider = tracerProvider; this.messageSerializer = messageSerializer; this.maxSessionLockRenewDuration = receiverOptions.getMaxLockRenewDuration(); + this.identifier = identifier; // According to the documentation, if a sequence is not finite, it should be published on their own scheduler. // It's possible that some of these sessions have a lot of messages. @@ -112,9 +114,9 @@ class ServiceBusSessionManager implements AutoCloseable { ServiceBusSessionManager(String entityPath, MessagingEntityType entityType, ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider, - MessageSerializer messageSerializer, ReceiverOptions receiverOptions) { + MessageSerializer messageSerializer, ReceiverOptions receiverOptions, String identifier) { this(entityPath, entityType, connectionProcessor, tracerProvider, - messageSerializer, receiverOptions, null); + messageSerializer, receiverOptions, null, identifier); } /** @@ -129,6 +131,15 @@ String getLinkName(String sessionId) { return receiver != null ? receiver.getLinkName() : null; } + /** + * Gets the identifier of the instance of {@link ServiceBusSessionManager}. + * + * @return The identifier that can identify the instance of {@link ServiceBusSessionManager}. + */ + public String getIdentifier() { + return this.identifier; + } + /** * Gets the state of a session given its identifier. * @@ -253,7 +264,7 @@ private Mono createSessionReceiveLink() { return connectionProcessor .flatMap(connection -> { return connection.createReceiveLink(linkName, entityPath, receiverOptions.getReceiveMode(), - null, entityType, sessionId); + null, entityType, identifier, sessionId); }); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java index ed58979124be3..9be0ab93b465f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java @@ -98,11 +98,12 @@ public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable private final MessageSerializer messageSerializer; private final Runnable onClientClose; private final ServiceBusSessionManager unNamedSessionManager; // for acceptNextSession() + private final String identifier; ServiceBusSessionReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType, ReceiverOptions receiverOptions, ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider, - MessageSerializer messageSerializer, Runnable onClientClose) { + MessageSerializer messageSerializer, Runnable onClientClose, String identifier) { this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null."); this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null."); @@ -113,7 +114,8 @@ public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null."); this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null."); this.unNamedSessionManager = new ServiceBusSessionManager(entityPath, entityType, connectionProcessor, - tracerProvider, messageSerializer, receiverOptions); + tracerProvider, messageSerializer, receiverOptions, identifier); + this.identifier = identifier; } /** @@ -136,7 +138,7 @@ public Mono acceptNextSession() { receiverOptions.isEnableAutoComplete(), sessionId, null); final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath, entityType, connectionProcessor, tracerProvider, messageSerializer, newReceiverOptions, - receiveLink); + receiveLink, identifier); return new ServiceBusReceiverAsyncClient(fullyQualifiedNamespace, entityPath, entityType, newReceiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT, tracerProvider, messageSerializer, () -> { }, sessionSpecificManager); @@ -170,7 +172,7 @@ public Mono acceptSession(String sessionId) { receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(), receiverOptions.isEnableAutoComplete(), sessionId, null); final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath, entityType, - connectionProcessor, tracerProvider, messageSerializer, newReceiverOptions); + connectionProcessor, tracerProvider, messageSerializer, newReceiverOptions, identifier); return sessionSpecificManager.getActiveLink().map(receiveLink -> new ServiceBusReceiverAsyncClient( fullyQualifiedNamespace, entityPath, entityType, newReceiverOptions, connectionProcessor, diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAmqpConnection.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAmqpConnection.java index 1f1c3ccf31d8a..2e96583a7be77 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAmqpConnection.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAmqpConnection.java @@ -27,11 +27,11 @@ public interface ServiceBusAmqpConnection extends AmqpConnection { * @param entityPath The remote address to connect to for the message broker. * @param retryOptions Options to use when creating the link. * @param transferEntityPath Path if the message should be transferred this destination by message broker. - * + * @param clientIdentifier The identifier of the client. * @return A new or existing send link that is connected to the given {@code entityPath}. */ Mono createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions, - String transferEntityPath); + String transferEntityPath, String clientIdentifier); /** * Creates or gets an existing receive link. The same link is returned if there is an existing receive link with the @@ -39,14 +39,14 @@ Mono createSendLink(String linkName, String entityPath, AmqpRetryO * * @param linkName The name of the link. * @param entityPath The remote address to connect to for the message broker. + * @param receiveMode {@link ServiceBusReceiveMode} to use when creating the link. * @param transferEntityPath Path if the message should be transferred to another link after being received * from this link. - * @param receiveMode {@link ServiceBusReceiveMode} to use when creating the link. - * + * @param clientIdentifier The identifier of the client. * @return A new or existing receive link that is connected to the given {@code entityPath}. */ Mono createReceiveLink(String linkName, String entityPath, ServiceBusReceiveMode receiveMode, - String transferEntityPath, MessagingEntityType entityType); + String transferEntityPath, MessagingEntityType entityType, String clientIdentifier); /** * Creates or gets an existing receive link for a given sessionId. The same link is returned if there is an @@ -54,14 +54,14 @@ Mono createReceiveLink(String linkName, String entityPath * * @param linkName The name of the link. * @param entityPath The remote address to connect to for the message broker. + * @param receiveMode {@link ServiceBusReceiveMode} to use when creating the link. * @param transferEntityPath Path if the events should be transferred to another link after being received * from this link. - * @param receiveMode {@link ServiceBusReceiveMode} to use when creating the link. - * @param sessionId to use when creating the link. * @param entityType {@link MessagingEntityType} to use when creating the link. - * + * @param clientIdentifier The identifier of the client. + * @param sessionId to use when creating the link. * @return A new or existing receive link that is connected to the given {@code entityPath}. */ Mono createReceiveLink(String linkName, String entityPath, ServiceBusReceiveMode receiveMode, - String transferEntityPath, MessagingEntityType entityType, String sessionId); + String transferEntityPath, MessagingEntityType entityType, String clientIdentifier, String sessionId); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java index 916af3639d7b5..9ac88fcd091cf 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java @@ -141,19 +141,20 @@ public Mono getManagementNode(String entityPath, Messa * @param entityPath The remote address to connect to for the message broker. * @param retryOptions Options to use when creating the link. * @param transferEntityPath Path if the message should be transferred this destination by message broker. + * @param clientIdentifier The identifier of the client. * * @return A new or existing send link that is connected to the given {@code entityPath}. */ @Override public Mono createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions, - String transferEntityPath) { + String transferEntityPath, String clientIdentifier) { return createSession(linkName).cast(ServiceBusSession.class).flatMap(session -> { LOGGER.atVerbose().addKeyValue(LINK_NAME_KEY, linkName).log("Get or create sender link."); final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions); return session.createProducer(linkName + entityPath, entityPath, retryOptions.getTryTimeout(), - retryPolicy, transferEntityPath).cast(AmqpSendLink.class); + retryPolicy, transferEntityPath, clientIdentifier).cast(AmqpSendLink.class); }); } @@ -167,19 +168,20 @@ public Mono createSendLink(String linkName, String entityPath, Amq * @param transferEntityPath Path if the events should be transferred to another link after being received * from this link. * @param entityType {@link MessagingEntityType} to use when creating the link. + * @param clientIdentifier The identifier of the client. * * @return A new or existing receive link that is connected to the given {@code entityPath}. */ @Override public Mono createReceiveLink(String linkName, String entityPath, - ServiceBusReceiveMode receiveMode, String transferEntityPath, MessagingEntityType entityType) { + ServiceBusReceiveMode receiveMode, String transferEntityPath, MessagingEntityType entityType, String clientIdentifier) { return createSession(entityPath).cast(ServiceBusSession.class) .flatMap(session -> { LOGGER.atVerbose().addKeyValue(ENTITY_PATH_KEY, entityPath).log("Get or create consumer."); final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions); return session.createConsumer(linkName, entityPath, entityType, retryOptions.getTryTimeout(), - retryPolicy, receiveMode); + retryPolicy, receiveMode, clientIdentifier); }); } @@ -197,21 +199,21 @@ public Mono createSession(String sessionName) { * @param receiveMode Consumer options to use when creating the link. * @param transferEntityPath to use when creating the link. * @param entityType {@link MessagingEntityType} to use when creating the link. + * @param clientIdentifier The identifier of the client. * @param sessionId to use when creating the link. * * @return A new or existing receive link that is connected to the given {@code entityPath}. */ @Override - public Mono createReceiveLink(String linkName, String entityPath, - ServiceBusReceiveMode receiveMode, String transferEntityPath, MessagingEntityType entityType, - String sessionId) { + public Mono createReceiveLink(String linkName, String entityPath, ServiceBusReceiveMode receiveMode, + String transferEntityPath, MessagingEntityType entityType, String clientIdentifier, String sessionId) { return createSession(entityPath).cast(ServiceBusSession.class) .flatMap(session -> { LOGGER.atVerbose().addKeyValue(ENTITY_PATH_KEY, entityPath).log("Get or create consumer."); final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions); return session.createConsumer(linkName, entityPath, entityType, retryOptions.getTryTimeout(), - retryPolicy, receiveMode, sessionId); + retryPolicy, receiveMode, clientIdentifier, sessionId); }); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java index 82e7652ffceb3..ca169c82094d9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java @@ -35,6 +35,8 @@ import java.util.Map; import java.util.Objects; +import static com.azure.core.amqp.implementation.AmqpConstants.CLIENT_IDENTIFIER; +import static com.azure.core.amqp.implementation.AmqpConstants.CLIENT_RECEIVER_IDENTIFIER; import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY; import static com.azure.core.amqp.implementation.ClientConstants.LINK_NAME_KEY; import static com.azure.messaging.servicebus.implementation.MessageUtils.adjustServerTimeout; @@ -90,26 +92,27 @@ class ServiceBusReactorSession extends ReactorSession implements ServiceBusSessi @Override public Mono createConsumer(String linkName, String entityPath, - MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode) { + MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode, + String clientIdentifier) { final Map filter = new HashMap<>(); - return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter); + return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter, clientIdentifier); } @Override public Mono createConsumer(String linkName, String entityPath, MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode, - String sessionId) { + String clientIdentifier, String sessionId) { final Map filter = new HashMap<>(); filter.put(SESSION_FILTER, sessionId); - return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter); + return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter, clientIdentifier); } @Override public Mono createProducer(String linkName, String entityPath, Duration timeout, - AmqpRetryPolicy retry, String transferEntityPath) { + AmqpRetryPolicy retry, String transferEntityPath, String clientIdentifier) { Objects.requireNonNull(entityPath, "'entityPath' cannot be null."); Objects.requireNonNull(timeout, "'timeout' cannot be null."); Objects.requireNonNull(retry, "'retry' cannot be null."); @@ -118,7 +121,7 @@ public Mono createProducer(String linkName, String entityPath, Duratio Map linkProperties = new HashMap<>(); linkProperties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis())); - + linkProperties.put(CLIENT_IDENTIFIER, clientIdentifier); if (!CoreUtils.isNullOrEmpty(transferEntityPath)) { linkProperties.put(LINK_TRANSFER_DESTINATION_PROPERTY, transferEntityPath); @@ -169,7 +172,7 @@ protected ReactorReceiver createConsumer(String entityPath, Receiver receiver, private Mono createConsumer(String linkName, String entityPath, MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode, - Map filter) { + Map filter, String clientIdentifier) { Objects.requireNonNull(linkName, "'linkName' cannot be null."); Objects.requireNonNull(entityPath, "'entityPath' cannot be null."); Objects.requireNonNull(timeout, "'timeout' cannot be null."); @@ -179,6 +182,7 @@ private Mono createConsumer(String linkName, String entit final Map linkProperties = new HashMap<>(); final Duration serverTimeout = adjustServerTimeout(timeout); linkProperties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis())); + linkProperties.put(CLIENT_RECEIVER_IDENTIFIER, clientIdentifier); if (entityType != null) { linkProperties.put(ENTITY_TYPE_PROPERTY, entityType.getValue()); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusSession.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusSession.java index 14bbd62bbff26..94649e59fa08c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusSession.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusSession.java @@ -29,11 +29,12 @@ public interface ServiceBusSession extends AmqpSession { * @param timeout Timeout required for creating and opening an AMQP link. * @param retryPolicy The retry policy to use when consuming messages. * @param receiveMode The {@link ServiceBusReceiveMode} for the messages to be received. + * @param clientIdentifier The identifier of the client. * * @return A newly created AMQP link. */ Mono createConsumer(String linkName, String entityPath, MessagingEntityType entityType, - Duration timeout, AmqpRetryPolicy retryPolicy, ServiceBusReceiveMode receiveMode); + Duration timeout, AmqpRetryPolicy retryPolicy, ServiceBusReceiveMode receiveMode, String clientIdentifier); /** * Creates a new AMQP link that consumes events from the message broker. @@ -43,13 +44,15 @@ Mono createConsumer(String linkName, String entityPath, M * @param timeout Timeout required for creating and opening an AMQP link. * @param retryPolicy The retry policy to use when consuming messages. * @param receiveMode The {@link ServiceBusReceiveMode} for the messages to be received. + * @param clientIdentifier The identifier of the client. * @param sessionId The sessionId for the messages to be received. If {@code null}, then the next, unnamed session * is retrieved. * * @return A newly created AMQP link. */ Mono createConsumer(String linkName, String entityPath, MessagingEntityType entityType, - Duration timeout, AmqpRetryPolicy retryPolicy, ServiceBusReceiveMode receiveMode, String sessionId); + Duration timeout, AmqpRetryPolicy retryPolicy, ServiceBusReceiveMode receiveMode, String clientIdentifier, + String sessionId); /** * Creates a new {@link AmqpLink} that can send events to the message broker. @@ -60,9 +63,10 @@ Mono createConsumer(String linkName, String entityPath, M * @param retryPolicy The retry policy to use when sending events. * @param transferEntityPath The entity path this link connects to, so that it may transfer events to * the message broker via this entity. + * @param clientIdentifier The identifier of the client. * * @return A newly created AMQP link. */ - Mono createProducer(String linkName, String entityPath, Duration timeout, - AmqpRetryPolicy retryPolicy, String transferEntityPath); + Mono createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retryPolicy, + String transferEntityPath, String clientIdentifier); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java index d08defd815fba..0c0b084a08a45 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java @@ -10,6 +10,7 @@ import com.azure.core.credential.AzureNamedKeyCredential; import com.azure.core.credential.AzureSasCredential; import com.azure.core.util.Configuration; +import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder; import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSenderClientBuilder; @@ -65,6 +66,18 @@ class ServiceBusClientBuilderTest extends IntegrationTestBase { super(new ClientLogger(ServiceBusClientBuilderTest.class)); } + @Test + void ensureIdentifierString() { + final ServiceBusClientBuilder builder = new ServiceBusClientBuilder(); + final ServiceBusSenderAsyncClient client = builder + .connectionString(NAMESPACE_CONNECTION_STRING) + .sender() + .queueName(QUEUE_NAME) + .buildAsyncClient(); + + Assertions.assertFalse(CoreUtils.isNullOrEmpty(client.getIdentifier())); + } + @Test void deadLetterqueueClient() { // Arrange diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index 0de7b598e27b9..eab4c030b00f2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -108,6 +108,7 @@ class ServiceBusReceiverAsyncClientTest { NAMESPACE, "some-name", "something-else"); private static final Duration CLEANUP_INTERVAL = Duration.ofSeconds(10); private static final String SESSION_ID = "my-session-id"; + private static final String CLIENT_IDENTIFIER = "my-client-identifier"; private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClientTest.class); private final String messageTrackingUUID = UUID.randomUUID().toString(); @@ -180,9 +181,9 @@ AmqpTransportType.AMQP, new AmqpRetryOptions(), ProxyOptions.SYSTEM_DEFAULTS, Sc .thenReturn(Mono.just(managementNode)); when(connection.createReceiveLink(anyString(), anyString(), any(ServiceBusReceiveMode.class), any(), - any(MessagingEntityType.class))).thenReturn(Mono.just(amqpReceiveLink)); + any(MessagingEntityType.class), anyString())).thenReturn(Mono.just(amqpReceiveLink)); when(connection.createReceiveLink(anyString(), anyString(), any(ServiceBusReceiveMode.class), any(), - any(MessagingEntityType.class), anyString())).thenReturn(Mono.just(sessionReceiveLink)); + any(MessagingEntityType.class), anyString(), anyString())).thenReturn(Mono.just(sessionReceiveLink)); connectionProcessor = Flux.create(sink -> sink.next(connection)) @@ -191,12 +192,12 @@ AmqpTransportType.AMQP, new AmqpRetryOptions(), ProxyOptions.SYSTEM_DEFAULTS, Sc receiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), - connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); + connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose, CLIENT_IDENTIFIER); sessionReceiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false, SESSION_ID, null), - connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); + connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose, CLIENT_IDENTIFIER); } @AfterEach @@ -208,6 +209,16 @@ void teardown(TestInfo testInfo) throws Exception { Mockito.framework().clearInlineMock(this); } + /** + * Verifies that the correct Service Bus properties are set. + */ + @Test + void properties() { + Assertions.assertEquals(ENTITY_PATH, receiver.getEntityPath()); + Assertions.assertEquals(NAMESPACE, receiver.getFullyQualifiedNamespace()); + Assertions.assertEquals(CLIENT_IDENTIFIER, receiver.getIdentifier()); + } + /** * Verifies that when user calls peek more than one time, It returns different object. */ @@ -316,7 +327,7 @@ void receivesMessageLockRenewSessionOnly() { ServiceBusReceiverAsyncClient mySessionReceiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, maxLockRenewDuration, false, SESSION_ID, null), connectionProcessor, - CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); + CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose, CLIENT_IDENTIFIER); // This needs to be used with "try with resource" : https://javadoc.io/static/org.mockito/mockito-core/3.9.0/org/mockito/Mockito.html#static_mocks try ( @@ -409,7 +420,7 @@ void completeInReceiveAndDeleteMode() { final ReceiverOptions options = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, PREFETCH, null, false); ServiceBusReceiverAsyncClient client = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, options, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, - messageSerializer, onClientClose); + messageSerializer, onClientClose, CLIENT_IDENTIFIER); final String lockToken1 = UUID.randomUUID().toString(); @@ -429,7 +440,7 @@ void throwsExceptionAboutSettlingPeekedMessagesWithNullLockToken() { final ReceiverOptions options = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false); ServiceBusReceiverAsyncClient client = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, options, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, - messageSerializer, onClientClose); + messageSerializer, onClientClose, CLIENT_IDENTIFIER); when(receivedMessage.getLockToken()).thenReturn(null); @@ -547,7 +558,7 @@ void errorSourceOnRenewMessageLock() { final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, - messageSerializer, onClientClose); + messageSerializer, onClientClose, CLIENT_IDENTIFIER); // Act & Assert StepVerifier.create(receiver2.renewMessageLock(receivedMessage, maxDuration)) @@ -571,7 +582,7 @@ void errorSourceOnSessionLock() { final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true, SESSION_ID, null); final ServiceBusReceiverAsyncClient sessionReceiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); + receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose, CLIENT_IDENTIFIER); // Act & Assert StepVerifier.create(sessionReceiver2.renewSessionLock(SESSION_ID)) @@ -644,7 +655,7 @@ void errorSourceAutoCompleteMessage() { final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, - messageSerializer, onClientClose); + messageSerializer, onClientClose, CLIENT_IDENTIFIER); when(receivedMessage.getLockToken()).thenReturn(lockToken); when(messageSerializer.deserialize(any(Message.class), eq(ServiceBusReceivedMessage.class))) @@ -692,10 +703,10 @@ void errorSourceOnReceiveMessage() { final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, - messageSerializer, onClientClose); + messageSerializer, onClientClose, CLIENT_IDENTIFIER); when(connection.createReceiveLink(anyString(), anyString(), any(ServiceBusReceiveMode.class), any(), - any(MessagingEntityType.class))).thenReturn(Mono.error(new AzureException("some receive link Error."))); + any(MessagingEntityType.class), anyString())).thenReturn(Mono.error(new AzureException("some receive link Error."))); // Act & Assert StepVerifier.create(receiver2.receiveMessages().take(1)) @@ -1228,7 +1239,7 @@ void autoCompleteMessage() { final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, - messageSerializer, onClientClose); + messageSerializer, onClientClose, CLIENT_IDENTIFIER); when(receivedMessage.getLockToken()).thenReturn(lockToken); when(messageSerializer.deserialize(any(Message.class), eq(ServiceBusReceivedMessage.class))) @@ -1262,7 +1273,7 @@ void autoCompleteMessageSessionReceiver() { true, SESSION_ID, null); final ServiceBusReceiverAsyncClient sessionReceiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, - messageSerializer, onClientClose); + messageSerializer, onClientClose, CLIENT_IDENTIFIER); final ServiceBusReceivedMessage receivedMessage3 = mock(ServiceBusReceivedMessage.class); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java index 8bd16da14e54d..bf5a1eb02b7ce 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java @@ -55,6 +55,7 @@ class ServiceBusReceiverClientTest { private static final String ENTITY_PATH = "test-entity-path"; private static final String LOCK_TOKEN = UUID.randomUUID().toString(); private static final String SESSION_ID = "test-session-id"; + private static final String CLIENT_IDENTIFIER = "my-client-identifier"; private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(5); @@ -81,6 +82,7 @@ void setup() { when(asyncClient.getEntityPath()).thenReturn(ENTITY_PATH); when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 0, null, false)); + when(asyncClient.getIdentifier()).thenReturn(CLIENT_IDENTIFIER); when(sessionReceiverOptions.getSessionId()).thenReturn(SESSION_ID); client = new ServiceBusReceiverClient(asyncClient, false, OPERATION_TIMEOUT); } @@ -100,6 +102,7 @@ void nullConstructor() { void properties() { assertEquals(NAMESPACE, client.getFullyQualifiedNamespace()); assertEquals(ENTITY_PATH, client.getEntityPath()); + assertEquals(CLIENT_IDENTIFIER, client.getIdentifier()); } @Test diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index 1939a3c95d043..07b2578093557 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -96,7 +96,8 @@ class ServiceBusSenderAsyncClientTest { private static final String ENTITY_NAME = "my-servicebus-entity"; private static final String LINK_NAME = "my-link-name"; private static final BinaryData TEST_CONTENTS = BinaryData.fromString("My message for service bus queue!"); - private static final String TXN_ID_STRING = "1"; + private static final String TXN_ID_STRING = "1"; + private static final String CLIENT_IDENTIFIER = "my-client-identifier"; @Mock private AmqpSendLink sendLink; @@ -169,7 +170,7 @@ void setup() { connectionOptions.getRetry())); sender = new ServiceBusSenderAsyncClient(ENTITY_NAME, MessagingEntityType.QUEUE, connectionProcessor, - retryOptions, tracerProvider, serializer, onClientClose, null); + retryOptions, tracerProvider, serializer, onClientClose, null, CLIENT_IDENTIFIER); when(connection.getManagementNode(anyString(), any(MessagingEntityType.class))) .thenReturn(just(managementNode)); @@ -196,6 +197,7 @@ void teardown() { void verifyProperties() { Assertions.assertEquals(ENTITY_NAME, sender.getEntityPath()); Assertions.assertEquals(NAMESPACE, sender.getFullyQualifiedNamespace()); + Assertions.assertEquals(CLIENT_IDENTIFIER, sender.getIdentifier()); } /** @@ -213,7 +215,7 @@ void createBatchNull() { @Test void createBatchDefault() { // Arrange - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.getLinkSize()).thenReturn(Mono.just(MAX_MESSAGE_LENGTH_BYTES)); @@ -238,7 +240,7 @@ void createBatchWhenSizeTooBig() { final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize)); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(link)); // This event is 1024 bytes when serialized. @@ -267,7 +269,7 @@ void createsMessageBatchWithSize() { when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize)); // EC is the prefix they use when creating a link that sends to the service round-robin. - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(link)); // This is 1024 bytes when serialized. @@ -304,7 +306,7 @@ void scheduleMessageSizeTooBig() { final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize)); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(link)); when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize)); @@ -332,7 +334,7 @@ void sendMultipleMessagesWithTransaction() { Assertions.assertTrue(batch.tryAddMessage(message)); }); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.send(any(Message.class), any(DeliveryState.class))).thenReturn(Mono.empty()); when(sendLink.send(anyList(), any(DeliveryState.class))).thenReturn(Mono.empty()); @@ -371,7 +373,7 @@ void sendMultipleMessages() { Assertions.assertTrue(batch.tryAddMessage(message)); }); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.send(anyList())).thenReturn(Mono.empty()); // Act @@ -401,9 +403,9 @@ void sendMultipleMessagesTracerSpans() { final ServiceBusMessageBatch batch = new ServiceBusMessageBatch(256 * 1024, errorContextProvider, tracerProvider1, serializer, null, null); sender = new ServiceBusSenderAsyncClient(ENTITY_NAME, MessagingEntityType.QUEUE, connectionProcessor, - retryOptions, tracerProvider1, serializer, onClientClose, null); + retryOptions, tracerProvider1, serializer, onClientClose, null, CLIENT_IDENTIFIER); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.send(anyList())).thenReturn(Mono.empty()); when(tracer1.start(eq(AZ_TRACING_SERVICE_NAME + "send"), any(Context.class), eq(ProcessKind.SEND))) @@ -460,7 +462,7 @@ void sendMessagesListWithTransaction() { final int count = 4; final List messages = TestUtils.getServiceBusMessages(count, UUID.randomUUID().toString()); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.send(any(Message.class), any(DeliveryState.class))).thenReturn(Mono.empty()); when(sendLink.send(anyList(), any(DeliveryState.class))).thenReturn(Mono.empty()); @@ -492,7 +494,7 @@ void sendMessagesList() { final int count = 4; final List messages = TestUtils.getServiceBusMessages(count, UUID.randomUUID().toString()); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.send(anyList())).thenReturn(Mono.empty()); @@ -519,7 +521,7 @@ void sendMessagesListExceedSize() { final Mono linkMaxSize = Mono.just(1); final List messages = TestUtils.getServiceBusMessages(count, UUID.randomUUID().toString()); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.getLinkSize()).thenReturn(linkMaxSize); @@ -536,7 +538,7 @@ void sendSingleMessageThatExceedsSize() { // arrange ServiceBusMessage message = TestUtils.getServiceBusMessages(1, UUID.randomUUID().toString()).get(0); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.getLinkSize()).thenReturn(Mono.just(1)); @@ -556,7 +558,7 @@ void sendSingleMessageWithTransaction() { // Arrange final ServiceBusMessage testData = new ServiceBusMessage(TEST_CONTENTS); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.getLinkSize()).thenReturn(Mono.just(MAX_MESSAGE_LENGTH_BYTES)); @@ -590,7 +592,7 @@ void sendSingleMessage() { new ServiceBusMessage(TEST_CONTENTS); // EC is the prefix they use when creating a link that sends to the service round-robin. - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.getLinkSize()).thenReturn(Mono.just(MAX_MESSAGE_LENGTH_BYTES)); @@ -614,7 +616,7 @@ void scheduleMessage() { long sequenceNumberReturned = 10; OffsetDateTime instant = mock(OffsetDateTime.class); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.getLinkSize()).thenReturn(Mono.just(MAX_MESSAGE_LENGTH_BYTES)); when(managementNode.schedule(anyList(), eq(instant), any(Integer.class), any(), isNull())) @@ -637,7 +639,7 @@ void scheduleMessageWithTransaction() { // Arrange final long sequenceNumberReturned = 10; final OffsetDateTime instant = mock(OffsetDateTime.class); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.getLinkSize()).thenReturn(Mono.just(MAX_MESSAGE_LENGTH_BYTES)); when(managementNode.schedule(anyList(), eq(instant), eq(MAX_MESSAGE_LENGTH_BYTES), eq(LINK_NAME), argThat(e -> e.getTransactionId().equals(transactionContext.getTransactionId())))) @@ -721,7 +723,7 @@ void verifyMessageOrdering() { messages.add(fourthMessage); messages.add(fifthMessage); - when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.send(anyList())).thenReturn(Mono.empty()); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientTest.java index 80d5c5d46e9f2..b2e4d2e62d454 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientTest.java @@ -34,6 +34,7 @@ public class ServiceBusSenderClientTest { private static final String NAMESPACE = "my-namespace"; private static final String ENTITY_NAME = "my-servicebus-entity"; + private static final String CLIENT_IDENTIFIER = "my-client-identifier"; @Mock private ServiceBusSenderAsyncClient asyncSender; @@ -61,6 +62,7 @@ void setup() { MockitoAnnotations.initMocks(this); when(asyncSender.getEntityPath()).thenReturn(ENTITY_NAME); when(asyncSender.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); + when(asyncSender.getIdentifier()).thenReturn(CLIENT_IDENTIFIER); sender = new ServiceBusSenderClient(asyncSender, RETRY_TIMEOUT); } @@ -74,6 +76,7 @@ void teardown() { void verifyProperties() { Assertions.assertEquals(ENTITY_NAME, sender.getEntityPath()); Assertions.assertEquals(NAMESPACE, sender.getFullyQualifiedNamespace()); + Assertions.assertEquals(CLIENT_IDENTIFIER, sender.getIdentifier()); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java index 3ef53b50ff2a6..9ad4ddfc664db 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java @@ -83,6 +83,7 @@ class ServiceBusSessionManagerTest { private static final String NAMESPACE = "my-namespace-foo.net"; private static final String ENTITY_PATH = "queue-name"; private static final MessagingEntityType ENTITY_TYPE = MessagingEntityType.QUEUE; + private static final String CLIENT_IDENTIFIER = "my-client-identifier"; private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClientTest.class); private final ReplayProcessor endpointProcessor = ReplayProcessor.cacheLast(); @@ -173,12 +174,23 @@ void afterEach(TestInfo testInfo) throws Exception { } } + @Test + void properties() { + // Arrange + ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 5); + sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, + tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); + + // Act & Assert + assertEquals(CLIENT_IDENTIFIER, sessionManager.getIdentifier()); + } + @Test void receiveNull() { // Arrange ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 5); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, - tracerProvider, messageSerializer, receiverOptions); + tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); // Act & Assert StepVerifier.create(sessionManager.receive()) @@ -195,7 +207,7 @@ void singleUnnamedSession() { ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 5); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, - tracerProvider, messageSerializer, receiverOptions); + tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); final String sessionId = "session-1"; final String lockToken = "a-lock-token"; @@ -220,7 +232,7 @@ void singleUnnamedSession() { when(amqpReceiveLink.closeAsync()).thenReturn(Mono.empty()); when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(), - any(MessagingEntityType.class), isNull())).thenReturn(Mono.just(amqpReceiveLink)); + any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull())).thenReturn(Mono.just(amqpReceiveLink)); when(managementNode.renewSessionLock(sessionId, linkName)).thenReturn( Mono.fromCallable(() -> OffsetDateTime.now().plus(Duration.ofSeconds(5)))); @@ -250,7 +262,7 @@ void singleUnnamedSessionLockRenew() { ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 1); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, - tracerProvider, messageSerializer, receiverOptions); + tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); final String sessionId = "session-1"; final String lockToken = "a-lock-token"; @@ -275,7 +287,7 @@ void singleUnnamedSessionLockRenew() { when(amqpReceiveLink.closeAsync()).thenReturn(Mono.empty()); when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(), - any(MessagingEntityType.class), isNull())).thenReturn(Mono.just(amqpReceiveLink)); + any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull())).thenReturn(Mono.just(amqpReceiveLink)); AtomicBoolean sessionLockRenewCalled = new AtomicBoolean(); when(managementNode.renewSessionLock(sessionId, linkName)).thenReturn( @@ -308,7 +320,7 @@ void multipleSessions() { final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, true, null, 5); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, - tracerProvider, messageSerializer, receiverOptions); + tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); final int numberOfMessages = 5; final Callable onRenewal = () -> OffsetDateTime.now().plus(Duration.ofSeconds(5)); @@ -356,7 +368,7 @@ void multipleSessions() { final AtomicInteger count = new AtomicInteger(); when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(), - any(MessagingEntityType.class), isNull())).thenAnswer(invocation -> { + any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull())).thenAnswer(invocation -> { final int number = count.getAndIncrement(); switch (number) { case 0: @@ -437,7 +449,7 @@ void multipleReceiveUnnamedSession() { null, 1); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, - tracerProvider, messageSerializer, receiverOptions); + tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); final String sessionId = "session-1"; final String linkName = "my-link-name"; @@ -467,7 +479,7 @@ void multipleReceiveUnnamedSession() { final AtomicInteger count = new AtomicInteger(); when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(), - any(MessagingEntityType.class), isNull())).thenAnswer(invocation -> { + any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull())).thenAnswer(invocation -> { final int number = count.getAndIncrement(); switch (number) { case 0: @@ -492,7 +504,7 @@ void multipleReceiveUnnamedSession() { verify(connection, times(2)).createReceiveLink(linkNameCaptor.capture(), eq(ENTITY_PATH), any( ServiceBusReceiveMode.class), isNull(), - any(MessagingEntityType.class), isNull()); + any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull()); final List actualLinksCreated = linkNameCaptor.getAllValues(); assertNotNull(actualLinksCreated); @@ -509,7 +521,7 @@ void singleUnnamedSessionCleanupAfterTimeout() { ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 2); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, - tracerProvider, messageSerializer, receiverOptions); + tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); final String sessionId = "session-1"; final String lockToken = "a-lock-token"; @@ -528,7 +540,7 @@ void singleUnnamedSessionCleanupAfterTimeout() { when(amqpReceiveLink.getSessionLockedUntil()) .thenAnswer(invocation -> Mono.just(sessionLockedUntil)); when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(), - any(MessagingEntityType.class), isNull())).thenReturn(Mono.just(amqpReceiveLink)); + any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull())).thenReturn(Mono.just(amqpReceiveLink)); when(amqpReceiveLink.addCredits(anyInt())).thenReturn(Mono.empty()); when(amqpReceiveLink.closeAsync()).thenReturn(Mono.empty()); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java index c400f893ae6e9..29887381876e8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java @@ -66,6 +66,7 @@ class ServiceBusSessionReceiverAsyncClientTest { private static final String NAMESPACE = "my-namespace-foo.net"; private static final String ENTITY_PATH = "queue-name"; private static final MessagingEntityType ENTITY_TYPE = MessagingEntityType.QUEUE; + private static final String CLIENT_IDENTIFIER = "my-client-identifier"; private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClientTest.class); private final ReplayProcessor endpointProcessor = ReplayProcessor.cacheLast(); @@ -174,13 +175,13 @@ void acceptSession() { when(amqpReceiveLink.updateDisposition(lockToken, Accepted.getInstance())).thenReturn(Mono.empty()); when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(), - any(MessagingEntityType.class), eq(sessionId))).thenReturn(Mono.just(amqpReceiveLink)); + any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), eq(sessionId))).thenReturn(Mono.just(amqpReceiveLink)); ServiceBusSessionReceiverAsyncClient client = new ServiceBusSessionReceiverAsyncClient( NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, tracerProvider, - messageSerializer, () -> { } + messageSerializer, () -> { }, CLIENT_IDENTIFIER ); // Act & Assert @@ -204,7 +205,7 @@ void acceptNextSession() { // Arrange ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, null, null); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, - tracerProvider, messageSerializer, receiverOptions); + tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); final int numberOfMessages = 5; final Callable onRenewal = () -> OffsetDateTime.now().plus(Duration.ofSeconds(5)); @@ -250,7 +251,7 @@ void acceptNextSession() { final AtomicInteger count = new AtomicInteger(); when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(), - any(MessagingEntityType.class), isNull())).thenAnswer(invocation -> { + any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull())).thenAnswer(invocation -> { final int number = count.getAndIncrement(); switch (number) { case 0: @@ -272,7 +273,7 @@ void acceptNextSession() { NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, tracerProvider, - messageSerializer, () -> { } + messageSerializer, () -> { }, CLIENT_IDENTIFIER ); // Act & Assert diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java index eb001e2adfa3b..6cd68a6c5ddd7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java @@ -82,6 +82,7 @@ public class ServiceBusReactorSessionTest { private static final String ENTITY_PATH = "entityPath"; private static final String VIA_ENTITY_PATH = "viaEntityPath"; private static final String VIA_ENTITY_PATH_SENDER_LINK_NAME = "VIA-" + VIA_ENTITY_PATH; + private static final String CLIENT_IDENTIFIER = "clientIdentifier"; @Mock private Reactor reactor; @@ -220,7 +221,7 @@ void createViaSenderLink() throws IOException { // Act serviceBusReactorSession.createProducer(VIA_ENTITY_PATH_SENDER_LINK_NAME, VIA_ENTITY_PATH, - retryOptions.getTryTimeout(), retryPolicy, ENTITY_PATH) + retryOptions.getTryTimeout(), retryPolicy, ENTITY_PATH, CLIENT_IDENTIFIER) .subscribe(); // Assert @@ -250,7 +251,7 @@ void createViaSenderLinkDestinationEntityAuthorizeFails() throws IOException { // Act StepVerifier.create(serviceBusReactorSession.createProducer(VIA_ENTITY_PATH_SENDER_LINK_NAME, VIA_ENTITY_PATH, - retryOptions.getTryTimeout(), retryPolicy, ENTITY_PATH)) + retryOptions.getTryTimeout(), retryPolicy, ENTITY_PATH, CLIENT_IDENTIFIER)) .verifyError(RuntimeException.class); // Assert