diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
index 484d416f2bfb7..b4ec1a1d0919a 100644
--- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
+++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
@@ -3,6 +3,7 @@
 ## 7.11.0-beta.1 (Unreleased)
 
 ### Features Added
+- Added identifier to client. ([#28904](https://github.com/Azure/azure-sdk-for-java/issues/28904))
 
 ### Breaking Changes
 
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
index eea607dcddfd4..a8449e77cd008 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
@@ -3,6 +3,7 @@
 
 package com.azure.messaging.servicebus;
 
+import com.azure.core.amqp.AmqpClientOptions;
 import com.azure.core.amqp.AmqpRetryOptions;
 import com.azure.core.amqp.AmqpTransportType;
 import com.azure.core.amqp.ProxyAuthenticationType;
@@ -59,6 +60,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.ServiceLoader;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.regex.Pattern;
@@ -953,8 +955,16 @@ public ServiceBusSenderAsyncClient buildAsyncClient() {
                         new IllegalArgumentException("Unknown entity type: " + entityType));
             }
 
+            final String clientIdentifier;
+            if (clientOptions instanceof AmqpClientOptions) {
+                String clientOptionIdentifier = ((AmqpClientOptions) clientOptions).getIdentifier();
+                clientIdentifier = CoreUtils.isNullOrEmpty(clientOptionIdentifier) ? UUID.randomUUID().toString() : clientOptionIdentifier;
+            } else {
+                clientIdentifier = UUID.randomUUID().toString();
+            }
+
             return new ServiceBusSenderAsyncClient(entityName, entityType, connectionProcessor, retryOptions,
-                tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null);
+                tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null, clientIdentifier);
         }
 
         /**
@@ -1425,8 +1435,16 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
                 maxAutoLockRenewDuration, enableAutoComplete, null,
                 maxConcurrentSessions);
 
+            final String clientIdentifier;
+            if (clientOptions instanceof AmqpClientOptions) {
+                String clientOptionIdentifier = ((AmqpClientOptions) clientOptions).getIdentifier();
+                clientIdentifier = CoreUtils.isNullOrEmpty(clientOptionIdentifier) ? UUID.randomUUID().toString() : clientOptionIdentifier;
+            } else {
+                clientIdentifier = UUID.randomUUID().toString();
+            }
+
             final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType,
-                connectionProcessor, tracerProvider, messageSerializer, receiverOptions);
+                connectionProcessor, tracerProvider, messageSerializer, receiverOptions, clientIdentifier);
 
             return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
                 entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
@@ -1494,9 +1512,17 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
             final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
                 maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions);
 
+            final String clientIdentifier;
+            if (clientOptions instanceof AmqpClientOptions) {
+                String clientOptionIdentifier = ((AmqpClientOptions) clientOptions).getIdentifier();
+                clientIdentifier = CoreUtils.isNullOrEmpty(clientOptionIdentifier) ? UUID.randomUUID().toString() : clientOptionIdentifier;
+            } else {
+                clientIdentifier = UUID.randomUUID().toString();
+            }
+
             return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(),
                 entityPath, entityType, receiverOptions, connectionProcessor, tracerProvider, messageSerializer,
-                ServiceBusClientBuilder.this::onClientClose);
+                ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
         }
     }
 
@@ -1928,9 +1954,17 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
             final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
                 maxAutoLockRenewDuration, enableAutoComplete);
 
+            final String clientIdentifier;
+            if (clientOptions instanceof AmqpClientOptions) {
+                String clientOptionIdentifier = ((AmqpClientOptions) clientOptions).getIdentifier();
+                clientIdentifier = CoreUtils.isNullOrEmpty(clientOptionIdentifier) ? UUID.randomUUID().toString() : clientOptionIdentifier;
+            } else {
+                clientIdentifier = UUID.randomUUID().toString();
+            }
+
             return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
                 entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
-                tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose);
+                tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
         }
     }
 
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java
index f84294ce5c1b4..55d2d83e7f553 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java
@@ -302,6 +302,22 @@ public String getSubscriptionName() {
         return this.subscriptionName;
     }
 
+    /**
+     * Gets the identifier of the instance of {@link ServiceBusProcessorClient}.
+     *
+     * @return The identifier that can identify the instance of {@link ServiceBusProcessorClient}.
+     */
+    public synchronized String getIdentifier() {
+        if (asyncClient.get() == null) {
+            ServiceBusReceiverAsyncClient newReceiverClient = receiverBuilder == null
+                ? sessionReceiverBuilder.buildAsyncClientForProcessor()
+                : receiverBuilder.buildAsyncClient();
+            asyncClient.set(newReceiverClient);
+        }
+
+        return asyncClient.get().getIdentifier();
+    }
+
     private synchronized void receiveMessages() {
         if (receiverSubscriptions.size() > 0) {
             // For the case of start -> stop -> start again
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
index 62396daf12dd0..7cc782acb59bd 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
@@ -235,6 +235,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
     private final Runnable onClientClose;
     private final ServiceBusSessionManager sessionManager;
     private final Semaphore completionLock = new Semaphore(1);
+    private final String identifier;
 
     // Starting at -1 because that is before the beginning of the stream.
     private final AtomicLong lastPeekedSequenceNumber = new AtomicLong(-1);
@@ -254,7 +255,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
      */
     ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType,
         ReceiverOptions receiverOptions, ServiceBusConnectionProcessor connectionProcessor, Duration cleanupInterval,
-        TracerProvider tracerProvider, MessageSerializer messageSerializer, Runnable onClientClose) {
+        TracerProvider tracerProvider, MessageSerializer messageSerializer, Runnable onClientClose, String identifier) {
         this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
             "'fullyQualifiedNamespace' cannot be null.");
         this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
@@ -275,6 +276,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
         });
 
         this.sessionManager = null;
+        this.identifier = identifier;
     }
 
     ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType,
@@ -300,6 +302,8 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
                 .log("Closing expired renewal operation.", renewal.getThrowable());
             renewal.close();
         });
+
+        this.identifier = sessionManager.getIdentifier();
     }
 
     /**
@@ -330,6 +334,15 @@ public String getSessionId() {
         return receiverOptions.getSessionId();
     }
 
+    /**
+     * Gets the identifier of the instance of {@link ServiceBusReceiverAsyncClient}.
+     *
+     * @return The identifier that can identify the instance of {@link ServiceBusReceiverAsyncClient}.
+     */
+    public String getIdentifier() {
+        return identifier;
+    }
+
     /**
      * Abandons a {@link ServiceBusReceivedMessage message}. This will make the message available again for processing.
      * Abandoning a message will increase the delivery count on the message.
@@ -1450,10 +1463,10 @@ private ServiceBusAsyncConsumer getOrCreateConsumer() {
         final Mono<ServiceBusReceiveLink> receiveLinkMono = connectionProcessor.flatMap(connection -> {
             if (receiverOptions.isSessionReceiver()) {
                 return connection.createReceiveLink(linkName, entityPath, receiverOptions.getReceiveMode(),
-                        null, entityType, receiverOptions.getSessionId());
+                        null, entityType, identifier, receiverOptions.getSessionId());
             } else {
                 return connection.createReceiveLink(linkName, entityPath, receiverOptions.getReceiveMode(),
-                    null, entityType);
+                    null, entityType, identifier);
             }
         }).doOnNext(next -> {
             LOGGER.atVerbose()
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
index 53c115163708f..80bf6f88450c1 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
@@ -107,6 +107,15 @@ public String getSessionId() {
         return asyncClient.getSessionId();
     }
 
+    /**
+     * Gets the identifier of the instance of {@link ServiceBusReceiverClient}.
+     *
+     * @return The identifier that can identify the instance of {@link ServiceBusReceiverClient}.
+     */
+    public String getIdentifier() {
+        return asyncClient.getIdentifier();
+    }
+
     /**
      * Abandons a {@link ServiceBusReceivedMessage message}. This will make the message available again for processing.
      * Abandoning a message will increase the delivery count on the message.
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java
index af77d3dfb3196..71a896dfe0b58 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java
@@ -179,13 +179,14 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable {
     private final String entityName;
     private final ServiceBusConnectionProcessor connectionProcessor;
     private final String viaEntityName;
+    private final String identifier;
 
     /**
      * Creates a new instance of this {@link ServiceBusSenderAsyncClient} that sends messages to a Service Bus entity.
      */
     ServiceBusSenderAsyncClient(String entityName, MessagingEntityType entityType,
         ServiceBusConnectionProcessor connectionProcessor, AmqpRetryOptions retryOptions, TracerProvider tracerProvider,
-        MessageSerializer messageSerializer, Runnable onClientClose, String viaEntityName) {
+        MessageSerializer messageSerializer, Runnable onClientClose, String viaEntityName, String identifier) {
         // Caching the created link so we don't invoke another link creation.
         this.messageSerializer = Objects.requireNonNull(messageSerializer,
             "'messageSerializer' cannot be null.");
@@ -198,6 +199,7 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable {
         this.entityType = entityType;
         this.viaEntityName = viaEntityName;
         this.onClientClose = onClientClose;
+        this.identifier = identifier;
     }
 
     /**
@@ -218,6 +220,15 @@ public String getEntityPath() {
         return entityName;
     }
 
+    /**
+     * Gets the identifier of the instance of {@link ServiceBusSenderAsyncClient}.
+     *
+     * @return The identifier that can identify the instance of {@link ServiceBusSenderAsyncClient}.
+     */
+    public String getIdentifier() {
+        return identifier;
+    }
+
     /**
      * Sends a message to a Service Bus queue or topic.
      *
@@ -824,9 +835,9 @@ private Mono<AmqpSendLink> getSendLink() {
             .flatMap(connection -> {
                 if (!CoreUtils.isNullOrEmpty(viaEntityName)) {
                     return connection.createSendLink("VIA-".concat(viaEntityName), viaEntityName, retryOptions,
-                        entityName);
+                        entityName, identifier);
                 } else {
-                    return connection.createSendLink(entityName, entityName, retryOptions, null);
+                    return connection.createSendLink(entityName, entityName, retryOptions, null, identifier);
                 }
             })
             .doOnNext(next -> linkName.compareAndSet(null, next.getLinkName()));
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderClient.java
index f1dabfafe7724..0f3c8ecfd740d 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderClient.java
@@ -168,6 +168,15 @@ public String getFullyQualifiedNamespace() {
         return asyncClient.getFullyQualifiedNamespace();
     }
 
+    /**
+     * Gets the identifier of the instance of {@link ServiceBusSenderClient}.
+     *
+     * @return The identifier that can identify the instance of {@link ServiceBusSenderClient}.
+     */
+    public String getIdentifier() {
+        return asyncClient.getIdentifier();
+    }
+
     /**
      * Sends a message to a Service Bus queue or topic.
      *
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java
index 0e716b91a11eb..8ff91d60a0708 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java
@@ -63,6 +63,7 @@ class ServiceBusSessionManager implements AutoCloseable {
     private final Duration operationTimeout;
     private final TracerProvider tracerProvider;
     private final MessageSerializer messageSerializer;
+    private final String identifier;
 
     private final AtomicBoolean isDisposed = new AtomicBoolean();
     private final AtomicBoolean isStarted = new AtomicBoolean();
@@ -81,7 +82,7 @@ class ServiceBusSessionManager implements AutoCloseable {
 
     ServiceBusSessionManager(String entityPath, MessagingEntityType entityType,
         ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider,
-        MessageSerializer messageSerializer, ReceiverOptions receiverOptions, ServiceBusReceiveLink receiveLink) {
+        MessageSerializer messageSerializer, ReceiverOptions receiverOptions, ServiceBusReceiveLink receiveLink, String identifier) {
         this.entityPath = entityPath;
         this.entityType = entityType;
         this.receiverOptions = receiverOptions;
@@ -90,6 +91,7 @@ class ServiceBusSessionManager implements AutoCloseable {
         this.tracerProvider = tracerProvider;
         this.messageSerializer = messageSerializer;
         this.maxSessionLockRenewDuration = receiverOptions.getMaxLockRenewDuration();
+        this.identifier = identifier;
 
         // According to the documentation, if a sequence is not finite, it should be published on their own scheduler.
         // It's possible that some of these sessions have a lot of messages.
@@ -112,9 +114,9 @@ class ServiceBusSessionManager implements AutoCloseable {
 
     ServiceBusSessionManager(String entityPath, MessagingEntityType entityType,
         ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider,
-        MessageSerializer messageSerializer, ReceiverOptions receiverOptions) {
+        MessageSerializer messageSerializer, ReceiverOptions receiverOptions, String identifier) {
         this(entityPath, entityType, connectionProcessor, tracerProvider,
-            messageSerializer, receiverOptions, null);
+            messageSerializer, receiverOptions, null, identifier);
     }
 
     /**
@@ -129,6 +131,15 @@ String getLinkName(String sessionId) {
         return receiver != null ? receiver.getLinkName() : null;
     }
 
+    /**
+     * Gets the identifier of the instance of {@link ServiceBusSessionManager}.
+     *
+     * @return The identifier that can identify the instance of {@link ServiceBusSessionManager}.
+     */
+    public String getIdentifier() {
+        return this.identifier;
+    }
+
     /**
      * Gets the state of a session given its identifier.
      *
@@ -253,7 +264,7 @@ private Mono<ServiceBusReceiveLink> createSessionReceiveLink() {
         return connectionProcessor
             .flatMap(connection -> {
                 return connection.createReceiveLink(linkName, entityPath, receiverOptions.getReceiveMode(),
-                null, entityType, sessionId);
+                null, entityType, identifier, sessionId);
             });
     }
 
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
index ed58979124be3..9be0ab93b465f 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
@@ -98,11 +98,12 @@ public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable
     private final MessageSerializer messageSerializer;
     private final Runnable onClientClose;
     private final ServiceBusSessionManager unNamedSessionManager;  // for acceptNextSession()
+    private final String identifier;
 
     ServiceBusSessionReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath,
         MessagingEntityType entityType, ReceiverOptions receiverOptions,
         ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider,
-        MessageSerializer messageSerializer, Runnable onClientClose) {
+        MessageSerializer messageSerializer, Runnable onClientClose, String identifier) {
         this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
             "'fullyQualifiedNamespace' cannot be null.");
         this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
@@ -113,7 +114,8 @@ public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable
         this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
         this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null.");
         this.unNamedSessionManager = new ServiceBusSessionManager(entityPath, entityType, connectionProcessor,
-            tracerProvider, messageSerializer, receiverOptions);
+            tracerProvider, messageSerializer, receiverOptions, identifier);
+        this.identifier = identifier;
     }
 
     /**
@@ -136,7 +138,7 @@ public Mono<ServiceBusReceiverAsyncClient> acceptNextSession() {
                     receiverOptions.isEnableAutoComplete(), sessionId, null);
                 final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath,
                     entityType, connectionProcessor, tracerProvider, messageSerializer, newReceiverOptions,
-                    receiveLink);
+                    receiveLink, identifier);
                 return new ServiceBusReceiverAsyncClient(fullyQualifiedNamespace, entityPath,
                     entityType, newReceiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
                     tracerProvider, messageSerializer, () -> { }, sessionSpecificManager);
@@ -170,7 +172,7 @@ public Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId) {
             receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(),
             receiverOptions.isEnableAutoComplete(), sessionId, null);
         final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath, entityType,
-            connectionProcessor, tracerProvider, messageSerializer, newReceiverOptions);
+            connectionProcessor, tracerProvider, messageSerializer, newReceiverOptions, identifier);
 
         return sessionSpecificManager.getActiveLink().map(receiveLink -> new ServiceBusReceiverAsyncClient(
             fullyQualifiedNamespace, entityPath, entityType, newReceiverOptions, connectionProcessor,
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAmqpConnection.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAmqpConnection.java
index 1f1c3ccf31d8a..2e96583a7be77 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAmqpConnection.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAmqpConnection.java
@@ -27,11 +27,11 @@ public interface ServiceBusAmqpConnection extends AmqpConnection {
      * @param entityPath The remote address to connect to for the message broker.
      * @param retryOptions Options to use when creating the link.
      * @param transferEntityPath Path if the message should be transferred this destination by message broker.
-     *
+     * @param clientIdentifier The identifier of the client.
      * @return A new or existing send link that is connected to the given {@code entityPath}.
      */
     Mono<AmqpSendLink> createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions,
-        String transferEntityPath);
+        String transferEntityPath, String clientIdentifier);
 
     /**
      * Creates or gets an existing receive link. The same link is returned if there is an existing receive link with the
@@ -39,14 +39,14 @@ Mono<AmqpSendLink> createSendLink(String linkName, String entityPath, AmqpRetryO
      *
      * @param linkName The name of the link.
      * @param entityPath The remote address to connect to for the message broker.
+     * @param receiveMode {@link ServiceBusReceiveMode} to use when creating the link.
      * @param transferEntityPath Path if the message should be transferred to another link after being received
      *     from this link.
-     * @param receiveMode {@link ServiceBusReceiveMode} to use when creating the link.
-     *
+     * @param clientIdentifier The identifier of the client.
      * @return A new or existing receive link that is connected to the given {@code entityPath}.
      */
     Mono<ServiceBusReceiveLink> createReceiveLink(String linkName, String entityPath, ServiceBusReceiveMode receiveMode,
-        String transferEntityPath, MessagingEntityType entityType);
+        String transferEntityPath, MessagingEntityType entityType, String clientIdentifier);
 
     /**
      * Creates or gets an existing receive link for a given sessionId. The same link is returned if there is an
@@ -54,14 +54,14 @@ Mono<ServiceBusReceiveLink> createReceiveLink(String linkName, String entityPath
      *
      * @param linkName The name of the link.
      * @param entityPath The remote address to connect to for the message broker.
+     * @param receiveMode {@link ServiceBusReceiveMode} to use when creating the link.
      * @param transferEntityPath Path if the events should be transferred to another link after being received
      *     from this link.
-     * @param receiveMode {@link ServiceBusReceiveMode} to use when creating the link.
-     * @param sessionId to use when creating the link.
      * @param entityType {@link MessagingEntityType} to use when creating the link.
-     *
+     * @param clientIdentifier The identifier of the client.
+     * @param sessionId to use when creating the link.
      * @return A new or existing receive link that is connected to the given {@code entityPath}.
      */
     Mono<ServiceBusReceiveLink> createReceiveLink(String linkName, String entityPath, ServiceBusReceiveMode receiveMode,
-        String transferEntityPath, MessagingEntityType entityType, String sessionId);
+        String transferEntityPath, MessagingEntityType entityType, String clientIdentifier, String sessionId);
 }
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java
index 916af3639d7b5..9ac88fcd091cf 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java
@@ -141,19 +141,20 @@ public Mono<ServiceBusManagementNode> getManagementNode(String entityPath, Messa
      * @param entityPath The remote address to connect to for the message broker.
      * @param retryOptions Options to use when creating the link.
      * @param transferEntityPath Path if the message should be transferred this destination by message broker.
+     * @param clientIdentifier The identifier of the client.
      *
      * @return A new or existing send link that is connected to the given {@code entityPath}.
      */
     @Override
     public Mono<AmqpSendLink> createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions,
-         String transferEntityPath) {
+        String transferEntityPath, String clientIdentifier) {
 
         return createSession(linkName).cast(ServiceBusSession.class).flatMap(session -> {
             LOGGER.atVerbose().addKeyValue(LINK_NAME_KEY, linkName).log("Get or create sender link.");
             final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
 
             return session.createProducer(linkName + entityPath, entityPath, retryOptions.getTryTimeout(),
-                retryPolicy, transferEntityPath).cast(AmqpSendLink.class);
+                retryPolicy, transferEntityPath, clientIdentifier).cast(AmqpSendLink.class);
         });
     }
 
@@ -167,19 +168,20 @@ public Mono<AmqpSendLink> createSendLink(String linkName, String entityPath, Amq
      * @param transferEntityPath Path if the events should be transferred to another link after being received
      *     from this link.
      * @param entityType {@link MessagingEntityType} to use when creating the link.
+     * @param clientIdentifier The identifier of the client.
      *
      * @return A new or existing receive link that is connected to the given {@code entityPath}.
      */
     @Override
     public Mono<ServiceBusReceiveLink> createReceiveLink(String linkName, String entityPath,
-        ServiceBusReceiveMode receiveMode, String transferEntityPath, MessagingEntityType entityType) {
+        ServiceBusReceiveMode receiveMode, String transferEntityPath, MessagingEntityType entityType, String clientIdentifier) {
         return createSession(entityPath).cast(ServiceBusSession.class)
             .flatMap(session -> {
                 LOGGER.atVerbose().addKeyValue(ENTITY_PATH_KEY, entityPath).log("Get or create consumer.");
                 final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
 
                 return session.createConsumer(linkName, entityPath, entityType, retryOptions.getTryTimeout(),
-                    retryPolicy, receiveMode);
+                    retryPolicy, receiveMode, clientIdentifier);
             });
     }
 
@@ -197,21 +199,21 @@ public Mono<AmqpSession> createSession(String sessionName) {
      * @param receiveMode Consumer options to use when creating the link.
      * @param transferEntityPath to use when creating the link.
      * @param entityType {@link MessagingEntityType} to use when creating the link.
+     * @param clientIdentifier The identifier of the client.
      * @param sessionId to use when creating the link.
      *
      * @return A new or existing receive link that is connected to the given {@code entityPath}.
      */
     @Override
-    public Mono<ServiceBusReceiveLink> createReceiveLink(String linkName, String entityPath,
-        ServiceBusReceiveMode receiveMode, String transferEntityPath, MessagingEntityType entityType,
-        String sessionId) {
+    public Mono<ServiceBusReceiveLink> createReceiveLink(String linkName, String entityPath, ServiceBusReceiveMode receiveMode,
+        String transferEntityPath, MessagingEntityType entityType, String clientIdentifier, String sessionId) {
         return createSession(entityPath).cast(ServiceBusSession.class)
             .flatMap(session -> {
                 LOGGER.atVerbose().addKeyValue(ENTITY_PATH_KEY, entityPath).log("Get or create consumer.");
                 final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
 
                 return session.createConsumer(linkName, entityPath, entityType, retryOptions.getTryTimeout(),
-                    retryPolicy, receiveMode, sessionId);
+                    retryPolicy, receiveMode, clientIdentifier, sessionId);
             });
     }
 
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java
index 82e7652ffceb3..ca169c82094d9 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java
@@ -35,6 +35,8 @@
 import java.util.Map;
 import java.util.Objects;
 
+import static com.azure.core.amqp.implementation.AmqpConstants.CLIENT_IDENTIFIER;
+import static com.azure.core.amqp.implementation.AmqpConstants.CLIENT_RECEIVER_IDENTIFIER;
 import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
 import static com.azure.core.amqp.implementation.ClientConstants.LINK_NAME_KEY;
 import static com.azure.messaging.servicebus.implementation.MessageUtils.adjustServerTimeout;
@@ -90,26 +92,27 @@ class ServiceBusReactorSession extends ReactorSession implements ServiceBusSessi
 
     @Override
     public Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
-        MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode) {
+            MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode,
+            String clientIdentifier) {
         final Map<Symbol, Object> filter = new HashMap<>();
 
-        return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter);
+        return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter, clientIdentifier);
     }
 
     @Override
     public Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
         MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode,
-        String sessionId) {
+        String clientIdentifier, String sessionId) {
 
         final Map<Symbol, Object> filter = new HashMap<>();
         filter.put(SESSION_FILTER, sessionId);
 
-        return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter);
+        return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter, clientIdentifier);
     }
 
     @Override
     public Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
-        AmqpRetryPolicy retry, String transferEntityPath) {
+        AmqpRetryPolicy retry, String transferEntityPath, String clientIdentifier) {
         Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
         Objects.requireNonNull(timeout, "'timeout' cannot be null.");
         Objects.requireNonNull(retry, "'retry' cannot be null.");
@@ -118,7 +121,7 @@ public Mono<AmqpLink> createProducer(String linkName, String entityPath, Duratio
         Map<Symbol, Object> linkProperties = new HashMap<>();
 
         linkProperties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis()));
-
+        linkProperties.put(CLIENT_IDENTIFIER, clientIdentifier);
         if (!CoreUtils.isNullOrEmpty(transferEntityPath)) {
             linkProperties.put(LINK_TRANSFER_DESTINATION_PROPERTY, transferEntityPath);
 
@@ -169,7 +172,7 @@ protected ReactorReceiver createConsumer(String entityPath, Receiver receiver,
 
     private Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
         MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode,
-        Map<Symbol, Object> filter) {
+        Map<Symbol, Object> filter, String clientIdentifier) {
         Objects.requireNonNull(linkName, "'linkName' cannot be null.");
         Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
         Objects.requireNonNull(timeout, "'timeout' cannot be null.");
@@ -179,6 +182,7 @@ private Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entit
         final Map<Symbol, Object> linkProperties = new HashMap<>();
         final Duration serverTimeout = adjustServerTimeout(timeout);
         linkProperties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis()));
+        linkProperties.put(CLIENT_RECEIVER_IDENTIFIER, clientIdentifier);
         if (entityType != null) {
             linkProperties.put(ENTITY_TYPE_PROPERTY, entityType.getValue());
         }
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusSession.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusSession.java
index 14bbd62bbff26..94649e59fa08c 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusSession.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusSession.java
@@ -29,11 +29,12 @@ public interface ServiceBusSession extends AmqpSession {
      * @param timeout Timeout required for creating and opening an AMQP link.
      * @param retryPolicy The retry policy to use when consuming messages.
      * @param receiveMode The {@link ServiceBusReceiveMode} for the messages to be received.
+     * @param clientIdentifier The identifier of the client.
      *
      * @return A newly created AMQP link.
      */
     Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath, MessagingEntityType entityType,
-        Duration timeout, AmqpRetryPolicy retryPolicy, ServiceBusReceiveMode receiveMode);
+        Duration timeout, AmqpRetryPolicy retryPolicy, ServiceBusReceiveMode receiveMode, String clientIdentifier);
 
     /**
      * Creates a new AMQP link that consumes events from the message broker.
@@ -43,13 +44,15 @@ Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath, M
      * @param timeout Timeout required for creating and opening an AMQP link.
      * @param retryPolicy The retry policy to use when consuming messages.
      * @param receiveMode The {@link ServiceBusReceiveMode} for the messages to be received.
+     * @param clientIdentifier The identifier of the client.
      * @param sessionId The sessionId for the messages to be received. If {@code null}, then the next, unnamed session
      *     is retrieved.
      *
      * @return A newly created AMQP link.
      */
     Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath, MessagingEntityType entityType,
-        Duration timeout, AmqpRetryPolicy retryPolicy, ServiceBusReceiveMode receiveMode, String sessionId);
+        Duration timeout, AmqpRetryPolicy retryPolicy, ServiceBusReceiveMode receiveMode, String clientIdentifier,
+        String sessionId);
 
     /**
      * Creates a new {@link AmqpLink} that can send events to the message broker.
@@ -60,9 +63,10 @@ Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath, M
      * @param retryPolicy The retry policy to use when sending events.
      * @param transferEntityPath The entity path this link connects to, so that it may transfer events to
      *     the message broker via this entity.
+     * @param clientIdentifier The identifier of the client.
      *
      * @return A newly created AMQP link.
      */
-    Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
-        AmqpRetryPolicy retryPolicy, String transferEntityPath);
+    Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retryPolicy,
+        String transferEntityPath, String clientIdentifier);
 }
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java
index d08defd815fba..0c0b084a08a45 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java
@@ -10,6 +10,7 @@
 import com.azure.core.credential.AzureNamedKeyCredential;
 import com.azure.core.credential.AzureSasCredential;
 import com.azure.core.util.Configuration;
+import com.azure.core.util.CoreUtils;
 import com.azure.core.util.logging.ClientLogger;
 import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder;
 import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSenderClientBuilder;
@@ -65,6 +66,18 @@ class ServiceBusClientBuilderTest extends IntegrationTestBase {
         super(new ClientLogger(ServiceBusClientBuilderTest.class));
     }
 
+    @Test
+    void ensureIdentifierString() {
+        final ServiceBusClientBuilder builder = new ServiceBusClientBuilder();
+        final ServiceBusSenderAsyncClient client = builder
+            .connectionString(NAMESPACE_CONNECTION_STRING)
+            .sender()
+            .queueName(QUEUE_NAME)
+            .buildAsyncClient();
+
+        Assertions.assertFalse(CoreUtils.isNullOrEmpty(client.getIdentifier()));
+    }
+
     @Test
     void deadLetterqueueClient() {
         // Arrange
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java
index 0de7b598e27b9..eab4c030b00f2 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java
@@ -108,6 +108,7 @@ class ServiceBusReceiverAsyncClientTest {
         NAMESPACE, "some-name", "something-else");
     private static final Duration CLEANUP_INTERVAL = Duration.ofSeconds(10);
     private static final String SESSION_ID = "my-session-id";
+    private static final String CLIENT_IDENTIFIER = "my-client-identifier";
 
     private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClientTest.class);
     private final String messageTrackingUUID = UUID.randomUUID().toString();
@@ -180,9 +181,9 @@ AmqpTransportType.AMQP, new AmqpRetryOptions(), ProxyOptions.SYSTEM_DEFAULTS, Sc
             .thenReturn(Mono.just(managementNode));
 
         when(connection.createReceiveLink(anyString(), anyString(), any(ServiceBusReceiveMode.class), any(),
-            any(MessagingEntityType.class))).thenReturn(Mono.just(amqpReceiveLink));
+            any(MessagingEntityType.class), anyString())).thenReturn(Mono.just(amqpReceiveLink));
         when(connection.createReceiveLink(anyString(), anyString(), any(ServiceBusReceiveMode.class), any(),
-            any(MessagingEntityType.class), anyString())).thenReturn(Mono.just(sessionReceiveLink));
+            any(MessagingEntityType.class), anyString(), anyString())).thenReturn(Mono.just(sessionReceiveLink));
 
         connectionProcessor =
             Flux.<ServiceBusAmqpConnection>create(sink -> sink.next(connection))
@@ -191,12 +192,12 @@ AmqpTransportType.AMQP, new AmqpRetryOptions(), ProxyOptions.SYSTEM_DEFAULTS, Sc
 
         receiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE,
             new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false),
-            connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose);
+            connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose, CLIENT_IDENTIFIER);
 
         sessionReceiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE,
             new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false, SESSION_ID,
                 null),
-            connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose);
+            connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose, CLIENT_IDENTIFIER);
     }
 
     @AfterEach
@@ -208,6 +209,16 @@ void teardown(TestInfo testInfo) throws Exception {
         Mockito.framework().clearInlineMock(this);
     }
 
+    /**
+     * Verifies that the correct Service Bus properties are set.
+     */
+    @Test
+    void properties() {
+        Assertions.assertEquals(ENTITY_PATH, receiver.getEntityPath());
+        Assertions.assertEquals(NAMESPACE, receiver.getFullyQualifiedNamespace());
+        Assertions.assertEquals(CLIENT_IDENTIFIER, receiver.getIdentifier());
+    }
+
     /**
      * Verifies that when user calls peek more than one time, It returns different object.
      */
@@ -316,7 +327,7 @@ void receivesMessageLockRenewSessionOnly() {
         ServiceBusReceiverAsyncClient mySessionReceiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE,
             new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, maxLockRenewDuration,
                 false, SESSION_ID, null), connectionProcessor,
-            CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose);
+            CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose, CLIENT_IDENTIFIER);
 
         // This needs to be used with "try with resource" : https://javadoc.io/static/org.mockito/mockito-core/3.9.0/org/mockito/Mockito.html#static_mocks
         try (
@@ -409,7 +420,7 @@ void completeInReceiveAndDeleteMode() {
         final ReceiverOptions options = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, PREFETCH, null, false);
         ServiceBusReceiverAsyncClient client = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH,
             MessagingEntityType.QUEUE, options, connectionProcessor, CLEANUP_INTERVAL, tracerProvider,
-            messageSerializer, onClientClose);
+            messageSerializer, onClientClose, CLIENT_IDENTIFIER);
 
         final String lockToken1 = UUID.randomUUID().toString();
 
@@ -429,7 +440,7 @@ void throwsExceptionAboutSettlingPeekedMessagesWithNullLockToken() {
         final ReceiverOptions options = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false);
         ServiceBusReceiverAsyncClient client = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH,
             MessagingEntityType.QUEUE, options, connectionProcessor, CLEANUP_INTERVAL, tracerProvider,
-            messageSerializer, onClientClose);
+            messageSerializer, onClientClose, CLIENT_IDENTIFIER);
 
         when(receivedMessage.getLockToken()).thenReturn(null);
 
@@ -547,7 +558,7 @@ void errorSourceOnRenewMessageLock() {
         final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true);
         final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH,
             MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider,
-            messageSerializer, onClientClose);
+            messageSerializer, onClientClose, CLIENT_IDENTIFIER);
 
         // Act & Assert
         StepVerifier.create(receiver2.renewMessageLock(receivedMessage, maxDuration))
@@ -571,7 +582,7 @@ void errorSourceOnSessionLock() {
         final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true, SESSION_ID,
             null);
         final ServiceBusReceiverAsyncClient sessionReceiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE,
-            receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose);
+            receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose, CLIENT_IDENTIFIER);
 
         // Act & Assert
         StepVerifier.create(sessionReceiver2.renewSessionLock(SESSION_ID))
@@ -644,7 +655,7 @@ void errorSourceAutoCompleteMessage() {
         final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true);
         final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH,
             MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider,
-            messageSerializer, onClientClose);
+            messageSerializer, onClientClose, CLIENT_IDENTIFIER);
 
         when(receivedMessage.getLockToken()).thenReturn(lockToken);
         when(messageSerializer.deserialize(any(Message.class), eq(ServiceBusReceivedMessage.class)))
@@ -692,10 +703,10 @@ void errorSourceOnReceiveMessage() {
         final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true);
         final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH,
             MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider,
-            messageSerializer, onClientClose);
+            messageSerializer, onClientClose, CLIENT_IDENTIFIER);
 
         when(connection.createReceiveLink(anyString(), anyString(), any(ServiceBusReceiveMode.class), any(),
-            any(MessagingEntityType.class))).thenReturn(Mono.error(new AzureException("some receive link Error.")));
+            any(MessagingEntityType.class), anyString())).thenReturn(Mono.error(new AzureException("some receive link Error.")));
 
         // Act & Assert
         StepVerifier.create(receiver2.receiveMessages().take(1))
@@ -1228,7 +1239,7 @@ void autoCompleteMessage() {
         final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true);
         final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH,
             MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider,
-            messageSerializer, onClientClose);
+            messageSerializer, onClientClose, CLIENT_IDENTIFIER);
 
         when(receivedMessage.getLockToken()).thenReturn(lockToken);
         when(messageSerializer.deserialize(any(Message.class), eq(ServiceBusReceivedMessage.class)))
@@ -1262,7 +1273,7 @@ void autoCompleteMessageSessionReceiver() {
             true, SESSION_ID, null);
         final ServiceBusReceiverAsyncClient sessionReceiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH,
             MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider,
-            messageSerializer, onClientClose);
+            messageSerializer, onClientClose, CLIENT_IDENTIFIER);
 
         final ServiceBusReceivedMessage receivedMessage3 = mock(ServiceBusReceivedMessage.class);
 
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java
index 8bd16da14e54d..bf5a1eb02b7ce 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java
@@ -55,6 +55,7 @@ class ServiceBusReceiverClientTest {
     private static final String ENTITY_PATH = "test-entity-path";
     private static final String LOCK_TOKEN = UUID.randomUUID().toString();
     private static final String SESSION_ID = "test-session-id";
+    private static final String CLIENT_IDENTIFIER = "my-client-identifier";
 
     private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(5);
 
@@ -81,6 +82,7 @@ void setup() {
         when(asyncClient.getEntityPath()).thenReturn(ENTITY_PATH);
         when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE);
         when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 0, null, false));
+        when(asyncClient.getIdentifier()).thenReturn(CLIENT_IDENTIFIER);
         when(sessionReceiverOptions.getSessionId()).thenReturn(SESSION_ID);
         client = new ServiceBusReceiverClient(asyncClient, false, OPERATION_TIMEOUT);
     }
@@ -100,6 +102,7 @@ void nullConstructor() {
     void properties() {
         assertEquals(NAMESPACE, client.getFullyQualifiedNamespace());
         assertEquals(ENTITY_PATH, client.getEntityPath());
+        assertEquals(CLIENT_IDENTIFIER, client.getIdentifier());
     }
 
     @Test
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java
index 1939a3c95d043..07b2578093557 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java
@@ -96,7 +96,8 @@ class ServiceBusSenderAsyncClientTest {
     private static final String ENTITY_NAME = "my-servicebus-entity";
     private static final String LINK_NAME = "my-link-name";
     private static final BinaryData TEST_CONTENTS = BinaryData.fromString("My message for service bus queue!");
-    private static final  String TXN_ID_STRING = "1";
+    private static final String TXN_ID_STRING = "1";
+    private static final String CLIENT_IDENTIFIER = "my-client-identifier";
 
     @Mock
     private AmqpSendLink sendLink;
@@ -169,7 +170,7 @@ void setup() {
                 connectionOptions.getRetry()));
 
         sender = new ServiceBusSenderAsyncClient(ENTITY_NAME, MessagingEntityType.QUEUE, connectionProcessor,
-            retryOptions, tracerProvider, serializer, onClientClose, null);
+            retryOptions, tracerProvider, serializer, onClientClose, null, CLIENT_IDENTIFIER);
 
         when(connection.getManagementNode(anyString(), any(MessagingEntityType.class)))
             .thenReturn(just(managementNode));
@@ -196,6 +197,7 @@ void teardown() {
     void verifyProperties() {
         Assertions.assertEquals(ENTITY_NAME, sender.getEntityPath());
         Assertions.assertEquals(NAMESPACE, sender.getFullyQualifiedNamespace());
+        Assertions.assertEquals(CLIENT_IDENTIFIER, sender.getIdentifier());
     }
 
     /**
@@ -213,7 +215,7 @@ void createBatchNull() {
     @Test
     void createBatchDefault() {
         // Arrange
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(sendLink));
         when(sendLink.getLinkSize()).thenReturn(Mono.just(MAX_MESSAGE_LENGTH_BYTES));
 
@@ -238,7 +240,7 @@ void createBatchWhenSizeTooBig() {
         final AmqpSendLink link = mock(AmqpSendLink.class);
         when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize));
 
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(link));
 
         // This event is 1024 bytes when serialized.
@@ -267,7 +269,7 @@ void createsMessageBatchWithSize() {
         when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize));
 
         // EC is the prefix they use when creating a link that sends to the service round-robin.
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(link));
 
         // This is 1024 bytes when serialized.
@@ -304,7 +306,7 @@ void scheduleMessageSizeTooBig() {
         final AmqpSendLink link = mock(AmqpSendLink.class);
         when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize));
 
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(link));
         when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize));
 
@@ -332,7 +334,7 @@ void sendMultipleMessagesWithTransaction() {
             Assertions.assertTrue(batch.tryAddMessage(message));
         });
 
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(sendLink));
         when(sendLink.send(any(Message.class), any(DeliveryState.class))).thenReturn(Mono.empty());
         when(sendLink.send(anyList(), any(DeliveryState.class))).thenReturn(Mono.empty());
@@ -371,7 +373,7 @@ void sendMultipleMessages() {
             Assertions.assertTrue(batch.tryAddMessage(message));
         });
 
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(sendLink));
         when(sendLink.send(anyList())).thenReturn(Mono.empty());
         // Act
@@ -401,9 +403,9 @@ void sendMultipleMessagesTracerSpans() {
         final ServiceBusMessageBatch batch = new ServiceBusMessageBatch(256 * 1024,
             errorContextProvider, tracerProvider1, serializer, null, null);
         sender = new ServiceBusSenderAsyncClient(ENTITY_NAME, MessagingEntityType.QUEUE, connectionProcessor,
-            retryOptions, tracerProvider1, serializer, onClientClose, null);
+            retryOptions, tracerProvider1, serializer, onClientClose, null, CLIENT_IDENTIFIER);
 
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(sendLink));
         when(sendLink.send(anyList())).thenReturn(Mono.empty());
         when(tracer1.start(eq(AZ_TRACING_SERVICE_NAME + "send"), any(Context.class), eq(ProcessKind.SEND)))
@@ -460,7 +462,7 @@ void sendMessagesListWithTransaction() {
         final int count = 4;
         final List<ServiceBusMessage> messages = TestUtils.getServiceBusMessages(count, UUID.randomUUID().toString());
 
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(sendLink));
         when(sendLink.send(any(Message.class), any(DeliveryState.class))).thenReturn(Mono.empty());
         when(sendLink.send(anyList(), any(DeliveryState.class))).thenReturn(Mono.empty());
@@ -492,7 +494,7 @@ void sendMessagesList() {
         final int count = 4;
         final List<ServiceBusMessage> messages = TestUtils.getServiceBusMessages(count, UUID.randomUUID().toString());
 
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(sendLink));
         when(sendLink.send(anyList())).thenReturn(Mono.empty());
 
@@ -519,7 +521,7 @@ void sendMessagesListExceedSize() {
         final Mono<Integer> linkMaxSize = Mono.just(1);
         final List<ServiceBusMessage> messages = TestUtils.getServiceBusMessages(count, UUID.randomUUID().toString());
 
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(sendLink));
         when(sendLink.getLinkSize()).thenReturn(linkMaxSize);
 
@@ -536,7 +538,7 @@ void sendSingleMessageThatExceedsSize() {
         // arrange
         ServiceBusMessage message = TestUtils.getServiceBusMessages(1, UUID.randomUUID().toString()).get(0);
 
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER)))
                 .thenReturn(Mono.just(sendLink));
         when(sendLink.getLinkSize()).thenReturn(Mono.just(1));
 
@@ -556,7 +558,7 @@ void sendSingleMessageWithTransaction() {
         // Arrange
         final ServiceBusMessage testData = new ServiceBusMessage(TEST_CONTENTS);
 
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(sendLink));
 
         when(sendLink.getLinkSize()).thenReturn(Mono.just(MAX_MESSAGE_LENGTH_BYTES));
@@ -590,7 +592,7 @@ void sendSingleMessage() {
             new ServiceBusMessage(TEST_CONTENTS);
 
         // EC is the prefix they use when creating a link that sends to the service round-robin.
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(sendLink));
 
         when(sendLink.getLinkSize()).thenReturn(Mono.just(MAX_MESSAGE_LENGTH_BYTES));
@@ -614,7 +616,7 @@ void scheduleMessage() {
         long sequenceNumberReturned = 10;
         OffsetDateTime instant = mock(OffsetDateTime.class);
 
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(sendLink));
         when(sendLink.getLinkSize()).thenReturn(Mono.just(MAX_MESSAGE_LENGTH_BYTES));
         when(managementNode.schedule(anyList(), eq(instant), any(Integer.class), any(), isNull()))
@@ -637,7 +639,7 @@ void scheduleMessageWithTransaction() {
         // Arrange
         final long sequenceNumberReturned = 10;
         final OffsetDateTime instant = mock(OffsetDateTime.class);
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(sendLink));
         when(sendLink.getLinkSize()).thenReturn(Mono.just(MAX_MESSAGE_LENGTH_BYTES));
         when(managementNode.schedule(anyList(), eq(instant), eq(MAX_MESSAGE_LENGTH_BYTES), eq(LINK_NAME), argThat(e -> e.getTransactionId().equals(transactionContext.getTransactionId()))))
@@ -721,7 +723,7 @@ void verifyMessageOrdering() {
         messages.add(fourthMessage);
         messages.add(fifthMessage);
 
-        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
+        when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER)))
             .thenReturn(Mono.just(sendLink));
         when(sendLink.send(anyList())).thenReturn(Mono.empty());
 
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientTest.java
index 80d5c5d46e9f2..b2e4d2e62d454 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientTest.java
@@ -34,6 +34,7 @@
 public class ServiceBusSenderClientTest {
     private static final String NAMESPACE = "my-namespace";
     private static final String ENTITY_NAME = "my-servicebus-entity";
+    private static final String CLIENT_IDENTIFIER = "my-client-identifier";
 
     @Mock
     private ServiceBusSenderAsyncClient asyncSender;
@@ -61,6 +62,7 @@ void setup() {
         MockitoAnnotations.initMocks(this);
         when(asyncSender.getEntityPath()).thenReturn(ENTITY_NAME);
         when(asyncSender.getFullyQualifiedNamespace()).thenReturn(NAMESPACE);
+        when(asyncSender.getIdentifier()).thenReturn(CLIENT_IDENTIFIER);
         sender = new ServiceBusSenderClient(asyncSender, RETRY_TIMEOUT);
     }
 
@@ -74,6 +76,7 @@ void teardown() {
     void verifyProperties() {
         Assertions.assertEquals(ENTITY_NAME, sender.getEntityPath());
         Assertions.assertEquals(NAMESPACE, sender.getFullyQualifiedNamespace());
+        Assertions.assertEquals(CLIENT_IDENTIFIER, sender.getIdentifier());
     }
 
     /**
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java
index 3ef53b50ff2a6..9ad4ddfc664db 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java
@@ -83,6 +83,7 @@ class ServiceBusSessionManagerTest {
     private static final String NAMESPACE = "my-namespace-foo.net";
     private static final String ENTITY_PATH = "queue-name";
     private static final MessagingEntityType ENTITY_TYPE = MessagingEntityType.QUEUE;
+    private static final String CLIENT_IDENTIFIER = "my-client-identifier";
 
     private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClientTest.class);
     private final ReplayProcessor<AmqpEndpointState> endpointProcessor = ReplayProcessor.cacheLast();
@@ -173,12 +174,23 @@ void afterEach(TestInfo testInfo) throws Exception {
         }
     }
 
+    @Test
+    void properties() {
+        // Arrange
+        ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 5);
+        sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor,
+            tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER);
+
+        // Act & Assert
+        assertEquals(CLIENT_IDENTIFIER, sessionManager.getIdentifier());
+    }
+
     @Test
     void receiveNull() {
         // Arrange
         ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 5);
         sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor,
-            tracerProvider, messageSerializer, receiverOptions);
+            tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER);
 
         // Act & Assert
         StepVerifier.create(sessionManager.receive())
@@ -195,7 +207,7 @@ void singleUnnamedSession() {
         ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null,
             5);
         sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor,
-            tracerProvider, messageSerializer, receiverOptions);
+            tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER);
 
         final String sessionId = "session-1";
         final String lockToken = "a-lock-token";
@@ -220,7 +232,7 @@ void singleUnnamedSession() {
         when(amqpReceiveLink.closeAsync()).thenReturn(Mono.empty());
 
         when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(),
-            any(MessagingEntityType.class), isNull())).thenReturn(Mono.just(amqpReceiveLink));
+            any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull())).thenReturn(Mono.just(amqpReceiveLink));
 
         when(managementNode.renewSessionLock(sessionId, linkName)).thenReturn(
             Mono.fromCallable(() -> OffsetDateTime.now().plus(Duration.ofSeconds(5))));
@@ -250,7 +262,7 @@ void singleUnnamedSessionLockRenew() {
         ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null,
             1);
         sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor,
-            tracerProvider, messageSerializer, receiverOptions);
+            tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER);
 
         final String sessionId = "session-1";
         final String lockToken = "a-lock-token";
@@ -275,7 +287,7 @@ void singleUnnamedSessionLockRenew() {
         when(amqpReceiveLink.closeAsync()).thenReturn(Mono.empty());
 
         when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(),
-            any(MessagingEntityType.class), isNull())).thenReturn(Mono.just(amqpReceiveLink));
+            any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull())).thenReturn(Mono.just(amqpReceiveLink));
 
         AtomicBoolean sessionLockRenewCalled = new AtomicBoolean();
         when(managementNode.renewSessionLock(sessionId, linkName)).thenReturn(
@@ -308,7 +320,7 @@ void multipleSessions() {
         final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, true,
             null, 5);
         sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor,
-            tracerProvider, messageSerializer, receiverOptions);
+            tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER);
 
         final int numberOfMessages = 5;
         final Callable<OffsetDateTime> onRenewal = () -> OffsetDateTime.now().plus(Duration.ofSeconds(5));
@@ -356,7 +368,7 @@ void multipleSessions() {
 
         final AtomicInteger count = new AtomicInteger();
         when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(),
-            any(MessagingEntityType.class), isNull())).thenAnswer(invocation -> {
+            any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull())).thenAnswer(invocation -> {
                 final int number = count.getAndIncrement();
                 switch (number) {
                     case 0:
@@ -437,7 +449,7 @@ void multipleReceiveUnnamedSession() {
             null, 1);
 
         sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor,
-            tracerProvider, messageSerializer, receiverOptions);
+            tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER);
 
         final String sessionId = "session-1";
         final String linkName = "my-link-name";
@@ -467,7 +479,7 @@ void multipleReceiveUnnamedSession() {
 
         final AtomicInteger count = new AtomicInteger();
         when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(),
-            any(MessagingEntityType.class), isNull())).thenAnswer(invocation -> {
+            any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull())).thenAnswer(invocation -> {
                 final int number = count.getAndIncrement();
                 switch (number) {
                     case 0:
@@ -492,7 +504,7 @@ void multipleReceiveUnnamedSession() {
 
         verify(connection, times(2)).createReceiveLink(linkNameCaptor.capture(), eq(ENTITY_PATH), any(
             ServiceBusReceiveMode.class), isNull(),
-            any(MessagingEntityType.class), isNull());
+            any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull());
 
         final List<String> actualLinksCreated = linkNameCaptor.getAllValues();
         assertNotNull(actualLinksCreated);
@@ -509,7 +521,7 @@ void singleUnnamedSessionCleanupAfterTimeout() {
         ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null,
             2);
         sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor,
-            tracerProvider, messageSerializer, receiverOptions);
+            tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER);
 
         final String sessionId = "session-1";
         final String lockToken = "a-lock-token";
@@ -528,7 +540,7 @@ void singleUnnamedSessionCleanupAfterTimeout() {
         when(amqpReceiveLink.getSessionLockedUntil())
             .thenAnswer(invocation -> Mono.just(sessionLockedUntil));
         when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(),
-            any(MessagingEntityType.class), isNull())).thenReturn(Mono.just(amqpReceiveLink));
+            any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull())).thenReturn(Mono.just(amqpReceiveLink));
         when(amqpReceiveLink.addCredits(anyInt())).thenReturn(Mono.empty());
         when(amqpReceiveLink.closeAsync()).thenReturn(Mono.empty());
 
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java
index c400f893ae6e9..29887381876e8 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java
@@ -66,6 +66,7 @@ class ServiceBusSessionReceiverAsyncClientTest {
     private static final String NAMESPACE = "my-namespace-foo.net";
     private static final String ENTITY_PATH = "queue-name";
     private static final MessagingEntityType ENTITY_TYPE = MessagingEntityType.QUEUE;
+    private static final String CLIENT_IDENTIFIER = "my-client-identifier";
 
     private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClientTest.class);
     private final ReplayProcessor<AmqpEndpointState> endpointProcessor = ReplayProcessor.cacheLast();
@@ -174,13 +175,13 @@ void acceptSession() {
         when(amqpReceiveLink.updateDisposition(lockToken, Accepted.getInstance())).thenReturn(Mono.empty());
 
         when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(),
-            any(MessagingEntityType.class), eq(sessionId))).thenReturn(Mono.just(amqpReceiveLink));
+            any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), eq(sessionId))).thenReturn(Mono.just(amqpReceiveLink));
 
         ServiceBusSessionReceiverAsyncClient client = new ServiceBusSessionReceiverAsyncClient(
             NAMESPACE, ENTITY_PATH,
             MessagingEntityType.QUEUE, receiverOptions,
             connectionProcessor, tracerProvider,
-            messageSerializer, () -> { }
+            messageSerializer, () -> { }, CLIENT_IDENTIFIER
         );
 
         // Act & Assert
@@ -204,7 +205,7 @@ void acceptNextSession() {
         // Arrange
         ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, null, null);
         sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor,
-            tracerProvider, messageSerializer, receiverOptions);
+            tracerProvider, messageSerializer, receiverOptions, CLIENT_IDENTIFIER);
 
         final int numberOfMessages = 5;
         final Callable<OffsetDateTime> onRenewal = () -> OffsetDateTime.now().plus(Duration.ofSeconds(5));
@@ -250,7 +251,7 @@ void acceptNextSession() {
 
         final AtomicInteger count = new AtomicInteger();
         when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(),
-            any(MessagingEntityType.class), isNull())).thenAnswer(invocation -> {
+            any(MessagingEntityType.class), eq(CLIENT_IDENTIFIER), isNull())).thenAnswer(invocation -> {
                 final int number = count.getAndIncrement();
                 switch (number) {
                     case 0:
@@ -272,7 +273,7 @@ void acceptNextSession() {
             NAMESPACE, ENTITY_PATH,
             MessagingEntityType.QUEUE, receiverOptions,
             connectionProcessor, tracerProvider,
-            messageSerializer, () -> { }
+            messageSerializer, () -> { }, CLIENT_IDENTIFIER
         );
 
         // Act & Assert
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java
index eb001e2adfa3b..6cd68a6c5ddd7 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java
@@ -82,6 +82,7 @@ public class ServiceBusReactorSessionTest {
     private static final String ENTITY_PATH = "entityPath";
     private static final String VIA_ENTITY_PATH = "viaEntityPath";
     private static final String VIA_ENTITY_PATH_SENDER_LINK_NAME = "VIA-" + VIA_ENTITY_PATH;
+    private static final String CLIENT_IDENTIFIER = "clientIdentifier";
 
     @Mock
     private Reactor reactor;
@@ -220,7 +221,7 @@ void createViaSenderLink() throws IOException {
 
         // Act
         serviceBusReactorSession.createProducer(VIA_ENTITY_PATH_SENDER_LINK_NAME, VIA_ENTITY_PATH,
-            retryOptions.getTryTimeout(), retryPolicy, ENTITY_PATH)
+            retryOptions.getTryTimeout(), retryPolicy, ENTITY_PATH, CLIENT_IDENTIFIER)
             .subscribe();
 
         // Assert
@@ -250,7 +251,7 @@ void createViaSenderLinkDestinationEntityAuthorizeFails() throws IOException {
 
         // Act
         StepVerifier.create(serviceBusReactorSession.createProducer(VIA_ENTITY_PATH_SENDER_LINK_NAME, VIA_ENTITY_PATH,
-            retryOptions.getTryTimeout(), retryPolicy, ENTITY_PATH))
+            retryOptions.getTryTimeout(), retryPolicy, ENTITY_PATH, CLIENT_IDENTIFIER))
             .verifyError(RuntimeException.class);
 
         // Assert