Skip to content

Commit

Permalink
ServiceBus tracing: fix current context when processing concurrent re…
Browse files Browse the repository at this point in the history
…quests (#31993)

* Allow to clear otel context when no span is passed to makeSpanCurrent
  • Loading branch information
lmolkova authored Nov 9, 2022
1 parent a3facc4 commit 3ef6d6e
Show file tree
Hide file tree
Showing 9 changed files with 365 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.api.parallel.Isolated;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -172,6 +173,35 @@ public void sendAndReceive() throws InterruptedException {
assertConsumerSpan(received.get(0), receivedMessage.get(), "EventHubs.consume");
}

@Test
public void sendAndReceiveParallel() throws InterruptedException {
int messageCount = 5;
CountDownLatch latch = new CountDownLatch(messageCount);
spanProcessor.notifyIfCondition(latch, span -> span.getName().equals("EventHubs.consume"));
StepVerifier.create(consumer
.receive()
.take(messageCount)
.doOnNext(pe -> {
String traceparent = (String) pe.getData().getProperties().get("traceparent");
String traceId = Span.current().getSpanContext().getTraceId();

// context created for the message and current are the same
assertTrue(traceparent.startsWith("00-" + traceId));
assertFalse(((ReadableSpan) Span.current()).hasEnded());
})
.parallel(messageCount, 1)
.runOn(Schedulers.boundedElastic(), 2))
.expectNextCount(messageCount)
.verifyComplete();

StepVerifier.create(producer.send(data, new SendOptions())).verifyComplete();

assertTrue(latch.await(20, TimeUnit.SECONDS));
List<ReadableSpan> spans = spanProcessor.getEndedSpans();
List<ReadableSpan> received = findSpans(spans, "EventHubs.consume");
assertTrue(messageCount <= received.size());
}

@Test
public void sendBuffered() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected void hookOnSubscribe(Subscription subscription) {
protected void hookOnNext(ServiceBusMessageContext message) {
Throwable exception = null;
Context span = instrumentation.instrumentProcess("ServiceBus.process", message.getMessage(), Context.NONE);
AutoCloseable scope = tracer.makeSpanCurrent(span);
message.getMessage().setContext(span);

try {
downstream.onNext(message);
Expand All @@ -72,7 +72,7 @@ protected void hookOnNext(ServiceBusMessageContext message) {
exception = (Throwable) processorException;
}
}
tracer.endSpan(exception, span, scope);
tracer.endSpan(exception, context, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusProcessorClientBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder;
import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand All @@ -19,6 +20,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static com.azure.messaging.servicebus.FluxTrace.PROCESS_ERROR_KEY;

/**
* The processor client for processing Service Bus messages. {@link ServiceBusProcessorClient} provides a push-based
* mechanism that invokes the message processing callback when a message is received or the error handler when an error
Expand Down Expand Up @@ -158,6 +161,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
private final String queueName;
private final String topicName;
private final String subscriptionName;
private final ServiceBusTracer tracer;
private Disposable monitorDisposable;

/**
Expand All @@ -182,11 +186,13 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");

this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor());
ServiceBusReceiverAsyncClient client = sessionReceiverBuilder.buildAsyncClientForProcessor();
this.asyncClient.set(client);
this.receiverBuilder = null;
this.queueName = queueName;
this.topicName = topicName;
this.subscriptionName = subscriptionName;
this.tracer = client.getInstrumentation().getTracer();
}

/**
Expand All @@ -208,11 +214,14 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
this.asyncClient.set(receiverBuilder.buildAsyncClient());

ServiceBusReceiverAsyncClient client = receiverBuilder.buildAsyncClient();
this.asyncClient.set(client);
this.sessionReceiverBuilder = null;
this.queueName = queueName;
this.topicName = topicName;
this.subscriptionName = subscriptionName;
this.tracer = client.getInstrumentation().getTracer();
}

/**
Expand Down Expand Up @@ -360,29 +369,35 @@ public void onSubscribe(Subscription subscription) {
subscription.request(1);
}

@SuppressWarnings("try")
@Override
public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
if (serviceBusMessageContext.hasError()) {
handleError(serviceBusMessageContext.getThrowable());
} else {
try {
try (AutoCloseable scope = tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext())) {
if (serviceBusMessageContext.hasError()) {
handleError(serviceBusMessageContext.getThrowable());
} else {
ServiceBusReceivedMessageContext serviceBusReceivedMessageContext =
new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext);

processMessage.accept(serviceBusReceivedMessageContext);
} catch (Exception ex) {
serviceBusMessageContext.getMessage().addContext(FluxTrace.PROCESS_ERROR_KEY, ex);
handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));

if (!processorOptions.isDisableAutoComplete()) {
LOGGER.warning("Error when processing message. Abandoning message.", ex);
abandonMessage(serviceBusMessageContext, receiverClient);
try {
processMessage.accept(serviceBusReceivedMessageContext);
} catch (Exception ex) {
serviceBusMessageContext.getMessage().setContext(
serviceBusMessageContext.getMessage().getContext().addData(PROCESS_ERROR_KEY, ex));
handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));

if (!processorOptions.isDisableAutoComplete()) {
LOGGER.warning("Error when processing message. Abandoning message.", ex);
abandonMessage(serviceBusMessageContext, receiverClient);
}
}
}
}
if (isRunning.get()) {
LOGGER.verbose("Requesting 1 more message from upstream");
subscription.request(1);
if (isRunning.get()) {
LOGGER.verbose("Requesting 1 more message from upstream");
subscription.request(1);
}
} catch (Exception e) {
LOGGER.verbose("Error disposing scope", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,33 +518,29 @@ public String getTo() {
}

/**
* Adds a new key value pair to the existing context on Message.
* Sets context on the message.
*
* @param key The key for this context object
* @param value The value for this context object.
* @param context Context to set.
*
* @return The updated {@link ServiceBusMessage}.
* @throws NullPointerException if {@code key} or {@code value} is null.
*/
ServiceBusReceivedMessage addContext(String key, Object value) {
Objects.requireNonNull(key, "The 'key' parameter cannot be null.");
Objects.requireNonNull(value, "The 'value' parameter cannot be null.");
this.context = context.addData(key, value);
ServiceBusReceivedMessage setContext(Context context) {
Objects.requireNonNull(context, "The 'context' parameter cannot be null.");
this.context = context;
return this;
}

/**
* Adds a new key value pair to the existing context on Message.
*
* @param key The key for this context object
* @param value The value for this context object.
* Gets context associated with the message.
*
* @return The updated {@link ServiceBusMessage}.
* @throws NullPointerException if {@code key} or {@code value} is null.
*/
Context getContext() {
return this.context;
}

/**
* Gets whether the message has been settled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,15 +823,20 @@ public Flux<ServiceBusReceivedMessage> receiveMessages() {
return receiveMessagesNoBackPressure().limitRate(1, 0);
}

@SuppressWarnings("try")
Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
return receiveMessagesWithContext(0)
.handle((serviceBusMessageContext, sink) -> {
if (serviceBusMessageContext.hasError()) {
sink.error(serviceBusMessageContext.getThrowable());
return;
}
sink.next(serviceBusMessageContext.getMessage());
});
.handle((serviceBusMessageContext, sink) -> {
try (AutoCloseable scope = tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext())) {
if (serviceBusMessageContext.hasError()) {
sink.error(serviceBusMessageContext.getThrowable());
return;
}
sink.next(serviceBusMessageContext.getMessage());
} catch (Exception ex) {
LOGGER.verbose("Error disposing scope", ex);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public <T> Mono<T> instrumentSettlement(Mono<T> publisher, ServiceBusReceivedMes
.contextWrite(ctx -> {
startTime.set(Instant.now().toEpochMilli());
return ctx.put(REACTOR_PARENT_TRACE_CONTEXT_KEY, tracer.startSpanWithLink(getSettlementSpanName(status), message,
messageContext, Context.NONE));
messageContext, messageContext));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class ServiceBusProcessorTest {

private static final String NAMESPACE = "namespace";
private static final String ENTITY_NAME = "entity";
private static final ServiceBusReceiverInstrumentation DEFAULT_INSTRUMENTATION =
new ServiceBusReceiverInstrumentation(null, null, NAMESPACE, ENTITY_NAME, "subscription", false);

/**
* Tests receiving messages using a {@link ServiceBusProcessorClient}.
Expand Down Expand Up @@ -285,6 +287,7 @@ public void testUserMessageHandlerError() throws InterruptedException {
when(asyncClient.abandon(any(ServiceBusReceivedMessage.class))).thenReturn(Mono.empty());
when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE);
when(asyncClient.getEntityPath()).thenReturn(ENTITY_NAME);
when(asyncClient.getInstrumentation()).thenReturn(DEFAULT_INSTRUMENTATION);
doNothing().when(asyncClient).close();

final AtomicInteger messageId = new AtomicInteger();
Expand Down Expand Up @@ -339,6 +342,7 @@ public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws Interru
when(asyncClient.isConnectionClosed()).thenReturn(false);
when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE);
when(asyncClient.getEntityPath()).thenReturn(ENTITY_NAME);
when(asyncClient.getInstrumentation()).thenReturn(DEFAULT_INSTRUMENTATION);

doNothing().when(asyncClient).close();

Expand Down Expand Up @@ -495,6 +499,7 @@ private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getBuilder(
ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(tracer, null, NAMESPACE, ENTITY_NAME, null, false);
when(asyncClient.receiveMessagesWithContext()).thenReturn(
new FluxTrace(messageFlux, instrumentation).publishOn(Schedulers.boundedElastic()));
when(asyncClient.getInstrumentation()).thenReturn(instrumentation);
when(asyncClient.isConnectionClosed()).thenReturn(false);
doNothing().when(asyncClient).close();
return receiverBuilder;
Expand All @@ -512,6 +517,7 @@ private ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder getSessio
when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient);
when(asyncClient.receiveMessagesWithContext()).thenReturn(messageFlux);
when(asyncClient.isConnectionClosed()).thenReturn(false);
when(asyncClient.getInstrumentation()).thenReturn(DEFAULT_INSTRUMENTATION);
doNothing().when(asyncClient).close();
return receiverBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,7 @@ void settlementMessagesReportsMetrics(DispositionStatus status) {
connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER);
when(receivedMessage.getLockToken()).thenReturn("mylockToken");
when(receivedMessage.getSequenceNumber()).thenReturn(42L);
when(receivedMessage.getContext()).thenReturn(Context.NONE);
when(managementNode.updateDisposition(any(), any(), isNull(), isNull(), isNull(),
isNull(), isNull(), isNull())).thenReturn(Mono.empty());

Expand Down Expand Up @@ -1459,9 +1460,11 @@ void receiveWithTracesAndMetrics() {

when(receivedMessage.getLockToken()).thenReturn("mylockToken");
when(receivedMessage.getSequenceNumber()).thenReturn(42L);
when(receivedMessage.getContext()).thenReturn(Context.NONE);

when(receivedMessage2.getLockToken()).thenReturn("mylockToken");
when(receivedMessage2.getSequenceNumber()).thenReturn(43L);
when(receivedMessage2.getContext()).thenReturn(Context.NONE);

when(messageSerializer.deserialize(any(Message.class), eq(ServiceBusReceivedMessage.class)))
.thenReturn(receivedMessage, receivedMessage2);
Expand Down
Loading

0 comments on commit 3ef6d6e

Please sign in to comment.