Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Nov 8, 2022
1 parent d76fdd8 commit de2835b
Show file tree
Hide file tree
Showing 11 changed files with 369 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,10 @@ public AutoCloseable makeSpanCurrent(Context context) {
return NOOP_CLOSEABLE;
}

io.opentelemetry.context.Context traceContext = getTraceContextOrDefault(context, io.opentelemetry.context.Context.root());
io.opentelemetry.context.Context traceContext = getTraceContextOrDefault(context, null);
if (traceContext == null) {
return NOOP_CLOSEABLE;
}
return traceContext.makeCurrent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,18 +520,6 @@ public void startEndCurrentSpan() {
}
}

@Test
public void startCurrentSpanNoContext() {
try (Scope parentScope = parentSpan.makeCurrent()) {
try (AutoCloseable scope = openTelemetryTracer.makeSpanCurrent(Context.NONE)) {
assertFalse(Span.current().getSpanContext().isValid());
} catch (Exception e) {
fail();
}
assertSame(parentSpan, Span.current());
}
}

@Test
@SuppressWarnings("deprecation")
public void startEndCurrentSpanBackwardCompatible() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.api.parallel.Isolated;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -172,6 +173,35 @@ public void sendAndReceive() throws InterruptedException {
assertConsumerSpan(received.get(0), receivedMessage.get(), "EventHubs.consume");
}

@Test
public void sendAndReceiveParallel() throws InterruptedException {
int messageCount = 5;
CountDownLatch latch = new CountDownLatch(messageCount);
spanProcessor.notifyIfCondition(latch, span -> span.getName().equals("EventHubs.consume"));
StepVerifier.create(consumer
.receive()
.take(messageCount)
.doOnNext(pe -> {
String traceparent = (String) pe.getData().getProperties().get("traceparent");
String traceId = Span.current().getSpanContext().getTraceId();

// context created for the message and current are the same
assertTrue(traceparent.startsWith("00-" + traceId));
assertFalse(((ReadableSpan) Span.current()).hasEnded());
})
.parallel(messageCount, 1)
.runOn(Schedulers.boundedElastic(), 2))
.expectNextCount(messageCount)
.verifyComplete();

StepVerifier.create(producer.send(data, new SendOptions())).verifyComplete();

assertTrue(latch.await(20, TimeUnit.SECONDS));
List<ReadableSpan> spans = spanProcessor.getEndedSpans();
List<ReadableSpan> received = findSpans(spans, "EventHubs.consume");
assertTrue(messageCount <= received.size());
}

@Test
public void sendBuffered() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected void hookOnSubscribe(Subscription subscription) {
protected void hookOnNext(ServiceBusMessageContext message) {
Throwable exception = null;
Context span = instrumentation.instrumentProcess("ServiceBus.process", message.getMessage(), Context.NONE);
AutoCloseable scope = tracer.makeSpanCurrent(span);
message.getMessage().setContext(span);

try {
downstream.onNext(message);
Expand All @@ -72,7 +72,7 @@ protected void hookOnNext(ServiceBusMessageContext message) {
exception = (Throwable) processorException;
}
}
tracer.endSpan(exception, span, scope);
tracer.endSpan(exception, context, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusProcessorClientBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder;
import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand All @@ -19,6 +20,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static com.azure.messaging.servicebus.FluxTrace.PROCESS_ERROR_KEY;

/**
* The processor client for processing Service Bus messages. {@link ServiceBusProcessorClient} provides a push-based
* mechanism that invokes the message processing callback when a message is received or the error handler when an error
Expand Down Expand Up @@ -158,6 +161,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
private final String queueName;
private final String topicName;
private final String subscriptionName;
private final ServiceBusTracer tracer;
private Disposable monitorDisposable;

/**
Expand All @@ -182,11 +186,13 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");

this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor());
ServiceBusReceiverAsyncClient client = sessionReceiverBuilder.buildAsyncClientForProcessor();
this.asyncClient.set(client);
this.receiverBuilder = null;
this.queueName = queueName;
this.topicName = topicName;
this.subscriptionName = subscriptionName;
this.tracer = client.getInstrumentation().getTracer();
}

/**
Expand All @@ -208,11 +214,14 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
this.asyncClient.set(receiverBuilder.buildAsyncClient());

ServiceBusReceiverAsyncClient client = receiverBuilder.buildAsyncClient();
this.asyncClient.set(client);
this.sessionReceiverBuilder = null;
this.queueName = queueName;
this.topicName = topicName;
this.subscriptionName = subscriptionName;
this.tracer = client.getInstrumentation().getTracer();
}

/**
Expand Down Expand Up @@ -360,29 +369,35 @@ public void onSubscribe(Subscription subscription) {
subscription.request(1);
}

@SuppressWarnings("try")
@Override
public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
if (serviceBusMessageContext.hasError()) {
handleError(serviceBusMessageContext.getThrowable());
} else {
try {
try (AutoCloseable scope = tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext())) {
if (serviceBusMessageContext.hasError()) {
handleError(serviceBusMessageContext.getThrowable());
} else {
ServiceBusReceivedMessageContext serviceBusReceivedMessageContext =
new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext);

processMessage.accept(serviceBusReceivedMessageContext);
} catch (Exception ex) {
serviceBusMessageContext.getMessage().addContext(FluxTrace.PROCESS_ERROR_KEY, ex);
handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));

if (!processorOptions.isDisableAutoComplete()) {
LOGGER.warning("Error when processing message. Abandoning message.", ex);
abandonMessage(serviceBusMessageContext, receiverClient);
try {
processMessage.accept(serviceBusReceivedMessageContext);
} catch (Exception ex) {
serviceBusMessageContext.getMessage().setContext(
serviceBusMessageContext.getMessage().getContext().addData(PROCESS_ERROR_KEY, ex));
handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));

if (!processorOptions.isDisableAutoComplete()) {
LOGGER.warning("Error when processing message. Abandoning message.", ex);
abandonMessage(serviceBusMessageContext, receiverClient);
}
}
}
}
if (isRunning.get()) {
LOGGER.verbose("Requesting 1 more message from upstream");
subscription.request(1);
if (isRunning.get()) {
LOGGER.verbose("Requesting 1 more message from upstream");
subscription.request(1);
}
} catch (Exception e) {
LOGGER.verbose("Error disposing scope", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,33 +518,29 @@ public String getTo() {
}

/**
* Adds a new key value pair to the existing context on Message.
* ASets context on the message.
*
* @param key The key for this context object
* @param value The value for this context object.
* @param context Context to set.
*
* @return The updated {@link ServiceBusMessage}.
* @throws NullPointerException if {@code key} or {@code value} is null.
*/
ServiceBusReceivedMessage addContext(String key, Object value) {
Objects.requireNonNull(key, "The 'key' parameter cannot be null.");
Objects.requireNonNull(value, "The 'value' parameter cannot be null.");
this.context = context.addData(key, value);
ServiceBusReceivedMessage setContext(Context context) {
Objects.requireNonNull(context, "The 'context' parameter cannot be null.");
this.context = context;
return this;
}

/**
* Adds a new key value pair to the existing context on Message.
*
* @param key The key for this context object
* @param value The value for this context object.
* Gets context associated with the message.
*
* @return The updated {@link ServiceBusMessage}.
* @throws NullPointerException if {@code key} or {@code value} is null.
*/
Context getContext() {
return this.context;
}

/**
* Gets whether the message has been settled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,15 +823,20 @@ public Flux<ServiceBusReceivedMessage> receiveMessages() {
return receiveMessagesNoBackPressure().limitRate(1, 0);
}

@SuppressWarnings("try")
Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
return receiveMessagesWithContext(0)
.handle((serviceBusMessageContext, sink) -> {
if (serviceBusMessageContext.hasError()) {
sink.error(serviceBusMessageContext.getThrowable());
return;
}
sink.next(serviceBusMessageContext.getMessage());
});
.handle((serviceBusMessageContext, sink) -> {
try (AutoCloseable scope = tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext())) {
if (serviceBusMessageContext.hasError()) {
sink.error(serviceBusMessageContext.getThrowable());
return;
}
sink.next(serviceBusMessageContext.getMessage());
} catch (Exception ex) {
LOGGER.verbose("Error disposing scope", ex);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public <T> Mono<T> instrumentSettlement(Mono<T> publisher, ServiceBusReceivedMes
.contextWrite(ctx -> {
startTime.set(Instant.now().toEpochMilli());
return ctx.put(REACTOR_PARENT_TRACE_CONTEXT_KEY, tracer.startSpanWithLink(getSettlementSpanName(status), message,
messageContext, Context.NONE));
messageContext, messageContext));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class ServiceBusProcessorTest {

private static final String NAMESPACE = "namespace";
private static final String ENTITY_NAME = "entity";
private static final ServiceBusReceiverInstrumentation DEFAULT_INSTRUMENTATION =
new ServiceBusReceiverInstrumentation(null, null, NAMESPACE, ENTITY_NAME, "subscription", false);

/**
* Tests receiving messages using a {@link ServiceBusProcessorClient}.
Expand Down Expand Up @@ -285,6 +287,7 @@ public void testUserMessageHandlerError() throws InterruptedException {
when(asyncClient.abandon(any(ServiceBusReceivedMessage.class))).thenReturn(Mono.empty());
when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE);
when(asyncClient.getEntityPath()).thenReturn(ENTITY_NAME);
when(asyncClient.getInstrumentation()).thenReturn(DEFAULT_INSTRUMENTATION);
doNothing().when(asyncClient).close();

final AtomicInteger messageId = new AtomicInteger();
Expand Down Expand Up @@ -339,6 +342,7 @@ public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws Interru
when(asyncClient.isConnectionClosed()).thenReturn(false);
when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE);
when(asyncClient.getEntityPath()).thenReturn(ENTITY_NAME);
when(asyncClient.getInstrumentation()).thenReturn(DEFAULT_INSTRUMENTATION);

doNothing().when(asyncClient).close();

Expand Down Expand Up @@ -495,6 +499,7 @@ private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getBuilder(
ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(tracer, null, NAMESPACE, ENTITY_NAME, null, false);
when(asyncClient.receiveMessagesWithContext()).thenReturn(
new FluxTrace(messageFlux, instrumentation).publishOn(Schedulers.boundedElastic()));
when(asyncClient.getInstrumentation()).thenReturn(instrumentation);
when(asyncClient.isConnectionClosed()).thenReturn(false);
doNothing().when(asyncClient).close();
return receiverBuilder;
Expand All @@ -512,6 +517,7 @@ private ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder getSessio
when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient);
when(asyncClient.receiveMessagesWithContext()).thenReturn(messageFlux);
when(asyncClient.isConnectionClosed()).thenReturn(false);
when(asyncClient.getInstrumentation()).thenReturn(DEFAULT_INSTRUMENTATION);
doNothing().when(asyncClient).close();
return receiverBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,7 @@ void settlementMessagesReportsMetrics(DispositionStatus status) {
connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER);
when(receivedMessage.getLockToken()).thenReturn("mylockToken");
when(receivedMessage.getSequenceNumber()).thenReturn(42L);
when(receivedMessage.getContext()).thenReturn(Context.NONE);
when(managementNode.updateDisposition(any(), any(), isNull(), isNull(), isNull(),
isNull(), isNull(), isNull())).thenReturn(Mono.empty());

Expand Down Expand Up @@ -1459,9 +1460,11 @@ void receiveWithTracesAndMetrics() {

when(receivedMessage.getLockToken()).thenReturn("mylockToken");
when(receivedMessage.getSequenceNumber()).thenReturn(42L);
when(receivedMessage.getContext()).thenReturn(Context.NONE);

when(receivedMessage2.getLockToken()).thenReturn("mylockToken");
when(receivedMessage2.getSequenceNumber()).thenReturn(43L);
when(receivedMessage2.getContext()).thenReturn(Context.NONE);

when(messageSerializer.deserialize(any(Message.class), eq(ServiceBusReceivedMessage.class)))
.thenReturn(receivedMessage, receivedMessage2);
Expand Down
Loading

0 comments on commit de2835b

Please sign in to comment.