Skip to content

Commit

Permalink
Service Bus client identifier #28904 (#30573)
Browse files Browse the repository at this point in the history
* Service Bus client identifier #28904

* consistent code style & add some more test
  • Loading branch information
ZejiaJiang authored Sep 19, 2022
1 parent 114bfea commit c1f3616
Show file tree
Hide file tree
Showing 21 changed files with 260 additions and 98 deletions.
1 change: 1 addition & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 7.11.0-beta.1 (Unreleased)

### Features Added
- Added identifier to client. ([#28904](https://github.com/Azure/azure-sdk-for-java/issues/28904))

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpClientOptions;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyAuthenticationType;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -953,8 +955,16 @@ public ServiceBusSenderAsyncClient buildAsyncClient() {
new IllegalArgumentException("Unknown entity type: " + entityType));
}

final String clientIdentifier;
if (clientOptions instanceof AmqpClientOptions) {
String clientOptionIdentifier = ((AmqpClientOptions) clientOptions).getIdentifier();
clientIdentifier = CoreUtils.isNullOrEmpty(clientOptionIdentifier) ? UUID.randomUUID().toString() : clientOptionIdentifier;
} else {
clientIdentifier = UUID.randomUUID().toString();
}

return new ServiceBusSenderAsyncClient(entityName, entityType, connectionProcessor, retryOptions,
tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null);
tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null, clientIdentifier);
}

/**
Expand Down Expand Up @@ -1425,8 +1435,16 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
maxAutoLockRenewDuration, enableAutoComplete, null,
maxConcurrentSessions);

final String clientIdentifier;
if (clientOptions instanceof AmqpClientOptions) {
String clientOptionIdentifier = ((AmqpClientOptions) clientOptions).getIdentifier();
clientIdentifier = CoreUtils.isNullOrEmpty(clientOptionIdentifier) ? UUID.randomUUID().toString() : clientOptionIdentifier;
} else {
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType,
connectionProcessor, tracerProvider, messageSerializer, receiverOptions);
connectionProcessor, tracerProvider, messageSerializer, receiverOptions, clientIdentifier);

return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
Expand Down Expand Up @@ -1494,9 +1512,17 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions);

final String clientIdentifier;
if (clientOptions instanceof AmqpClientOptions) {
String clientOptionIdentifier = ((AmqpClientOptions) clientOptions).getIdentifier();
clientIdentifier = CoreUtils.isNullOrEmpty(clientOptionIdentifier) ? UUID.randomUUID().toString() : clientOptionIdentifier;
} else {
clientIdentifier = UUID.randomUUID().toString();
}

return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(),
entityPath, entityType, receiverOptions, connectionProcessor, tracerProvider, messageSerializer,
ServiceBusClientBuilder.this::onClientClose);
ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
}
}

Expand Down Expand Up @@ -1928,9 +1954,17 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete);

final String clientIdentifier;
if (clientOptions instanceof AmqpClientOptions) {
String clientOptionIdentifier = ((AmqpClientOptions) clientOptions).getIdentifier();
clientIdentifier = CoreUtils.isNullOrEmpty(clientOptionIdentifier) ? UUID.randomUUID().toString() : clientOptionIdentifier;
} else {
clientIdentifier = UUID.randomUUID().toString();
}

return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose);
tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,22 @@ public String getSubscriptionName() {
return this.subscriptionName;
}

/**
* Gets the identifier of the instance of {@link ServiceBusProcessorClient}.
*
* @return The identifier that can identify the instance of {@link ServiceBusProcessorClient}.
*/
public synchronized String getIdentifier() {
if (asyncClient.get() == null) {
ServiceBusReceiverAsyncClient newReceiverClient = receiverBuilder == null
? sessionReceiverBuilder.buildAsyncClientForProcessor()
: receiverBuilder.buildAsyncClient();
asyncClient.set(newReceiverClient);
}

return asyncClient.get().getIdentifier();
}

private synchronized void receiveMessages() {
if (receiverSubscriptions.size() > 0) {
// For the case of start -> stop -> start again
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
private final Runnable onClientClose;
private final ServiceBusSessionManager sessionManager;
private final Semaphore completionLock = new Semaphore(1);
private final String identifier;

// Starting at -1 because that is before the beginning of the stream.
private final AtomicLong lastPeekedSequenceNumber = new AtomicLong(-1);
Expand All @@ -254,7 +255,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
*/
ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType,
ReceiverOptions receiverOptions, ServiceBusConnectionProcessor connectionProcessor, Duration cleanupInterval,
TracerProvider tracerProvider, MessageSerializer messageSerializer, Runnable onClientClose) {
TracerProvider tracerProvider, MessageSerializer messageSerializer, Runnable onClientClose, String identifier) {
this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
"'fullyQualifiedNamespace' cannot be null.");
this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
Expand All @@ -275,6 +276,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
});

this.sessionManager = null;
this.identifier = identifier;
}

ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType,
Expand All @@ -300,6 +302,8 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
.log("Closing expired renewal operation.", renewal.getThrowable());
renewal.close();
});

this.identifier = sessionManager.getIdentifier();
}

/**
Expand Down Expand Up @@ -330,6 +334,15 @@ public String getSessionId() {
return receiverOptions.getSessionId();
}

/**
* Gets the identifier of the instance of {@link ServiceBusReceiverAsyncClient}.
*
* @return The identifier that can identify the instance of {@link ServiceBusReceiverAsyncClient}.
*/
public String getIdentifier() {
return identifier;
}

/**
* Abandons a {@link ServiceBusReceivedMessage message}. This will make the message available again for processing.
* Abandoning a message will increase the delivery count on the message.
Expand Down Expand Up @@ -1450,10 +1463,10 @@ private ServiceBusAsyncConsumer getOrCreateConsumer() {
final Mono<ServiceBusReceiveLink> receiveLinkMono = connectionProcessor.flatMap(connection -> {
if (receiverOptions.isSessionReceiver()) {
return connection.createReceiveLink(linkName, entityPath, receiverOptions.getReceiveMode(),
null, entityType, receiverOptions.getSessionId());
null, entityType, identifier, receiverOptions.getSessionId());
} else {
return connection.createReceiveLink(linkName, entityPath, receiverOptions.getReceiveMode(),
null, entityType);
null, entityType, identifier);
}
}).doOnNext(next -> {
LOGGER.atVerbose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ public String getSessionId() {
return asyncClient.getSessionId();
}

/**
* Gets the identifier of the instance of {@link ServiceBusReceiverClient}.
*
* @return The identifier that can identify the instance of {@link ServiceBusReceiverClient}.
*/
public String getIdentifier() {
return asyncClient.getIdentifier();
}

/**
* Abandons a {@link ServiceBusReceivedMessage message}. This will make the message available again for processing.
* Abandoning a message will increase the delivery count on the message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,14 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable {
private final String entityName;
private final ServiceBusConnectionProcessor connectionProcessor;
private final String viaEntityName;
private final String identifier;

/**
* Creates a new instance of this {@link ServiceBusSenderAsyncClient} that sends messages to a Service Bus entity.
*/
ServiceBusSenderAsyncClient(String entityName, MessagingEntityType entityType,
ServiceBusConnectionProcessor connectionProcessor, AmqpRetryOptions retryOptions, TracerProvider tracerProvider,
MessageSerializer messageSerializer, Runnable onClientClose, String viaEntityName) {
MessageSerializer messageSerializer, Runnable onClientClose, String viaEntityName, String identifier) {
// Caching the created link so we don't invoke another link creation.
this.messageSerializer = Objects.requireNonNull(messageSerializer,
"'messageSerializer' cannot be null.");
Expand All @@ -198,6 +199,7 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable {
this.entityType = entityType;
this.viaEntityName = viaEntityName;
this.onClientClose = onClientClose;
this.identifier = identifier;
}

/**
Expand All @@ -218,6 +220,15 @@ public String getEntityPath() {
return entityName;
}

/**
* Gets the identifier of the instance of {@link ServiceBusSenderAsyncClient}.
*
* @return The identifier that can identify the instance of {@link ServiceBusSenderAsyncClient}.
*/
public String getIdentifier() {
return identifier;
}

/**
* Sends a message to a Service Bus queue or topic.
*
Expand Down Expand Up @@ -824,9 +835,9 @@ private Mono<AmqpSendLink> getSendLink() {
.flatMap(connection -> {
if (!CoreUtils.isNullOrEmpty(viaEntityName)) {
return connection.createSendLink("VIA-".concat(viaEntityName), viaEntityName, retryOptions,
entityName);
entityName, identifier);
} else {
return connection.createSendLink(entityName, entityName, retryOptions, null);
return connection.createSendLink(entityName, entityName, retryOptions, null, identifier);
}
})
.doOnNext(next -> linkName.compareAndSet(null, next.getLinkName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,15 @@ public String getFullyQualifiedNamespace() {
return asyncClient.getFullyQualifiedNamespace();
}

/**
* Gets the identifier of the instance of {@link ServiceBusSenderClient}.
*
* @return The identifier that can identify the instance of {@link ServiceBusSenderClient}.
*/
public String getIdentifier() {
return asyncClient.getIdentifier();
}

/**
* Sends a message to a Service Bus queue or topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class ServiceBusSessionManager implements AutoCloseable {
private final Duration operationTimeout;
private final TracerProvider tracerProvider;
private final MessageSerializer messageSerializer;
private final String identifier;

private final AtomicBoolean isDisposed = new AtomicBoolean();
private final AtomicBoolean isStarted = new AtomicBoolean();
Expand All @@ -81,7 +82,7 @@ class ServiceBusSessionManager implements AutoCloseable {

ServiceBusSessionManager(String entityPath, MessagingEntityType entityType,
ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider,
MessageSerializer messageSerializer, ReceiverOptions receiverOptions, ServiceBusReceiveLink receiveLink) {
MessageSerializer messageSerializer, ReceiverOptions receiverOptions, ServiceBusReceiveLink receiveLink, String identifier) {
this.entityPath = entityPath;
this.entityType = entityType;
this.receiverOptions = receiverOptions;
Expand All @@ -90,6 +91,7 @@ class ServiceBusSessionManager implements AutoCloseable {
this.tracerProvider = tracerProvider;
this.messageSerializer = messageSerializer;
this.maxSessionLockRenewDuration = receiverOptions.getMaxLockRenewDuration();
this.identifier = identifier;

// According to the documentation, if a sequence is not finite, it should be published on their own scheduler.
// It's possible that some of these sessions have a lot of messages.
Expand All @@ -112,9 +114,9 @@ class ServiceBusSessionManager implements AutoCloseable {

ServiceBusSessionManager(String entityPath, MessagingEntityType entityType,
ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider,
MessageSerializer messageSerializer, ReceiverOptions receiverOptions) {
MessageSerializer messageSerializer, ReceiverOptions receiverOptions, String identifier) {
this(entityPath, entityType, connectionProcessor, tracerProvider,
messageSerializer, receiverOptions, null);
messageSerializer, receiverOptions, null, identifier);
}

/**
Expand All @@ -129,6 +131,15 @@ String getLinkName(String sessionId) {
return receiver != null ? receiver.getLinkName() : null;
}

/**
* Gets the identifier of the instance of {@link ServiceBusSessionManager}.
*
* @return The identifier that can identify the instance of {@link ServiceBusSessionManager}.
*/
public String getIdentifier() {
return this.identifier;
}

/**
* Gets the state of a session given its identifier.
*
Expand Down Expand Up @@ -253,7 +264,7 @@ private Mono<ServiceBusReceiveLink> createSessionReceiveLink() {
return connectionProcessor
.flatMap(connection -> {
return connection.createReceiveLink(linkName, entityPath, receiverOptions.getReceiveMode(),
null, entityType, sessionId);
null, entityType, identifier, sessionId);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable
private final MessageSerializer messageSerializer;
private final Runnable onClientClose;
private final ServiceBusSessionManager unNamedSessionManager; // for acceptNextSession()
private final String identifier;

ServiceBusSessionReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath,
MessagingEntityType entityType, ReceiverOptions receiverOptions,
ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider,
MessageSerializer messageSerializer, Runnable onClientClose) {
MessageSerializer messageSerializer, Runnable onClientClose, String identifier) {
this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
"'fullyQualifiedNamespace' cannot be null.");
this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
Expand All @@ -113,7 +114,8 @@ public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable
this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null.");
this.unNamedSessionManager = new ServiceBusSessionManager(entityPath, entityType, connectionProcessor,
tracerProvider, messageSerializer, receiverOptions);
tracerProvider, messageSerializer, receiverOptions, identifier);
this.identifier = identifier;
}

/**
Expand All @@ -136,7 +138,7 @@ public Mono<ServiceBusReceiverAsyncClient> acceptNextSession() {
receiverOptions.isEnableAutoComplete(), sessionId, null);
final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath,
entityType, connectionProcessor, tracerProvider, messageSerializer, newReceiverOptions,
receiveLink);
receiveLink, identifier);
return new ServiceBusReceiverAsyncClient(fullyQualifiedNamespace, entityPath,
entityType, newReceiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
tracerProvider, messageSerializer, () -> { }, sessionSpecificManager);
Expand Down Expand Up @@ -170,7 +172,7 @@ public Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId) {
receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(),
receiverOptions.isEnableAutoComplete(), sessionId, null);
final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath, entityType,
connectionProcessor, tracerProvider, messageSerializer, newReceiverOptions);
connectionProcessor, tracerProvider, messageSerializer, newReceiverOptions, identifier);

return sessionSpecificManager.getActiveLink().map(receiveLink -> new ServiceBusReceiverAsyncClient(
fullyQualifiedNamespace, entityPath, entityType, newReceiverOptions, connectionProcessor,
Expand Down
Loading

0 comments on commit c1f3616

Please sign in to comment.