Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(issue#25182): eventhub and svervicebus processor client start span independently #26180

Merged
merged 6 commits into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 5.10.5-beta.1 (Unreleased)

### Bugs Fixed

- Fixed a bug that when received message does not have trace context, span is not created. ([#25182](https://github.com/Azure/azure-sdk-for-java/issues/25182))

## 5.10.4 (2022-01-18)

### Other Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
Expand Down Expand Up @@ -366,11 +367,12 @@ private void cleanup(PartitionOwnership claimedOwnership, PartitionPump partitio
*/
private Context startProcessTracingSpan(EventData eventData, String eventHubName, String fullyQualifiedNamespace) {
Object diagnosticId = eventData.getProperties().get(DIAGNOSTIC_ID_KEY);
if (diagnosticId == null || !tracerProvider.isEnabled()) {
if (tracerProvider == null || !tracerProvider.isEnabled()) {
return Context.NONE;
}

Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE)
Context spanContext = Objects.isNull(diagnosticId) ? Context.NONE : tracerProvider.extractContext(diagnosticId.toString(), Context.NONE);
spanContext = spanContext
.addData(ENTITY_PATH_KEY, eventHubName)
.addData(HOST_NAME_KEY, fullyQualifiedNamespace)
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,72 @@ public void testProcessSpans() throws Exception {
verify(tracer1, times(1)).end(eq("success"), isNull(), any());
}

/**
* Tests process start spans invoked without diagnostic id from event data of upstream for {@link EventProcessorClient}.
*
* @throws Exception if an error occurs while running the test.
*/
@Test
public void testProcessSpansWithoutDiagnosticId() throws Exception {
//Arrange
final Tracer tracer1 = mock(Tracer.class);
final List<Tracer> tracers = Collections.singletonList(tracer1);
TracerProvider tracerProvider = new TracerProvider(tracers);
when(eventHubClientBuilder.getPrefetchCount()).thenReturn(DEFAULT_PREFETCH_COUNT);
when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient);
when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns");
when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh");
when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1"));
when(eventHubAsyncClient
.createConsumer(anyString(), anyInt()))
.thenReturn(consumer1);
when(eventData1.getSequenceNumber()).thenReturn(1L);
when(eventData1.getOffset()).thenReturn(1L);
when(eventData1.getEnqueuedTime()).thenReturn(Instant.ofEpochSecond(1560639208));
when(eventData2.getSequenceNumber()).thenReturn(2L);
when(eventData2.getOffset()).thenReturn(100L);
when(eventData2.getEnqueuedTime()).thenReturn(Instant.ofEpochSecond(1560639208));
when(eventData3.getSequenceNumber()).thenReturn(3L);
when(eventData3.getOffset()).thenReturn(150L);
when(eventData3.getEnqueuedTime()).thenReturn(Instant.ofEpochSecond(1560639208));

Map<String, Object> properties = new HashMap<>();

when(eventData1.getProperties()).thenReturn(properties);
when(consumer1.receiveFromPartition(anyString(), any(EventPosition.class), any(ReceiveOptions.class)))
.thenReturn(Flux.just(getEvent(eventData1), getEvent(eventData2), getEvent(eventData3)));

when(tracer1.start(eq("EventHubs.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer(
invocation -> {
Context passed = invocation.getArgument(1, Context.class);
assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent());
return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (AutoCloseable) () -> {
return;
}).addData(PARENT_SPAN_KEY, "value2");
}
);

final SampleCheckpointStore checkpointStore = new SampleCheckpointStore();

CountDownLatch countDownLatch = new CountDownLatch(3);
TestPartitionProcessor testPartitionProcessor = new TestPartitionProcessor();
testPartitionProcessor.countDownLatch = countDownLatch;
//Act
EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer",
() -> testPartitionProcessor, checkpointStore, false, tracerProvider, ec -> { }, new HashMap<>(),
1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED);

eventProcessorClient.start();
boolean success = countDownLatch.await(10, TimeUnit.SECONDS);
eventProcessorClient.stop();

assertTrue(success);

//Assert
verify(tracer1, times(3)).start(eq("EventHubs.process"), any(), eq(ProcessKind.PROCESS));
verify(tracer1, times(3)).end(eq("success"), isNull(), any());
}

/**
* Tests {@link EventProcessorClient} that processes events from an Event Hub configured with multiple partitions.
*
Expand Down
6 changes: 1 addition & 5 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@

## 7.6.0-beta.1 (Unreleased)

### Features Added

### Breaking Changes

### Bugs Fixed

### Other Changes
- Fixed a bug that when received message does not have trace context, span is not created. ([#25182](https://github.com/Azure/azure-sdk-for-java/issues/25182))

## 7.5.2 (2022-01-14)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,11 @@ private Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessag
String fullyQualifiedNamespace) {

Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY);
if (diagnosticId == null || !tracerProvider.isEnabled()) {
if (tracerProvider == null || !tracerProvider.isEnabled()) {
return Context.NONE;
}

Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE);
Context spanContext = Objects.isNull(diagnosticId) ? Context.NONE : tracerProvider.extractContext(diagnosticId.toString(), Context.NONE);

spanContext = spanContext
.addData(ENTITY_PATH_KEY, entityPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,60 @@ public void testProcessorWithTracingEnabled() throws InterruptedException {

}

@Test
public void testProcessorWithTracingEnabledWithoutDiagnosticId() throws InterruptedException {
final Tracer tracer = mock(Tracer.class);
final List<Tracer> tracers = Collections.singletonList(tracer);
final int numberOfTimes = 5;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good, just curious, why we need to send 5 times message, rather than just 1 time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I referred to the previous tests. Most of them send 5 or 10 times for test. I did the same times for safety. Send 1 time works too.

final TracerProvider tracerProvider = new TracerProvider(tracers);

when(tracer.start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer(
invocation -> {
Context passed = invocation.getArgument(1, Context.class);
assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent());
return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (AutoCloseable) () -> {
return;
}).addData(PARENT_SPAN_KEY, "value2");
}
);
Flux<ServiceBusMessageContext> messageFlux =
Flux.create(emitter -> {
for (int i = 0; i < numberOfTimes; i++) {
ServiceBusReceivedMessage serviceBusReceivedMessage =
new ServiceBusReceivedMessage(BinaryData.fromString("hello"));
serviceBusReceivedMessage.setMessageId(String.valueOf(i));
serviceBusReceivedMessage.setEnqueuedTime(OffsetDateTime.now());
ServiceBusMessageContext serviceBusMessageContext =
new ServiceBusMessageContext(serviceBusReceivedMessage);
emitter.next(serviceBusMessageContext);
}
});

ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder = getBuilder(messageFlux);

AtomicInteger messageId = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(numberOfTimes);
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
messageContext -> {
assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId());
countDownLatch.countDown();
},
error -> Assertions.fail("Error occurred when receiving messages from the processor"),
new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1).setTracerProvider(tracerProvider));

serviceBusProcessorClient.start();
boolean success = countDownLatch.await(numberOfTimes, TimeUnit.SECONDS);
serviceBusProcessorClient.close();

assertTrue(success, "Failed to receive all expected messages");
verify(tracer, times(numberOfTimes)).start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS));

// This is one less because the processEvent is called before the end span call, so it is possible for
// to reach this line without calling it the 5th time yet. (Timing issue.)
verify(tracer, atLeast(numberOfTimes - 1)).end(eq("success"), isNull(), any());

}

private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getBuilder(
Flux<ServiceBusMessageContext> messageFlux) {

Expand Down