diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java index d4e07229dd039..7ef35cba767b3 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java @@ -5,6 +5,7 @@ import com.azure.core.annotation.ServiceClient; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.tracing.Tracer; import com.azure.messaging.eventhubs.implementation.PartitionProcessor; import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer; import com.azure.messaging.eventhubs.models.ErrorContext; @@ -80,7 +81,7 @@ public class EventProcessorClient { boolean trackLastEnqueuedEventProperties, Consumer processError, Map initialPartitionEventPosition, int maxBatchSize, Duration maxWaitTime, boolean batchReceiveMode, Duration loadBalancerUpdateInterval, Duration partitionOwnershipExpirationInterval, - LoadBalancingStrategy loadBalancingStrategy) { + LoadBalancingStrategy loadBalancingStrategy, Tracer tracer) { Objects.requireNonNull(eventHubClientBuilder, "eventHubClientBuilder cannot be null."); Objects.requireNonNull(consumerGroup, "consumerGroup cannot be null."); @@ -100,7 +101,7 @@ public class EventProcessorClient { this.consumerGroup = consumerGroup.toLowerCase(Locale.ROOT); this.loadBalancerUpdateInterval = loadBalancerUpdateInterval; - EventHubsTracer ehTracer = new EventHubsTracer(eventHubClientBuilder.createTracer(), fullyQualifiedNamespace, eventHubName); + EventHubsTracer ehTracer = new EventHubsTracer(tracer, fullyQualifiedNamespace, eventHubName); this.partitionPumpManager = new PartitionPumpManager(checkpointStore, partitionProcessorFactory, eventHubClientBuilder, trackLastEnqueuedEventProperties, ehTracer, initialPartitionEventPosition, maxBatchSize, maxWaitTime, batchReceiveMode); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java index a54dbe81f764a..5a4d4c4a80bdb 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java @@ -746,7 +746,7 @@ public EventProcessorClient buildEventProcessorClient() { return new EventProcessorClient(eventHubClientBuilder, consumerGroup, getPartitionProcessorSupplier(), checkpointStore, trackLastEnqueuedEventProperties, processError, initialPartitionEventPosition, maxBatchSize, maxWaitTime, processEventBatch != null, - loadBalancingUpdateInterval, partitionOwnershipExpirationInterval, loadBalancingStrategy); + loadBalancingUpdateInterval, partitionOwnershipExpirationInterval, loadBalancingStrategy, eventHubClientBuilder.createTracer()); } private Supplier getPartitionProcessorSupplier() { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientErrorHandlingTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientErrorHandlingTest.java index d7d9d9968ca99..bd44f2baca044 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientErrorHandlingTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientErrorHandlingTest.java @@ -3,6 +3,7 @@ package com.azure.messaging.eventhubs; +import com.azure.core.util.metrics.Meter; import com.azure.core.util.tracing.Tracer; import com.azure.messaging.eventhubs.implementation.PartitionProcessor; import com.azure.messaging.eventhubs.models.Checkpoint; @@ -57,6 +58,9 @@ public class EventProcessorClientErrorHandlingTest { @Mock private Tracer tracer; + @Mock + private Meter meter; + private CountDownLatch countDownLatch; @BeforeEach @@ -67,7 +71,6 @@ public void setup() { when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns"); when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh"); when(eventHubAsyncClient.getIdentifier()).thenReturn("my-client-identifier"); - when(eventHubClientBuilder.createTracer()).thenReturn(tracer); } @ParameterizedTest(name = "{displayName} with [{arguments}]") @@ -80,7 +83,7 @@ public void testCheckpointStoreErrors(CheckpointStore checkpointStore) throws In Assertions.assertEquals("NONE", errorContext.getPartitionContext().getPartitionId()); Assertions.assertEquals("cg", errorContext.getPartitionContext().getConsumerGroup()); Assertions.assertTrue(errorContext.getThrowable() instanceof IllegalStateException); - }, new HashMap<>(), 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + }, new HashMap<>(), 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, tracer); client.start(); boolean completed = countDownLatch.await(3, TimeUnit.SECONDS); try { @@ -101,7 +104,7 @@ public void testProcessEventHandlerError() throws InterruptedException { EventProcessorClient client = new EventProcessorClient(eventHubClientBuilder, "cg", () -> new BadProcessEventHandler(countDownLatch), new SampleCheckpointStore(), false, errorContext -> { }, new HashMap<>(), 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), - LoadBalancingStrategy.BALANCED); + LoadBalancingStrategy.BALANCED, tracer); client.start(); boolean completed = countDownLatch.await(3, TimeUnit.SECONDS); client.stop(); @@ -117,7 +120,7 @@ public void testInitHandlerError() throws InterruptedException { EventProcessorClient client = new EventProcessorClient(eventHubClientBuilder, "cg", () -> new BadInitHandler(countDownLatch), new SampleCheckpointStore(), false, errorContext -> { }, new HashMap<>(), 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), - LoadBalancingStrategy.BALANCED); + LoadBalancingStrategy.BALANCED, tracer); client.start(); boolean completed = countDownLatch.await(3, TimeUnit.SECONDS); client.stop(); @@ -134,7 +137,7 @@ public void testCloseHandlerError() throws InterruptedException { EventProcessorClient client = new EventProcessorClient(eventHubClientBuilder, "cg", () -> new BadCloseHandler(countDownLatch), new SampleCheckpointStore(), false, errorContext -> { }, new HashMap<>(), 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), - LoadBalancingStrategy.BALANCED); + LoadBalancingStrategy.BALANCED, tracer); client.start(); boolean completed = countDownLatch.await(3, TimeUnit.SECONDS); client.stop(); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java index f7bae1a949c60..dd1029b712b0b 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java @@ -152,7 +152,7 @@ public void testWithSimplePartitionProcessor() throws Exception { // Act final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", () -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(), - 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, tracer); eventProcessorClient.start(); TimeUnit.SECONDS.sleep(10); @@ -251,7 +251,7 @@ public void testProcessSpans() throws Exception { //Act EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", TestPartitionProcessor::new, checkpointStore, false, ec -> { }, new HashMap<>(), - 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, tracer1); eventProcessorClient.start(); assertTrue(latch.await(10, TimeUnit.SECONDS)); @@ -334,7 +334,7 @@ public void testProcessBatchSpans() throws Exception { //Act EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", TestPartitionProcessor::new, checkpointStore, false, ec -> { }, new HashMap<>(), - 2, null, true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + 2, null, true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, tracer1); eventProcessorClient.start(); assertTrue(latch.await(10, TimeUnit.SECONDS)); @@ -401,7 +401,7 @@ public void testProcessSpansWithoutDiagnosticId() throws Exception { //Act EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", () -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(), - 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, tracer); eventProcessorClient.start(); boolean success = countDownLatch.await(10, TimeUnit.SECONDS); @@ -468,7 +468,7 @@ public void testWithMultiplePartitions() throws Exception { final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", TestPartitionProcessor::new, checkpointStore, false, ec -> { }, new HashMap<>(), - 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, null); eventProcessorClient.start(); final boolean completed = count.await(10, TimeUnit.SECONDS); eventProcessorClient.stop(); @@ -523,7 +523,7 @@ public void testPrefetchCountSet() throws Exception { final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, consumerGroup, () -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(), - 2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + 2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, null); // Act eventProcessorClient.start(); @@ -567,7 +567,7 @@ public void testDefaultPrefetch() throws Exception { final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, consumerGroup, () -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(), - 2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + 2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, null); // Act eventProcessorClient.start(); @@ -609,7 +609,7 @@ public void testBatchReceive() throws Exception { final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", () -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(), - 2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + 2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, null); // Act eventProcessorClient.start(); @@ -649,7 +649,7 @@ public void testBatchReceiveHeartBeat() throws InterruptedException { final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", () -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(), - 2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + 2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, null); // Act eventProcessorClient.start(); @@ -710,7 +710,7 @@ public void testSingleEventReceiveHeartBeat() throws InterruptedException { final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", () -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(), - 1, Duration.ofSeconds(1), false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + 1, Duration.ofSeconds(1), false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, null); eventProcessorClient.start(); boolean completed = countDownLatch.await(20, TimeUnit.SECONDS); eventProcessorClient.stop();