From f88481edfe74e03ebf5314da183dddcfc7cd6dc5 Mon Sep 17 00:00:00 2001 From: zejiajiang Date: Thu, 6 Jan 2022 15:02:40 +0800 Subject: [PATCH 1/5] fix(issue#25182): eventhub and svervicebus processor client start span independently --- .../eventhubs/PartitionPumpManager.java | 6 +- .../eventhubs/EventProcessorClientTest.java | 66 +++++++++++++++++++ .../servicebus/ServiceBusProcessorClient.java | 4 +- .../servicebus/ServiceBusProcessorTest.java | 54 +++++++++++++++ 4 files changed, 126 insertions(+), 4 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index dc4752ffbc9bd..2c387e4330990 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; @@ -366,11 +367,12 @@ private void cleanup(PartitionOwnership claimedOwnership, PartitionPump partitio */ private Context startProcessTracingSpan(EventData eventData, String eventHubName, String fullyQualifiedNamespace) { Object diagnosticId = eventData.getProperties().get(DIAGNOSTIC_ID_KEY); - if (diagnosticId == null || !tracerProvider.isEnabled()) { + if (tracerProvider == null || !tracerProvider.isEnabled()) { return Context.NONE; } - Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE) + Context spanContext = Objects.isNull(diagnosticId) ? Context.NONE : tracerProvider.extractContext(diagnosticId.toString(), Context.NONE); + spanContext = spanContext .addData(ENTITY_PATH_KEY, eventHubName) .addData(HOST_NAME_KEY, fullyQualifiedNamespace) .addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java index 843608236c0a6..cd30bfcd8c948 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java @@ -258,6 +258,72 @@ public void testProcessSpans() throws Exception { verify(tracer1, times(1)).end(eq("success"), isNull(), any()); } + /** + * Tests process start spans invoked without diagnostic id from event data of upstream for {@link EventProcessorClient}. + * + * @throws Exception if an error occurs while running the test. + */ + @Test + public void testProcessSpansWithoutDiagnosticId() throws Exception { + //Arrange + final Tracer tracer1 = mock(Tracer.class); + final List tracers = Collections.singletonList(tracer1); + TracerProvider tracerProvider = new TracerProvider(tracers); + when(eventHubClientBuilder.getPrefetchCount()).thenReturn(DEFAULT_PREFETCH_COUNT); + when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient); + when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns"); + when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh"); + when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1")); + when(eventHubAsyncClient + .createConsumer(anyString(), anyInt())) + .thenReturn(consumer1); + when(eventData1.getSequenceNumber()).thenReturn(1L); + when(eventData1.getOffset()).thenReturn(1L); + when(eventData1.getEnqueuedTime()).thenReturn(Instant.ofEpochSecond(1560639208)); + when(eventData2.getSequenceNumber()).thenReturn(2L); + when(eventData2.getOffset()).thenReturn(100L); + when(eventData2.getEnqueuedTime()).thenReturn(Instant.ofEpochSecond(1560639208)); + when(eventData3.getSequenceNumber()).thenReturn(3L); + when(eventData3.getOffset()).thenReturn(150L); + when(eventData3.getEnqueuedTime()).thenReturn(Instant.ofEpochSecond(1560639208)); + + Map properties = new HashMap<>(); + + when(eventData1.getProperties()).thenReturn(properties); + when(consumer1.receiveFromPartition(anyString(), any(EventPosition.class), any(ReceiveOptions.class))) + .thenReturn(Flux.just(getEvent(eventData1), getEvent(eventData2), getEvent(eventData3))); + + when(tracer1.start(eq("EventHubs.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer( + invocation -> { + Context passed = invocation.getArgument(1, Context.class); + assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent()); + return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (AutoCloseable) () -> { + return; + }).addData(PARENT_SPAN_KEY, "value2"); + } + ); + + final SampleCheckpointStore checkpointStore = new SampleCheckpointStore(); + + CountDownLatch countDownLatch = new CountDownLatch(3); + TestPartitionProcessor testPartitionProcessor = new TestPartitionProcessor(); + testPartitionProcessor.countDownLatch = countDownLatch; + //Act + EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", + () -> testPartitionProcessor, checkpointStore, false, tracerProvider, ec -> { }, new HashMap<>(), + 1, null, false, Duration.ofSeconds(10), Duration.ofMinutes(1), LoadBalancingStrategy.BALANCED); + + eventProcessorClient.start(); + boolean success = countDownLatch.await(10, TimeUnit.SECONDS); + eventProcessorClient.stop(); + + assertTrue(success); + + //Assert + verify(tracer1, times(3)).start(eq("EventHubs.process"), any(), eq(ProcessKind.PROCESS)); + verify(tracer1, times(3)).end(eq("success"), isNull(), any()); + } + /** * Tests {@link EventProcessorClient} that processes events from an Event Hub configured with multiple partitions. * diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 3e52127106c15..45363374cb395 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -364,11 +364,11 @@ private Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessag String fullyQualifiedNamespace) { Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY); - if (diagnosticId == null || !tracerProvider.isEnabled()) { + if (tracerProvider == null || !tracerProvider.isEnabled()) { return Context.NONE; } - Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE); + Context spanContext = Objects.isNull(diagnosticId) ? Context.NONE : tracerProvider.extractContext(diagnosticId.toString(), Context.NONE); spanContext = spanContext .addData(ENTITY_PATH_KEY, entityPath) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java index acea91b3ab09c..f4613a83aaab9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java @@ -415,6 +415,60 @@ public void testProcessorWithTracingEnabled() throws InterruptedException { } + @Test + public void testProcessorWithTracingEnabledWithoutDiagnosticId() throws InterruptedException { + final Tracer tracer = mock(Tracer.class); + final List tracers = Collections.singletonList(tracer); + final int numberOfTimes = 5; + final TracerProvider tracerProvider = new TracerProvider(tracers); + + when(tracer.start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer( + invocation -> { + Context passed = invocation.getArgument(1, Context.class); + assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent()); + return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (AutoCloseable) () -> { + return; + }).addData(PARENT_SPAN_KEY, "value2"); + } + ); + Flux messageFlux = + Flux.create(emitter -> { + for (int i = 0; i < numberOfTimes; i++) { + ServiceBusReceivedMessage serviceBusReceivedMessage = + new ServiceBusReceivedMessage(BinaryData.fromString("hello")); + serviceBusReceivedMessage.setMessageId(String.valueOf(i)); + serviceBusReceivedMessage.setEnqueuedTime(OffsetDateTime.now()); + ServiceBusMessageContext serviceBusMessageContext = + new ServiceBusMessageContext(serviceBusReceivedMessage); + emitter.next(serviceBusMessageContext); + } + }); + + ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder = getBuilder(messageFlux); + + AtomicInteger messageId = new AtomicInteger(); + CountDownLatch countDownLatch = new CountDownLatch(numberOfTimes); + ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, + messageContext -> { + assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId()); + countDownLatch.countDown(); + }, + error -> Assertions.fail("Error occurred when receiving messages from the processor"), + new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1).setTracerProvider(tracerProvider)); + + serviceBusProcessorClient.start(); + boolean success = countDownLatch.await(numberOfTimes, TimeUnit.SECONDS); + serviceBusProcessorClient.close(); + + assertTrue(success, "Failed to receive all expected messages"); + verify(tracer, times(numberOfTimes)).start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS)); + + // This is one less because the processEvent is called before the end span call, so it is possible for + // to reach this line without calling it the 5th time yet. (Timing issue.) + verify(tracer, atLeast(numberOfTimes - 1)).end(eq("success"), isNull(), any()); + + } + private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getBuilder( Flux messageFlux) { From 16350afd7c62e328c56f8937e5539bdebab60ec2 Mon Sep 17 00:00:00 2001 From: zejiajiang Date: Mon, 24 Jan 2022 09:18:32 +0800 Subject: [PATCH 2/5] add CHANGELOG --- sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md index 3ae844ab7ef2d..4d4006b22eec3 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md @@ -1,5 +1,11 @@ # Release History +## 5.10.5-beta.1 (Unreleased) + +### Bugs Fixed + +- Issue if received message does not have trace context, spans are not created ([#25182](https://github.com/Azure/azure-sdk-for-java/issues/25182)) + ## 5.10.4 (2022-01-18) ### Other Changes From fd809b36377bd1118c4b0e67bb07377abc33dea6 Mon Sep 17 00:00:00 2001 From: zejiajiang Date: Mon, 24 Jan 2022 09:19:55 +0800 Subject: [PATCH 3/5] chore --- sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md index 4d4006b22eec3..e2a4bc58e1580 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md @@ -4,7 +4,7 @@ ### Bugs Fixed -- Issue if received message does not have trace context, spans are not created ([#25182](https://github.com/Azure/azure-sdk-for-java/issues/25182)) +- Issue if received message does not have trace context, spans are not created ([#25182](https://github.com/Azure/azure-sdk-for-java/issues/25182)) ## 5.10.4 (2022-01-18) From a9bffeafaaa4c09d1f79ad26d14c4df5048bdb97 Mon Sep 17 00:00:00 2001 From: ZejiaJiang <96095733+ZejiaJiang@users.noreply.github.com> Date: Mon, 24 Jan 2022 09:46:48 +0800 Subject: [PATCH 4/5] Update sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md Co-authored-by: Weidong Xu --- sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md index e2a4bc58e1580..cebaac51692f3 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md @@ -4,7 +4,7 @@ ### Bugs Fixed -- Issue if received message does not have trace context, spans are not created ([#25182](https://github.com/Azure/azure-sdk-for-java/issues/25182)) +- Fixed a bug that when received message does not have trace context, span is not created. ([#25182](https://github.com/Azure/azure-sdk-for-java/issues/25182)) ## 5.10.4 (2022-01-18) From 672deb09778ee2153a2a0f237e582280a9581db3 Mon Sep 17 00:00:00 2001 From: zejiajiang Date: Mon, 24 Jan 2022 10:04:59 +0800 Subject: [PATCH 5/5] add CHANGELOG in service bus --- sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index e649ec78aa3d6..4c557ab797f85 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -2,13 +2,9 @@ ## 7.6.0-beta.1 (Unreleased) -### Features Added - -### Breaking Changes - ### Bugs Fixed -### Other Changes +- Fixed a bug that when received message does not have trace context, span is not created. ([#25182](https://github.com/Azure/azure-sdk-for-java/issues/25182)) ## 7.5.2 (2022-01-14)