Skip to content

Commit

Permalink
pass tracer explicitly
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Mar 4, 2023
1 parent 3acd19a commit 145b0e9
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.models.ErrorContext;
Expand Down Expand Up @@ -80,7 +81,7 @@ public class EventProcessorClient {
boolean trackLastEnqueuedEventProperties, Consumer<ErrorContext> processError,
Map<String, EventPosition> initialPartitionEventPosition, int maxBatchSize, Duration maxWaitTime,
boolean batchReceiveMode, Duration loadBalancerUpdateInterval, Duration partitionOwnershipExpirationInterval,
LoadBalancingStrategy loadBalancingStrategy) {
LoadBalancingStrategy loadBalancingStrategy, Tracer tracer) {

Objects.requireNonNull(eventHubClientBuilder, "eventHubClientBuilder cannot be null.");
Objects.requireNonNull(consumerGroup, "consumerGroup cannot be null.");
Expand All @@ -100,7 +101,7 @@ public class EventProcessorClient {
this.consumerGroup = consumerGroup.toLowerCase(Locale.ROOT);
this.loadBalancerUpdateInterval = loadBalancerUpdateInterval;

EventHubsTracer ehTracer = new EventHubsTracer(eventHubClientBuilder.createTracer(), fullyQualifiedNamespace, eventHubName);
EventHubsTracer ehTracer = new EventHubsTracer(tracer, fullyQualifiedNamespace, eventHubName);
this.partitionPumpManager = new PartitionPumpManager(checkpointStore, partitionProcessorFactory,
eventHubClientBuilder, trackLastEnqueuedEventProperties, ehTracer, initialPartitionEventPosition,
maxBatchSize, maxWaitTime, batchReceiveMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ public EventProcessorClient buildEventProcessorClient() {
return new EventProcessorClient(eventHubClientBuilder, consumerGroup,
getPartitionProcessorSupplier(), checkpointStore, trackLastEnqueuedEventProperties,
processError, initialPartitionEventPosition, maxBatchSize, maxWaitTime, processEventBatch != null,
loadBalancingUpdateInterval, partitionOwnershipExpirationInterval, loadBalancingStrategy);
loadBalancingUpdateInterval, partitionOwnershipExpirationInterval, loadBalancingStrategy, eventHubClientBuilder.createTracer());
}

private Supplier<PartitionProcessor> getPartitionProcessorSupplier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.messaging.eventhubs;

import com.azure.core.util.metrics.Meter;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.models.Checkpoint;
Expand Down Expand Up @@ -57,6 +58,9 @@ public class EventProcessorClientErrorHandlingTest {
@Mock
private Tracer tracer;

@Mock
private Meter meter;

private CountDownLatch countDownLatch;

@BeforeEach
Expand All @@ -67,7 +71,6 @@ public void setup() {
when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns");
when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh");
when(eventHubAsyncClient.getIdentifier()).thenReturn("my-client-identifier");
when(eventHubClientBuilder.createTracer()).thenReturn(tracer);
}

@ParameterizedTest(name = "{displayName} with [{arguments}]")
Expand All @@ -80,7 +83,7 @@ public void testCheckpointStoreErrors(CheckpointStore checkpointStore) throws In
Assertions.assertEquals("NONE", errorContext.getPartitionContext().getPartitionId());
Assertions.assertEquals("cg", errorContext.getPartitionContext().getConsumerGroup());
Assertions.assertTrue(errorContext.getThrowable() instanceof IllegalStateException);
}, new HashMap<>(), 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED);
}, new HashMap<>(), 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, tracer);
client.start();
boolean completed = countDownLatch.await(3, TimeUnit.SECONDS);
try {
Expand All @@ -101,7 +104,7 @@ public void testProcessEventHandlerError() throws InterruptedException {
EventProcessorClient client = new EventProcessorClient(eventHubClientBuilder, "cg",
() -> new BadProcessEventHandler(countDownLatch), new SampleCheckpointStore(), false,
errorContext -> { }, new HashMap<>(), 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1),
LoadBalancingStrategy.BALANCED);
LoadBalancingStrategy.BALANCED, tracer);
client.start();
boolean completed = countDownLatch.await(3, TimeUnit.SECONDS);
client.stop();
Expand All @@ -117,7 +120,7 @@ public void testInitHandlerError() throws InterruptedException {
EventProcessorClient client = new EventProcessorClient(eventHubClientBuilder, "cg",
() -> new BadInitHandler(countDownLatch), new SampleCheckpointStore(), false,
errorContext -> { }, new HashMap<>(), 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1),
LoadBalancingStrategy.BALANCED);
LoadBalancingStrategy.BALANCED, tracer);
client.start();
boolean completed = countDownLatch.await(3, TimeUnit.SECONDS);
client.stop();
Expand All @@ -134,7 +137,7 @@ public void testCloseHandlerError() throws InterruptedException {
EventProcessorClient client = new EventProcessorClient(eventHubClientBuilder, "cg",
() -> new BadCloseHandler(countDownLatch), new SampleCheckpointStore(), false,
errorContext -> { }, new HashMap<>(), 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1),
LoadBalancingStrategy.BALANCED);
LoadBalancingStrategy.BALANCED, tracer);
client.start();
boolean completed = countDownLatch.await(3, TimeUnit.SECONDS);
client.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void testWithSimplePartitionProcessor() throws Exception {
// Act
final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer",
() -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(),
1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED);
1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, tracer);
eventProcessorClient.start();
TimeUnit.SECONDS.sleep(10);

Expand Down Expand Up @@ -251,7 +251,7 @@ public void testProcessSpans() throws Exception {
//Act
EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer",
TestPartitionProcessor::new, checkpointStore, false, ec -> { }, new HashMap<>(),
1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED);
1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, tracer1);

eventProcessorClient.start();
assertTrue(latch.await(10, TimeUnit.SECONDS));
Expand Down Expand Up @@ -334,7 +334,7 @@ public void testProcessBatchSpans() throws Exception {
//Act
EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer",
TestPartitionProcessor::new, checkpointStore, false, ec -> { }, new HashMap<>(),
2, null, true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED);
2, null, true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, tracer1);

eventProcessorClient.start();
assertTrue(latch.await(10, TimeUnit.SECONDS));
Expand Down Expand Up @@ -401,7 +401,7 @@ public void testProcessSpansWithoutDiagnosticId() throws Exception {
//Act
EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer",
() -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(),
1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED);
1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, tracer);

eventProcessorClient.start();
boolean success = countDownLatch.await(10, TimeUnit.SECONDS);
Expand Down Expand Up @@ -468,7 +468,7 @@ public void testWithMultiplePartitions() throws Exception {
final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder,
"test-consumer",
TestPartitionProcessor::new, checkpointStore, false, ec -> { }, new HashMap<>(),
1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED);
1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, null);
eventProcessorClient.start();
final boolean completed = count.await(10, TimeUnit.SECONDS);
eventProcessorClient.stop();
Expand Down Expand Up @@ -523,7 +523,7 @@ public void testPrefetchCountSet() throws Exception {

final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, consumerGroup,
() -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(),
2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED);
2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, null);

// Act
eventProcessorClient.start();
Expand Down Expand Up @@ -567,7 +567,7 @@ public void testDefaultPrefetch() throws Exception {

final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, consumerGroup,
() -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(),
2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED);
2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, null);

// Act
eventProcessorClient.start();
Expand Down Expand Up @@ -609,7 +609,7 @@ public void testBatchReceive() throws Exception {

final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer",
() -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(),
2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED);
2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, null);

// Act
eventProcessorClient.start();
Expand Down Expand Up @@ -649,7 +649,7 @@ public void testBatchReceiveHeartBeat() throws InterruptedException {

final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer",
() -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(),
2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED);
2, Duration.ofSeconds(1), true, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, null);

// Act
eventProcessorClient.start();
Expand Down Expand Up @@ -710,7 +710,7 @@ public void testSingleEventReceiveHeartBeat() throws InterruptedException {

final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer",
() -> testPartitionProcessor, checkpointStore, false, ec -> { }, new HashMap<>(),
1, Duration.ofSeconds(1), false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED);
1, Duration.ofSeconds(1), false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED, null);
eventProcessorClient.start();
boolean completed = countDownLatch.await(20, TimeUnit.SECONDS);
eventProcessorClient.stop();
Expand Down

0 comments on commit 145b0e9

Please sign in to comment.