diff --git a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md index 3ae844ab7ef2..cebaac51692f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md @@ -1,5 +1,11 @@ # Release History +## 5.10.5-beta.1 (Unreleased) + +### Bugs Fixed + +- Fixed a bug that when received message does not have trace context, span is not created. ([#25182](https://github.com/Azure/azure-sdk-for-java/issues/25182)) + ## 5.10.4 (2022-01-18) ### Other Changes diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index dc4752ffbc9b..2c387e433099 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; @@ -366,11 +367,12 @@ private void cleanup(PartitionOwnership claimedOwnership, PartitionPump partitio */ private Context startProcessTracingSpan(EventData eventData, String eventHubName, String fullyQualifiedNamespace) { Object diagnosticId = eventData.getProperties().get(DIAGNOSTIC_ID_KEY); - if (diagnosticId == null || !tracerProvider.isEnabled()) { + if (tracerProvider == null || !tracerProvider.isEnabled()) { return Context.NONE; } - Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE) + Context spanContext = Objects.isNull(diagnosticId) ? Context.NONE : tracerProvider.extractContext(diagnosticId.toString(), Context.NONE); + spanContext = spanContext .addData(ENTITY_PATH_KEY, eventHubName) .addData(HOST_NAME_KEY, fullyQualifiedNamespace) .addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java index 843608236c0a..cd30bfcd8c94 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java @@ -258,6 +258,72 @@ public void testProcessSpans() throws Exception { verify(tracer1, times(1)).end(eq("success"), isNull(), any()); } + /** + * Tests process start spans invoked without diagnostic id from event data of upstream for {@link EventProcessorClient}. + * + * @throws Exception if an error occurs while running the test. + */ + @Test + public void testProcessSpansWithoutDiagnosticId() throws Exception { + //Arrange + final Tracer tracer1 = mock(Tracer.class); + final List tracers = Collections.singletonList(tracer1); + TracerProvider tracerProvider = new TracerProvider(tracers); + when(eventHubClientBuilder.getPrefetchCount()).thenReturn(DEFAULT_PREFETCH_COUNT); + when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient); + when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns"); + when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh"); + when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1")); + when(eventHubAsyncClient + .createConsumer(anyString(), anyInt())) + .thenReturn(consumer1); + when(eventData1.getSequenceNumber()).thenReturn(1L); + when(eventData1.getOffset()).thenReturn(1L); + when(eventData1.getEnqueuedTime()).thenReturn(Instant.ofEpochSecond(1560639208)); + when(eventData2.getSequenceNumber()).thenReturn(2L); + when(eventData2.getOffset()).thenReturn(100L); + when(eventData2.getEnqueuedTime()).thenReturn(Instant.ofEpochSecond(1560639208)); + when(eventData3.getSequenceNumber()).thenReturn(3L); + when(eventData3.getOffset()).thenReturn(150L); + when(eventData3.getEnqueuedTime()).thenReturn(Instant.ofEpochSecond(1560639208)); + + Map properties = new HashMap<>(); + + when(eventData1.getProperties()).thenReturn(properties); + when(consumer1.receiveFromPartition(anyString(), any(EventPosition.class), any(ReceiveOptions.class))) + .thenReturn(Flux.just(getEvent(eventData1), getEvent(eventData2), getEvent(eventData3))); + + when(tracer1.start(eq("EventHubs.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer( + invocation -> { + Context passed = invocation.getArgument(1, Context.class); + assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent()); + return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (AutoCloseable) () -> { + return; + }).addData(PARENT_SPAN_KEY, "value2"); + } + ); + + final SampleCheckpointStore checkpointStore = new SampleCheckpointStore(); + + CountDownLatch countDownLatch = new CountDownLatch(3); + TestPartitionProcessor testPartitionProcessor = new TestPartitionProcessor(); + testPartitionProcessor.countDownLatch = countDownLatch; + //Act + EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", + () -> testPartitionProcessor, checkpointStore, false, tracerProvider, ec -> { }, new HashMap<>(), + 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + + eventProcessorClient.start(); + boolean success = countDownLatch.await(10, TimeUnit.SECONDS); + eventProcessorClient.stop(); + + assertTrue(success); + + //Assert + verify(tracer1, times(3)).start(eq("EventHubs.process"), any(), eq(ProcessKind.PROCESS)); + verify(tracer1, times(3)).end(eq("success"), isNull(), any()); + } + /** * Tests {@link EventProcessorClient} that processes events from an Event Hub configured with multiple partitions. * diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index e649ec78aa3d..4c557ab797f8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -2,13 +2,9 @@ ## 7.6.0-beta.1 (Unreleased) -### Features Added - -### Breaking Changes - ### Bugs Fixed -### Other Changes +- Fixed a bug that when received message does not have trace context, span is not created. ([#25182](https://github.com/Azure/azure-sdk-for-java/issues/25182)) ## 7.5.2 (2022-01-14) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 3e52127106c1..45363374cb39 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -364,11 +364,11 @@ private Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessag String fullyQualifiedNamespace) { Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY); - if (diagnosticId == null || !tracerProvider.isEnabled()) { + if (tracerProvider == null || !tracerProvider.isEnabled()) { return Context.NONE; } - Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE); + Context spanContext = Objects.isNull(diagnosticId) ? Context.NONE : tracerProvider.extractContext(diagnosticId.toString(), Context.NONE); spanContext = spanContext .addData(ENTITY_PATH_KEY, entityPath) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java index acea91b3ab09..f4613a83aaab 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java @@ -415,6 +415,60 @@ public void testProcessorWithTracingEnabled() throws InterruptedException { } + @Test + public void testProcessorWithTracingEnabledWithoutDiagnosticId() throws InterruptedException { + final Tracer tracer = mock(Tracer.class); + final List tracers = Collections.singletonList(tracer); + final int numberOfTimes = 5; + final TracerProvider tracerProvider = new TracerProvider(tracers); + + when(tracer.start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer( + invocation -> { + Context passed = invocation.getArgument(1, Context.class); + assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent()); + return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (AutoCloseable) () -> { + return; + }).addData(PARENT_SPAN_KEY, "value2"); + } + ); + Flux messageFlux = + Flux.create(emitter -> { + for (int i = 0; i < numberOfTimes; i++) { + ServiceBusReceivedMessage serviceBusReceivedMessage = + new ServiceBusReceivedMessage(BinaryData.fromString("hello")); + serviceBusReceivedMessage.setMessageId(String.valueOf(i)); + serviceBusReceivedMessage.setEnqueuedTime(OffsetDateTime.now()); + ServiceBusMessageContext serviceBusMessageContext = + new ServiceBusMessageContext(serviceBusReceivedMessage); + emitter.next(serviceBusMessageContext); + } + }); + + ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder = getBuilder(messageFlux); + + AtomicInteger messageId = new AtomicInteger(); + CountDownLatch countDownLatch = new CountDownLatch(numberOfTimes); + ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, + messageContext -> { + assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId()); + countDownLatch.countDown(); + }, + error -> Assertions.fail("Error occurred when receiving messages from the processor"), + new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1).setTracerProvider(tracerProvider)); + + serviceBusProcessorClient.start(); + boolean success = countDownLatch.await(numberOfTimes, TimeUnit.SECONDS); + serviceBusProcessorClient.close(); + + assertTrue(success, "Failed to receive all expected messages"); + verify(tracer, times(numberOfTimes)).start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS)); + + // This is one less because the processEvent is called before the end span call, so it is possible for + // to reach this line without calling it the 5th time yet. (Timing issue.) + verify(tracer, atLeast(numberOfTimes - 1)).end(eq("success"), isNull(), any()); + + } + private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getBuilder( Flux messageFlux) {