Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ServiceBus tracing: fix current context when processing concurrent requests #31993

Merged
merged 4 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.api.parallel.Isolated;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -172,6 +173,35 @@ public void sendAndReceive() throws InterruptedException {
assertConsumerSpan(received.get(0), receivedMessage.get(), "EventHubs.consume");
}

@Test
public void sendAndReceiveParallel() throws InterruptedException {
int messageCount = 5;
CountDownLatch latch = new CountDownLatch(messageCount);
spanProcessor.notifyIfCondition(latch, span -> span.getName().equals("EventHubs.consume"));
StepVerifier.create(consumer
.receive()
.take(messageCount)
.doOnNext(pe -> {
String traceparent = (String) pe.getData().getProperties().get("traceparent");
String traceId = Span.current().getSpanContext().getTraceId();

// context created for the message and current are the same
assertTrue(traceparent.startsWith("00-" + traceId));
assertFalse(((ReadableSpan) Span.current()).hasEnded());
})
.parallel(messageCount, 1)
.runOn(Schedulers.boundedElastic(), 2))
.expectNextCount(messageCount)
.verifyComplete();

StepVerifier.create(producer.send(data, new SendOptions())).verifyComplete();

assertTrue(latch.await(20, TimeUnit.SECONDS));
List<ReadableSpan> spans = spanProcessor.getEndedSpans();
List<ReadableSpan> received = findSpans(spans, "EventHubs.consume");
assertTrue(messageCount <= received.size());
}

@Test
public void sendBuffered() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected void hookOnSubscribe(Subscription subscription) {
protected void hookOnNext(ServiceBusMessageContext message) {
Throwable exception = null;
Context span = instrumentation.instrumentProcess("ServiceBus.process", message.getMessage(), Context.NONE);
AutoCloseable scope = tracer.makeSpanCurrent(span);
message.getMessage().setContext(span);

try {
downstream.onNext(message);
Expand All @@ -72,7 +72,7 @@ protected void hookOnNext(ServiceBusMessageContext message) {
exception = (Throwable) processorException;
}
}
tracer.endSpan(exception, span, scope);
tracer.endSpan(exception, context, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusProcessorClientBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder;
import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand All @@ -19,6 +20,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static com.azure.messaging.servicebus.FluxTrace.PROCESS_ERROR_KEY;

/**
* The processor client for processing Service Bus messages. {@link ServiceBusProcessorClient} provides a push-based
* mechanism that invokes the message processing callback when a message is received or the error handler when an error
Expand Down Expand Up @@ -158,6 +161,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
private final String queueName;
private final String topicName;
private final String subscriptionName;
private final ServiceBusTracer tracer;
private Disposable monitorDisposable;

/**
Expand All @@ -182,11 +186,13 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");

this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor());
ServiceBusReceiverAsyncClient client = sessionReceiverBuilder.buildAsyncClientForProcessor();
this.asyncClient.set(client);
this.receiverBuilder = null;
this.queueName = queueName;
this.topicName = topicName;
this.subscriptionName = subscriptionName;
this.tracer = client.getInstrumentation().getTracer();
}

/**
Expand All @@ -208,11 +214,14 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
this.asyncClient.set(receiverBuilder.buildAsyncClient());

ServiceBusReceiverAsyncClient client = receiverBuilder.buildAsyncClient();
this.asyncClient.set(client);
this.sessionReceiverBuilder = null;
this.queueName = queueName;
this.topicName = topicName;
this.subscriptionName = subscriptionName;
this.tracer = client.getInstrumentation().getTracer();
}

/**
Expand Down Expand Up @@ -360,29 +369,35 @@ public void onSubscribe(Subscription subscription) {
subscription.request(1);
}

@SuppressWarnings("try")
@Override
public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
if (serviceBusMessageContext.hasError()) {
handleError(serviceBusMessageContext.getThrowable());
} else {
try {
try (AutoCloseable scope = tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext())) {
if (serviceBusMessageContext.hasError()) {
handleError(serviceBusMessageContext.getThrowable());
} else {
ServiceBusReceivedMessageContext serviceBusReceivedMessageContext =
new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext);

processMessage.accept(serviceBusReceivedMessageContext);
} catch (Exception ex) {
serviceBusMessageContext.getMessage().addContext(FluxTrace.PROCESS_ERROR_KEY, ex);
handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));

if (!processorOptions.isDisableAutoComplete()) {
LOGGER.warning("Error when processing message. Abandoning message.", ex);
abandonMessage(serviceBusMessageContext, receiverClient);
try {
processMessage.accept(serviceBusReceivedMessageContext);
} catch (Exception ex) {
serviceBusMessageContext.getMessage().setContext(
serviceBusMessageContext.getMessage().getContext().addData(PROCESS_ERROR_KEY, ex));
handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));

if (!processorOptions.isDisableAutoComplete()) {
LOGGER.warning("Error when processing message. Abandoning message.", ex);
abandonMessage(serviceBusMessageContext, receiverClient);
}
}
}
}
if (isRunning.get()) {
LOGGER.verbose("Requesting 1 more message from upstream");
subscription.request(1);
if (isRunning.get()) {
LOGGER.verbose("Requesting 1 more message from upstream");
subscription.request(1);
}
} catch (Exception e) {
LOGGER.verbose("Error disposing scope", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,33 +518,29 @@ public String getTo() {
}

/**
* Adds a new key value pair to the existing context on Message.
* Sets context on the message.
*
* @param key The key for this context object
* @param value The value for this context object.
* @param context Context to set.
*
* @return The updated {@link ServiceBusMessage}.
* @throws NullPointerException if {@code key} or {@code value} is null.
*/
ServiceBusReceivedMessage addContext(String key, Object value) {
Objects.requireNonNull(key, "The 'key' parameter cannot be null.");
Objects.requireNonNull(value, "The 'value' parameter cannot be null.");
this.context = context.addData(key, value);
ServiceBusReceivedMessage setContext(Context context) {
Objects.requireNonNull(context, "The 'context' parameter cannot be null.");
this.context = context;
return this;
}

/**
* Adds a new key value pair to the existing context on Message.
*
* @param key The key for this context object
* @param value The value for this context object.
* Gets context associated with the message.
*
* @return The updated {@link ServiceBusMessage}.
* @throws NullPointerException if {@code key} or {@code value} is null.
*/
Context getContext() {
return this.context;
}

/**
* Gets whether the message has been settled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,15 +823,20 @@ public Flux<ServiceBusReceivedMessage> receiveMessages() {
return receiveMessagesNoBackPressure().limitRate(1, 0);
}

@SuppressWarnings("try")
Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
return receiveMessagesWithContext(0)
.handle((serviceBusMessageContext, sink) -> {
if (serviceBusMessageContext.hasError()) {
sink.error(serviceBusMessageContext.getThrowable());
return;
}
sink.next(serviceBusMessageContext.getMessage());
});
.handle((serviceBusMessageContext, sink) -> {
try (AutoCloseable scope = tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext())) {
if (serviceBusMessageContext.hasError()) {
sink.error(serviceBusMessageContext.getThrowable());
return;
}
sink.next(serviceBusMessageContext.getMessage());
} catch (Exception ex) {
LOGGER.verbose("Error disposing scope", ex);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public <T> Mono<T> instrumentSettlement(Mono<T> publisher, ServiceBusReceivedMes
.contextWrite(ctx -> {
startTime.set(Instant.now().toEpochMilli());
return ctx.put(REACTOR_PARENT_TRACE_CONTEXT_KEY, tracer.startSpanWithLink(getSettlementSpanName(status), message,
messageContext, Context.NONE));
messageContext, messageContext));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class ServiceBusProcessorTest {

private static final String NAMESPACE = "namespace";
private static final String ENTITY_NAME = "entity";
private static final ServiceBusReceiverInstrumentation DEFAULT_INSTRUMENTATION =
new ServiceBusReceiverInstrumentation(null, null, NAMESPACE, ENTITY_NAME, "subscription", false);

/**
* Tests receiving messages using a {@link ServiceBusProcessorClient}.
Expand Down Expand Up @@ -285,6 +287,7 @@ public void testUserMessageHandlerError() throws InterruptedException {
when(asyncClient.abandon(any(ServiceBusReceivedMessage.class))).thenReturn(Mono.empty());
when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE);
when(asyncClient.getEntityPath()).thenReturn(ENTITY_NAME);
when(asyncClient.getInstrumentation()).thenReturn(DEFAULT_INSTRUMENTATION);
doNothing().when(asyncClient).close();

final AtomicInteger messageId = new AtomicInteger();
Expand Down Expand Up @@ -339,6 +342,7 @@ public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws Interru
when(asyncClient.isConnectionClosed()).thenReturn(false);
when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE);
when(asyncClient.getEntityPath()).thenReturn(ENTITY_NAME);
when(asyncClient.getInstrumentation()).thenReturn(DEFAULT_INSTRUMENTATION);

doNothing().when(asyncClient).close();

Expand Down Expand Up @@ -495,6 +499,7 @@ private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getBuilder(
ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(tracer, null, NAMESPACE, ENTITY_NAME, null, false);
when(asyncClient.receiveMessagesWithContext()).thenReturn(
new FluxTrace(messageFlux, instrumentation).publishOn(Schedulers.boundedElastic()));
when(asyncClient.getInstrumentation()).thenReturn(instrumentation);
when(asyncClient.isConnectionClosed()).thenReturn(false);
doNothing().when(asyncClient).close();
return receiverBuilder;
Expand All @@ -512,6 +517,7 @@ private ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder getSessio
when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient);
when(asyncClient.receiveMessagesWithContext()).thenReturn(messageFlux);
when(asyncClient.isConnectionClosed()).thenReturn(false);
when(asyncClient.getInstrumentation()).thenReturn(DEFAULT_INSTRUMENTATION);
doNothing().when(asyncClient).close();
return receiverBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,7 @@ void settlementMessagesReportsMetrics(DispositionStatus status) {
connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER);
when(receivedMessage.getLockToken()).thenReturn("mylockToken");
when(receivedMessage.getSequenceNumber()).thenReturn(42L);
when(receivedMessage.getContext()).thenReturn(Context.NONE);
when(managementNode.updateDisposition(any(), any(), isNull(), isNull(), isNull(),
isNull(), isNull(), isNull())).thenReturn(Mono.empty());

Expand Down Expand Up @@ -1459,9 +1460,11 @@ void receiveWithTracesAndMetrics() {

when(receivedMessage.getLockToken()).thenReturn("mylockToken");
when(receivedMessage.getSequenceNumber()).thenReturn(42L);
when(receivedMessage.getContext()).thenReturn(Context.NONE);

when(receivedMessage2.getLockToken()).thenReturn("mylockToken");
when(receivedMessage2.getSequenceNumber()).thenReturn(43L);
when(receivedMessage2.getContext()).thenReturn(Context.NONE);

when(messageSerializer.deserialize(any(Message.class), eq(ServiceBusReceivedMessage.class)))
.thenReturn(receivedMessage, receivedMessage2);
Expand Down
Loading