From de2835bb1ce646d87278ad0915ecfb5f2045fa2b Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Mon, 7 Nov 2022 19:03:55 -0800 Subject: [PATCH] Cleanup --- .../opentelemetry/OpenTelemetryTracer.java | 5 +- .../OpenTelemetryTracerTest.java | 12 - .../eventhubs/TracingIntegrationTests.java | 30 ++ .../azure/messaging/servicebus/FluxTrace.java | 4 +- .../servicebus/ServiceBusProcessorClient.java | 51 ++-- .../servicebus/ServiceBusReceivedMessage.java | 18 +- .../ServiceBusReceiverAsyncClient.java | 19 +- .../ServiceBusReceiverInstrumentation.java | 2 +- .../servicebus/ServiceBusProcessorTest.java | 6 + .../ServiceBusReceiverAsyncClientTest.java | 3 + .../servicebus/TracingIntegrationTests.java | 286 +++++++++++++++++- 11 files changed, 369 insertions(+), 67 deletions(-) diff --git a/sdk/core/azure-core-tracing-opentelemetry/src/main/java/com/azure/core/tracing/opentelemetry/OpenTelemetryTracer.java b/sdk/core/azure-core-tracing-opentelemetry/src/main/java/com/azure/core/tracing/opentelemetry/OpenTelemetryTracer.java index 135eb8516e07a..daab2c7b12b44 100644 --- a/sdk/core/azure-core-tracing-opentelemetry/src/main/java/com/azure/core/tracing/opentelemetry/OpenTelemetryTracer.java +++ b/sdk/core/azure-core-tracing-opentelemetry/src/main/java/com/azure/core/tracing/opentelemetry/OpenTelemetryTracer.java @@ -282,7 +282,10 @@ public AutoCloseable makeSpanCurrent(Context context) { return NOOP_CLOSEABLE; } - io.opentelemetry.context.Context traceContext = getTraceContextOrDefault(context, io.opentelemetry.context.Context.root()); + io.opentelemetry.context.Context traceContext = getTraceContextOrDefault(context, null); + if (traceContext == null) { + return NOOP_CLOSEABLE; + } return traceContext.makeCurrent(); } diff --git a/sdk/core/azure-core-tracing-opentelemetry/src/test/java/com/azure/core/tracing/opentelemetry/OpenTelemetryTracerTest.java b/sdk/core/azure-core-tracing-opentelemetry/src/test/java/com/azure/core/tracing/opentelemetry/OpenTelemetryTracerTest.java index 811ac187032c5..16c51a5557a0e 100644 --- a/sdk/core/azure-core-tracing-opentelemetry/src/test/java/com/azure/core/tracing/opentelemetry/OpenTelemetryTracerTest.java +++ b/sdk/core/azure-core-tracing-opentelemetry/src/test/java/com/azure/core/tracing/opentelemetry/OpenTelemetryTracerTest.java @@ -520,18 +520,6 @@ public void startEndCurrentSpan() { } } - @Test - public void startCurrentSpanNoContext() { - try (Scope parentScope = parentSpan.makeCurrent()) { - try (AutoCloseable scope = openTelemetryTracer.makeSpanCurrent(Context.NONE)) { - assertFalse(Span.current().getSpanContext().isValid()); - } catch (Exception e) { - fail(); - } - assertSame(parentSpan, Span.current()); - } - } - @Test @SuppressWarnings("deprecation") public void startEndCurrentSpanBackwardCompatible() { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/TracingIntegrationTests.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/TracingIntegrationTests.java index 4054e25262521..afccd290a0d1e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/TracingIntegrationTests.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/TracingIntegrationTests.java @@ -26,6 +26,7 @@ import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; import org.junit.jupiter.api.parallel.Isolated; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import java.nio.charset.StandardCharsets; @@ -172,6 +173,35 @@ public void sendAndReceive() throws InterruptedException { assertConsumerSpan(received.get(0), receivedMessage.get(), "EventHubs.consume"); } + @Test + public void sendAndReceiveParallel() throws InterruptedException { + int messageCount = 5; + CountDownLatch latch = new CountDownLatch(messageCount); + spanProcessor.notifyIfCondition(latch, span -> span.getName().equals("EventHubs.consume")); + StepVerifier.create(consumer + .receive() + .take(messageCount) + .doOnNext(pe -> { + String traceparent = (String) pe.getData().getProperties().get("traceparent"); + String traceId = Span.current().getSpanContext().getTraceId(); + + // context created for the message and current are the same + assertTrue(traceparent.startsWith("00-" + traceId)); + assertFalse(((ReadableSpan) Span.current()).hasEnded()); + }) + .parallel(messageCount, 1) + .runOn(Schedulers.boundedElastic(), 2)) + .expectNextCount(messageCount) + .verifyComplete(); + + StepVerifier.create(producer.send(data, new SendOptions())).verifyComplete(); + + assertTrue(latch.await(20, TimeUnit.SECONDS)); + List spans = spanProcessor.getEndedSpans(); + List received = findSpans(spans, "EventHubs.consume"); + assertTrue(messageCount <= received.size()); + } + @Test public void sendBuffered() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/FluxTrace.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/FluxTrace.java index b9229fbb36907..16c4c0a50edc4 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/FluxTrace.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/FluxTrace.java @@ -58,7 +58,7 @@ protected void hookOnSubscribe(Subscription subscription) { protected void hookOnNext(ServiceBusMessageContext message) { Throwable exception = null; Context span = instrumentation.instrumentProcess("ServiceBus.process", message.getMessage(), Context.NONE); - AutoCloseable scope = tracer.makeSpanCurrent(span); + message.getMessage().setContext(span); try { downstream.onNext(message); @@ -72,7 +72,7 @@ protected void hookOnNext(ServiceBusMessageContext message) { exception = (Throwable) processorException; } } - tracer.endSpan(exception, span, scope); + tracer.endSpan(exception, context, null); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 0a7fa7422cc60..b203bcb1c5850 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -7,6 +7,7 @@ import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusProcessorClientBuilder; import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder; import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions; +import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; @@ -19,6 +20,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static com.azure.messaging.servicebus.FluxTrace.PROCESS_ERROR_KEY; + /** * The processor client for processing Service Bus messages. {@link ServiceBusProcessorClient} provides a push-based * mechanism that invokes the message processing callback when a message is received or the error handler when an error @@ -158,6 +161,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable { private final String queueName; private final String topicName; private final String subscriptionName; + private final ServiceBusTracer tracer; private Disposable monitorDisposable; /** @@ -182,11 +186,13 @@ public final class ServiceBusProcessorClient implements AutoCloseable { this.processError = Objects.requireNonNull(processError, "'processError' cannot be null"); this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null"); - this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor()); + ServiceBusReceiverAsyncClient client = sessionReceiverBuilder.buildAsyncClientForProcessor(); + this.asyncClient.set(client); this.receiverBuilder = null; this.queueName = queueName; this.topicName = topicName; this.subscriptionName = subscriptionName; + this.tracer = client.getInstrumentation().getTracer(); } /** @@ -208,11 +214,14 @@ public final class ServiceBusProcessorClient implements AutoCloseable { this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null"); this.processError = Objects.requireNonNull(processError, "'processError' cannot be null"); this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null"); - this.asyncClient.set(receiverBuilder.buildAsyncClient()); + + ServiceBusReceiverAsyncClient client = receiverBuilder.buildAsyncClient(); + this.asyncClient.set(client); this.sessionReceiverBuilder = null; this.queueName = queueName; this.topicName = topicName; this.subscriptionName = subscriptionName; + this.tracer = client.getInstrumentation().getTracer(); } /** @@ -360,29 +369,35 @@ public void onSubscribe(Subscription subscription) { subscription.request(1); } + @SuppressWarnings("try") @Override public void onNext(ServiceBusMessageContext serviceBusMessageContext) { - if (serviceBusMessageContext.hasError()) { - handleError(serviceBusMessageContext.getThrowable()); - } else { - try { + try (AutoCloseable scope = tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext())) { + if (serviceBusMessageContext.hasError()) { + handleError(serviceBusMessageContext.getThrowable()); + } else { ServiceBusReceivedMessageContext serviceBusReceivedMessageContext = new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext); - processMessage.accept(serviceBusReceivedMessageContext); - } catch (Exception ex) { - serviceBusMessageContext.getMessage().addContext(FluxTrace.PROCESS_ERROR_KEY, ex); - handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK)); - - if (!processorOptions.isDisableAutoComplete()) { - LOGGER.warning("Error when processing message. Abandoning message.", ex); - abandonMessage(serviceBusMessageContext, receiverClient); + try { + processMessage.accept(serviceBusReceivedMessageContext); + } catch (Exception ex) { + serviceBusMessageContext.getMessage().setContext( + serviceBusMessageContext.getMessage().getContext().addData(PROCESS_ERROR_KEY, ex)); + handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK)); + + if (!processorOptions.isDisableAutoComplete()) { + LOGGER.warning("Error when processing message. Abandoning message.", ex); + abandonMessage(serviceBusMessageContext, receiverClient); + } } } - } - if (isRunning.get()) { - LOGGER.verbose("Requesting 1 more message from upstream"); - subscription.request(1); + if (isRunning.get()) { + LOGGER.verbose("Requesting 1 more message from upstream"); + subscription.request(1); + } + } catch (Exception e) { + LOGGER.verbose("Error disposing scope", e); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java index 48eaa893bec67..112c0f43ab918 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java @@ -518,26 +518,21 @@ public String getTo() { } /** - * Adds a new key value pair to the existing context on Message. + * ASets context on the message. * - * @param key The key for this context object - * @param value The value for this context object. + * @param context Context to set. * * @return The updated {@link ServiceBusMessage}. * @throws NullPointerException if {@code key} or {@code value} is null. */ - ServiceBusReceivedMessage addContext(String key, Object value) { - Objects.requireNonNull(key, "The 'key' parameter cannot be null."); - Objects.requireNonNull(value, "The 'value' parameter cannot be null."); - this.context = context.addData(key, value); + ServiceBusReceivedMessage setContext(Context context) { + Objects.requireNonNull(context, "The 'context' parameter cannot be null."); + this.context = context; return this; } /** - * Adds a new key value pair to the existing context on Message. - * - * @param key The key for this context object - * @param value The value for this context object. + * Gets context associated with the message. * * @return The updated {@link ServiceBusMessage}. * @throws NullPointerException if {@code key} or {@code value} is null. @@ -545,6 +540,7 @@ ServiceBusReceivedMessage addContext(String key, Object value) { Context getContext() { return this.context; } + /** * Gets whether the message has been settled. * diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index a1a909b7b728d..d255385222f5a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -823,15 +823,20 @@ public Flux receiveMessages() { return receiveMessagesNoBackPressure().limitRate(1, 0); } + @SuppressWarnings("try") Flux receiveMessagesNoBackPressure() { return receiveMessagesWithContext(0) - .handle((serviceBusMessageContext, sink) -> { - if (serviceBusMessageContext.hasError()) { - sink.error(serviceBusMessageContext.getThrowable()); - return; - } - sink.next(serviceBusMessageContext.getMessage()); - }); + .handle((serviceBusMessageContext, sink) -> { + try (AutoCloseable scope = tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext())) { + if (serviceBusMessageContext.hasError()) { + sink.error(serviceBusMessageContext.getThrowable()); + return; + } + sink.next(serviceBusMessageContext.getMessage()); + } catch (Exception ex) { + LOGGER.verbose("Error disposing scope", ex); + } + }); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusReceiverInstrumentation.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusReceiverInstrumentation.java index add850174224c..f56f81cfd962d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusReceiverInstrumentation.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusReceiverInstrumentation.java @@ -71,7 +71,7 @@ public Mono instrumentSettlement(Mono publisher, ServiceBusReceivedMes .contextWrite(ctx -> { startTime.set(Instant.now().toEpochMilli()); return ctx.put(REACTOR_PARENT_TRACE_CONTEXT_KEY, tracer.startSpanWithLink(getSettlementSpanName(status), message, - messageContext, Context.NONE)); + messageContext, messageContext)); }); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java index 772c6c402024e..cd5402e178f4f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java @@ -50,6 +50,8 @@ public class ServiceBusProcessorTest { private static final String NAMESPACE = "namespace"; private static final String ENTITY_NAME = "entity"; + private static final ServiceBusReceiverInstrumentation DEFAULT_INSTRUMENTATION = + new ServiceBusReceiverInstrumentation(null, null, NAMESPACE, ENTITY_NAME, "subscription", false); /** * Tests receiving messages using a {@link ServiceBusProcessorClient}. @@ -285,6 +287,7 @@ public void testUserMessageHandlerError() throws InterruptedException { when(asyncClient.abandon(any(ServiceBusReceivedMessage.class))).thenReturn(Mono.empty()); when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); when(asyncClient.getEntityPath()).thenReturn(ENTITY_NAME); + when(asyncClient.getInstrumentation()).thenReturn(DEFAULT_INSTRUMENTATION); doNothing().when(asyncClient).close(); final AtomicInteger messageId = new AtomicInteger(); @@ -339,6 +342,7 @@ public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws Interru when(asyncClient.isConnectionClosed()).thenReturn(false); when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); when(asyncClient.getEntityPath()).thenReturn(ENTITY_NAME); + when(asyncClient.getInstrumentation()).thenReturn(DEFAULT_INSTRUMENTATION); doNothing().when(asyncClient).close(); @@ -495,6 +499,7 @@ private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getBuilder( ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(tracer, null, NAMESPACE, ENTITY_NAME, null, false); when(asyncClient.receiveMessagesWithContext()).thenReturn( new FluxTrace(messageFlux, instrumentation).publishOn(Schedulers.boundedElastic())); + when(asyncClient.getInstrumentation()).thenReturn(instrumentation); when(asyncClient.isConnectionClosed()).thenReturn(false); doNothing().when(asyncClient).close(); return receiverBuilder; @@ -512,6 +517,7 @@ private ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder getSessio when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); when(asyncClient.receiveMessagesWithContext()).thenReturn(messageFlux); when(asyncClient.isConnectionClosed()).thenReturn(false); + when(asyncClient.getInstrumentation()).thenReturn(DEFAULT_INSTRUMENTATION); doNothing().when(asyncClient).close(); return receiverBuilder; } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index 3cb81e6608770..e651ae4d94885 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -1367,6 +1367,7 @@ void settlementMessagesReportsMetrics(DispositionStatus status) { connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); when(receivedMessage.getLockToken()).thenReturn("mylockToken"); when(receivedMessage.getSequenceNumber()).thenReturn(42L); + when(receivedMessage.getContext()).thenReturn(Context.NONE); when(managementNode.updateDisposition(any(), any(), isNull(), isNull(), isNull(), isNull(), isNull(), isNull())).thenReturn(Mono.empty()); @@ -1459,9 +1460,11 @@ void receiveWithTracesAndMetrics() { when(receivedMessage.getLockToken()).thenReturn("mylockToken"); when(receivedMessage.getSequenceNumber()).thenReturn(42L); + when(receivedMessage.getContext()).thenReturn(Context.NONE); when(receivedMessage2.getLockToken()).thenReturn("mylockToken"); when(receivedMessage2.getSequenceNumber()).thenReturn(43L); + when(receivedMessage2.getContext()).thenReturn(Context.NONE); when(messageSerializer.deserialize(any(Message.class), eq(ServiceBusReceivedMessage.class))) .thenReturn(receivedMessage, receivedMessage2); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java index 16920599f0e56..032e639218aad 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java @@ -23,6 +23,7 @@ import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; import org.junit.jupiter.api.parallel.Isolated; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import java.time.Duration; @@ -41,6 +42,7 @@ import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -85,6 +87,8 @@ protected void beforeTest() { .receiver() .queueName(getQueueName(0)) .buildClient(); + + StepVerifier.setDefaultTimeout(TIMEOUT); } @Override @@ -99,20 +103,32 @@ protected void afterTest() { } @Test - public void sendAndReceive() { + public void sendAndReceive() throws InterruptedException { ServiceBusMessage message1 = new ServiceBusMessage(CONTENTS_BYTES); ServiceBusMessage message2 = new ServiceBusMessage(CONTENTS_BYTES); List messages = Arrays.asList(message1, message2); StepVerifier.create(sender.sendMessages(messages)) .verifyComplete(); + CountDownLatch processedFound = new CountDownLatch(2); + spanProcessor.notifyIfCondition(processedFound, s -> s.getName().equals("ServiceBus.process")); + List received = new ArrayList<>(); - StepVerifier.create(receiver.receiveMessages() - .flatMap(rm -> receiver.complete(rm).thenReturn(rm)) - .take(2)) - .assertNext(rm -> received.add(rm)) - .assertNext(rm -> received.add(rm)) - .verifyComplete(); + receiver.receiveMessages() + .take(2) + .doOnNext(msg -> { + received.add(msg); + String traceparent = (String) msg.getApplicationProperties().get("traceparent"); + String traceId = Span.current().getSpanContext().getTraceId(); + + // context created for the message and current are the same + assertTrue(traceparent.startsWith("00-" + traceId)); + assertFalse(((ReadableSpan) Span.current()).hasEnded()); + receiver.complete(msg).block(); + }) + .subscribe(); + + assertTrue(processedFound.await(20, TimeUnit.SECONDS)); List spans = spanProcessor.getEndedSpans(); @@ -129,10 +145,142 @@ public void sendAndReceive() { List completed = findSpans(spans, "ServiceBus.complete"); assertReceiveSpan(completed.get(0), Collections.singletonList(received.get(0)), "ServiceBus.complete"); - assertParent(completed.get(0), processed.get(0)); + assertParentFound(completed.get(0), processed); assertReceiveSpan(completed.get(1), Collections.singletonList(received.get(1)), "ServiceBus.complete"); - assertParent(completed.get(1), processed.get(1)); + assertParentFound(completed.get(1), processed); + } + + @Test + public void receiveCheckSubscribe() throws InterruptedException { + ServiceBusMessage message1 = new ServiceBusMessage(CONTENTS_BYTES); + ServiceBusMessage message2 = new ServiceBusMessage(CONTENTS_BYTES); + List messages = Arrays.asList(message1, message2); + StepVerifier.create(sender.sendMessages(messages)) + .verifyComplete(); + + CountDownLatch processedFound = new CountDownLatch(2); + spanProcessor.notifyIfCondition(processedFound, s -> s.getName().equals("ServiceBus.process")); + + List received = new ArrayList<>(); + receiver.receiveMessages() + .take(2) + .subscribe(msg -> { + received.add(msg); + String traceparent = (String) msg.getApplicationProperties().get("traceparent"); + String traceId = Span.current().getSpanContext().getTraceId(); + + // context created for the message and current are the same + assertTrue(traceparent.startsWith("00-" + traceId)); + assertFalse(((ReadableSpan) Span.current()).hasEnded()); + receiver.complete(msg).block(); + }); + + assertTrue(processedFound.await(20, TimeUnit.SECONDS)); + + List spans = spanProcessor.getEndedSpans(); + + List processed = findSpans(spans, "ServiceBus.process"); + List completed = findSpans(spans, "ServiceBus.complete"); + assertParentFound(completed.get(0), processed); + assertParentFound(completed.get(1), processed); + } + + @Test + public void sendAndReceiveParallelNoAutoComplete() throws InterruptedException { + int messageCount = 5; + StepVerifier.create(sender.createMessageBatch() + .doOnNext(batch -> { + for (int i = 0; i < messageCount; i++) { + batch.tryAddMessage(new ServiceBusMessage(CONTENTS_BYTES)); + } + }) + .flatMap(batch -> sender.sendMessages(batch))).verifyComplete(); + + CountDownLatch processedFound = new CountDownLatch(messageCount); + spanProcessor.notifyIfCondition(processedFound, span -> span.getName().equals("ServiceBus.process")); + + StepVerifier.create( + receiver.receiveMessages() + .take(messageCount) + .doOnNext(msg -> { + if (Span.current().getSpanContext().isValid()) { + String traceparent = (String) msg.getApplicationProperties().get("traceparent"); + String traceId = Span.current().getSpanContext().getTraceId(); + + // context created for the message and current are the same + assertTrue(traceparent.startsWith("00-" + traceId)); + assertFalse(((ReadableSpan) Span.current()).hasEnded()); + } + receiver.complete(msg).block(); + }) + .parallel(messageCount, 1) + .runOn(Schedulers.boundedElastic())) + .expectNextCount(messageCount) + .verifyComplete(); + + assertTrue(processedFound.await(20, TimeUnit.SECONDS)); + + List spans = spanProcessor.getEndedSpans(); + List processed = findSpans(spans, "ServiceBus.process"); + List completed = findSpans(spans, "ServiceBus.complete"); + + assertEquals(messageCount, processed.size()); + assertEquals(messageCount, completed.size()); + for (ReadableSpan c : completed) { + assertParentFound(c, processed); + } + } + + @Test + public void sendAndReceiveParallelAutoComplete() throws InterruptedException { + int messageCount = 5; + StepVerifier.create(sender.createMessageBatch() + .doOnNext(batch -> { + for (int i = 0; i < messageCount; i++) { + batch.tryAddMessage(new ServiceBusMessage(CONTENTS_BYTES)); + } + }) + .flatMap(batch -> sender.sendMessages(batch))).verifyComplete(); + + CountDownLatch processedFound = new CountDownLatch(messageCount); + spanProcessor.notifyIfCondition(processedFound, span -> span.getName().equals("ServiceBus.process")); + + ServiceBusReceiverAsyncClient receiverAutoComplete = new ServiceBusClientBuilder() + .connectionString(getConnectionString()) + .receiver() + .queueName(getQueueName(0)) + .buildAsyncClient(); + + StepVerifier.create( + receiverAutoComplete.receiveMessages() + .take(messageCount) + .doOnNext(msg -> { + if (Span.current().getSpanContext().isValid()) { + String traceparent = (String) msg.getApplicationProperties().get("traceparent"); + String traceId = Span.current().getSpanContext().getTraceId(); + + // context created for the message and current are the same + assertTrue(traceparent.startsWith("00-" + traceId)); + assertFalse(((ReadableSpan) Span.current()).hasEnded()); + } + }) + .parallel(messageCount, 1) + .runOn(Schedulers.boundedElastic(), 1)) + .expectNextCount(messageCount) + .verifyComplete(); + + assertTrue(processedFound.await(20, TimeUnit.SECONDS)); + + List spans = spanProcessor.getEndedSpans(); + List processed = findSpans(spans, "ServiceBus.process"); + List completed = findSpans(spans, "ServiceBus.complete"); + + assertEquals(messageCount, processed.size()); + assertEquals(messageCount, completed.size()); + for (ReadableSpan c : completed) { + assertParentFound(c, processed); + } } @Test @@ -189,7 +337,7 @@ public void sendReceiveRenewLockAndDeferSync() { StepVerifier.create(sender.sendMessage(new ServiceBusMessage(CONTENTS_BYTES))) .verifyComplete(); - ServiceBusReceivedMessage receivedMessage = receiverSync.receiveMessages(1, Duration.ofSeconds(1)).stream().findFirst().get(); + ServiceBusReceivedMessage receivedMessage = receiverSync.receiveMessages(1, Duration.ofSeconds(10)).stream().findFirst().get(); receiverSync.renewMessageLock(receivedMessage); receiverSync.defer(receivedMessage, new DeferOptions()); @@ -283,7 +431,7 @@ public void sendAndProcess() throws InterruptedException { String message1SpanId = message.getApplicationProperties().get("traceparent").toString().substring(36, 52); CountDownLatch completedFound = new CountDownLatch(1); spanProcessor.notifyIfCondition(completedFound, span -> { - if (span.getName() != "ServiceBus.complete") { + if (!span.getName().equals("ServiceBus.complete")) { return false; } List links = span.toSpanData().getLinks(); @@ -308,7 +456,7 @@ public void sendAndProcess() throws InterruptedException { .buildProcessorClient(); processor.start(); - assertTrue(completedFound.await(10, TimeUnit.SECONDS)); + assertTrue(completedFound.await(20, TimeUnit.SECONDS)); processor.stop(); assertTrue(currentInProcess.get().getSpanContext().isValid()); @@ -331,7 +479,99 @@ public void sendAndProcess() throws InterruptedException { .collect(Collectors.toList()); assertEquals(1, completed.size()); assertSendSpan(completed.get(0), Collections.singletonList(message), "ServiceBus.complete"); - assertParent(completed.get(0), processed.get(0)); + assertParentFound(completed.get(0), processed); + } + + @Test + public void sendAndProcessParallel() throws InterruptedException { + StepVerifier.create(sender.createMessageBatch() + .doOnNext(batch -> { + for (int i = 0; i < 10; i++) { + batch.tryAddMessage(new ServiceBusMessage(CONTENTS_BYTES) + .setMessageId(UUID.randomUUID().toString())); + } + }) + .flatMap(batch -> sender.sendMessages(batch))) + .verifyComplete(); + + CountDownLatch processedFound = new CountDownLatch(10); + spanProcessor.notifyIfCondition(processedFound, span -> span.getName().equals("ServiceBus.process")); + + processor = new ServiceBusClientBuilder() + .connectionString(getConnectionString()) + .processor() + .queueName(getQueueName(0)) + .maxConcurrentCalls(10) + .processMessage(mc -> { + String traceparent = (String) mc.getMessage().getApplicationProperties().get("traceparent"); + String traceId = Span.current().getSpanContext().getTraceId(); + + // context created for the message and current are the same + assertTrue(traceparent.startsWith("00-" + traceId)); + assertFalse(((ReadableSpan) Span.current()).hasEnded()); + }) + .processError(e -> fail("unexpected error", e.getException())) + .buildProcessorClient(); + + processor.start(); + assertTrue(processedFound.await(10, TimeUnit.SECONDS)); + processor.stop(); + + List spans = spanProcessor.getEndedSpans(); + List processed = findSpans(spans, "ServiceBus.process"); + List completed = findSpans(spans, "ServiceBus.complete"); + + assertEquals(10, processed.size()); + assertEquals(10, completed.size()); + for (ReadableSpan c : completed) { + assertParentFound(c, processed); + } + } + + @Test + public void sendAndProcessParallelNoAutoComplete() throws InterruptedException { + int messageCount = 5; + StepVerifier.create(sender.createMessageBatch() + .doOnNext(batch -> { + for (int i = 0; i < messageCount; i++) { + batch.tryAddMessage(new ServiceBusMessage(CONTENTS_BYTES)); + } + }) + .flatMap(batch -> sender.sendMessages(batch))).verifyComplete(); + + CountDownLatch completedFound = new CountDownLatch(messageCount); + spanProcessor.notifyIfCondition(completedFound, span -> span.getName().equals("ServiceBus.complete")); + + processor = new ServiceBusClientBuilder() + .connectionString(getConnectionString()) + .processor() + .queueName(getQueueName(0)) + .maxConcurrentCalls(messageCount) + .disableAutoComplete() + .processMessage(mc -> { + String traceparent = (String) mc.getMessage().getApplicationProperties().get("traceparent"); + String traceId = Span.current().getSpanContext().getTraceId(); + + // context created for the message and current are the same + assertTrue(traceparent.startsWith("00-" + traceId)); + mc.complete(); + }) + .processError(e -> fail("unexpected error", e.getException())) + .buildProcessorClient(); + + processor.start(); + assertTrue(completedFound.await(20, TimeUnit.SECONDS)); + processor.stop(); + + List spans = spanProcessor.getEndedSpans(); + List processed = findSpans(spans, "ServiceBus.process"); + List completed = findSpans(spans, "ServiceBus.complete"); + + assertTrue(messageCount <= processed.size()); + assertTrue(messageCount <= completed.size()); + for (ReadableSpan c : completed) { + assertParentFound(c, processed); + } } @Test @@ -344,6 +584,7 @@ public void sendProcessAndFail() throws InterruptedException { .verifyComplete(); String message1SpanId = message.getApplicationProperties().get("traceparent").toString().substring(36, 52); + CountDownLatch messageProcessed = new CountDownLatch(1); spanProcessor.notifyIfCondition(messageProcessed, span -> span.getName() == "ServiceBus.process" && span.getParentSpanContext().getSpanId().equals(message1SpanId)); @@ -367,7 +608,6 @@ public void sendProcessAndFail() throws InterruptedException { processor.stop(); List spans = spanProcessor.getEndedSpans(); - //assertEquals(0, findSpans(spans, "ServiceBus.consume").size()); List processed = findSpans(spans, "ServiceBus.process") .stream().filter(p -> p.getParentSpanContext().isValid()) .filter(p -> p.toSpanData().getStatus().getStatusCode() == StatusCode.ERROR) @@ -380,7 +620,7 @@ public void sendProcessAndFail() throws InterruptedException { .collect(Collectors.toList()); assertEquals(1, abandoned.size()); assertSendSpan(abandoned.get(0), Collections.singletonList(message), "ServiceBus.abandon"); - assertParent(abandoned.get(0), processed.get(0)); + assertParentFound(abandoned.get(0), processed); } @Test @@ -448,6 +688,22 @@ private void assertParent(ReadableSpan child, ReadableSpan parent) { assertEquals(child.getParentSpanContext().getSpanId(), parent.getSpanContext().getSpanId()); } + private void assertParentFound(ReadableSpan child, List possibleParents) { + boolean hasParentInProcessed = false; + for (ReadableSpan p : possibleParents) { + hasParentInProcessed |= + child.getParentSpanContext().getTraceId().equals(p.getSpanContext().getTraceId()) + && child.getParentSpanContext().getSpanId().equals(p.getSpanContext().getSpanId()); + if (hasParentInProcessed) { + // TODO (limolkova) apparently we complete ahead of time + // assertTrue(p.getLatencyNanos() >= child.getLatencyNanos()); + break; + } + } + + assertTrue(hasParentInProcessed); + } + private List findSpans(List spans, String spanName) { return spans.stream() .filter(s -> s.getName().equals(spanName))