Skip to content

Commit

Permalink
incorporating improvements from Azure#33559
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Feb 22, 2023
1 parent 8603cb5 commit 94f618a
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,22 @@ static void addAttribute(Span span, String key, Object value, OpenTelemetrySchem
}

/**
* Parses an OpenTelemetry Status from AMQP Error Condition.
* Parses an OpenTelemetry status from error description.
*
* @param span the span to set the status for.
* @param statusMessage description for this error condition.
* @param statusMessage description for this error condition. Any non-null {@code statusMessage} indicates an error.
* Pass empty string or {@code "error"} string to create status without description.
* @param throwable the error occurred during response transmission (optional).
* @return the corresponding OpenTelemetry {@link Span}.
*/
public static Span setError(Span span, String statusMessage, Throwable throwable) {
static Span setError(Span span, String statusMessage, Throwable throwable) {
if (throwable != null) {
span.recordException(throwable);
return span.setStatus(StatusCode.ERROR, statusMessage);
}

if (statusMessage != null) {
// "success" is needed for back compat with older Event Hubs and Service Bus, don't use it.
if (statusMessage != null && !"success".equals(statusMessage)) {
if ("error".equals(statusMessage)) {
return span.setStatus(StatusCode.ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ default void end(int responseCode, Throwable error, Context context) {
}

/**
* Completes the current tracing span for AMQP calls.
* Completes span on the context.
*
* <p><strong>Code samples</strong></p>
*
Expand Down Expand Up @@ -334,8 +334,10 @@ default void end(int responseCode, Throwable error, Context context) {
* </pre>
* <!-- end com.azure.core.util.tracing.end#exception -->
*
* @param errorMessage The error message that occurred during the call, or {@code null} if no error
* occurred.
* @param errorMessage The error message that occurred during the call, or {@code null} if no error.
* occurred. {@code errorMessage} matching {@code "error"} indicates an error without description.
* Any other non-null string indicates an error with description provided in {@code errorMessage}.
*
* @param throwable {@link Throwable} that happened during the span or {@code null} if no exception occurred.
* @param context Additional metadata that is passed through the call stack.
* @throws NullPointerException if {@code context} is {@code null}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@Isolated
@Isolated("Sets global TracingProvider.")
@Execution(ExecutionMode.SAME_THREAD)
public class TracingIntegrationTests extends IntegrationTestBase {
private static final byte[] CONTENTS_BYTES = "Some-contents".getBytes(StandardCharsets.UTF_8);
Expand All @@ -75,6 +75,14 @@ public TracingIntegrationTests() {
@Override
protected void beforeTest() {
spanProcessor = new TestSpanProcessor(getFullyQualifiedDomainName(), getEventHubName());

// For the first integration test run, the tracing provider may be already set because we import
if (GlobalOpenTelemetry.get() != null) {
logger.info("Global telemetry was not null. Manually resetting.");

GlobalOpenTelemetry.resetForTest();
}

OpenTelemetrySdk.builder()
.setTracerProvider(
SdkTracerProvider.builder()
Expand Down Expand Up @@ -263,7 +271,7 @@ public void sendAndReceiveParallel() throws InterruptedException {
@Test
public void sendBuffered() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
EventHubBufferedProducerAsyncClient bufferedProducer = new EventHubBufferedProducerClientBuilder()
EventHubBufferedProducerAsyncClient bufferedProducer = new EventHubBufferedProducerClientBuilder()
.connectionString(getConnectionString())
.onSendBatchFailed(failed -> fail("Exception occurred while sending messages." + failed.getThrowable()))
.onSendBatchSucceeded(succeeded -> latch.countDown())
Expand All @@ -278,15 +286,15 @@ public void sendBuffered() throws InterruptedException {
bufferedProducer
.getPartitionIds().take(1)
.map(partitionId -> new SendOptions().setPartitionId(partitionId))
.flatMap(sendOpts ->
bufferedProducer.enqueueEvent(event1, sendOpts)
.flatMap(sendOpts ->
bufferedProducer.enqueueEvent(event1, sendOpts)
.then(bufferedProducer.enqueueEvent(event2, sendOpts))))
.expectNextCount(1)
.verifyComplete();

StepVerifier.create(consumer
.receive()
.take(2))
.receive()
.take(2))
.expectNextCount(2)
.verifyComplete();

Expand All @@ -313,7 +321,7 @@ public void syncReceive() {
b.tryAdd(new EventData(CONTENTS_BYTES));
return b;
})
.flatMap(b -> producer.send(b)))
.flatMap(b -> producer.send(b)))
.verifyComplete();

List<PartitionEvent> receivedMessages = consumerSync.receiveFromPartition(PARTITION_ID, 2, EventPosition.fromEnqueuedTime(testStartTime), Duration.ofSeconds(10))
Expand Down Expand Up @@ -627,7 +635,7 @@ static class TestSpanProcessor implements SpanProcessor {
this.entityName = entityName;
}
public List<ReadableSpan> getEndedSpans() {
return spans.stream().collect(toList());
return new ArrayList<>(spans);
}

@Override
Expand All @@ -641,6 +649,9 @@ public boolean isStartRequired() {

@Override
public void onEnd(ReadableSpan readableSpan) {
// Various attribute keys can be found in:
// sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java
// sdk/core/azure-core-tracing-opentelemetry/src/main/java/com/azure/core/tracing/opentelemetry/OpenTelemetryUtils.java
assertEquals("Microsoft.EventHub", readableSpan.getAttribute(AttributeKey.stringKey("az.namespace")));
assertEquals(entityName, readableSpan.getAttribute(AttributeKey.stringKey("messaging.destination.name")));
assertEquals(namespace, readableSpan.getAttribute(AttributeKey.stringKey("net.peer.name")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.DISPOSITION_STATUS_KEY;

/**
* Contains methods to report servicebus metrics.
* Contains methods to report Service Bus metrics.
*/
public class ServiceBusMeter {
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusMeter.class);
private static final String GENERIC_STATUS_KEY = "status";
private static final int DISPOSITION_STATUSES_COUNT = DispositionStatus.values().length;
private static final AutoCloseable NOOP_CLOSEABLE = () -> {
};
private final Meter meter;
private final boolean isEnabled;
private final AtomicReference<CompositeSubscription> lastSeqNoSubscription = new AtomicReference<>(null);

private TelemetryAttributes sendAttributesSuccess;
private TelemetryAttributes sendAttributesFailure;
Expand All @@ -47,7 +47,6 @@ public class ServiceBusMeter {
private TelemetryAttributes[] settleSuccessAttributes;
private TelemetryAttributes[] settleFailureAttributes;

private AtomicReference<CompositeSubscription> lastSeqNoSubscription = new AtomicReference<>(null);
private LongCounter sentMessagesCounter;
private DoubleHistogram consumerLag;
private DoubleHistogram settleMessageDuration;
Expand Down Expand Up @@ -94,7 +93,7 @@ public ServiceBusMeter(Meter meter, String namespace, String entityPath, String
this.sentMessagesCounter = meter.createLongCounter("messaging.servicebus.messages.sent", "Number of sent messages", "messages");
this.settleMessageDuration = meter.createDoubleHistogram("messaging.servicebus.settlement.request.duration", "Duration of settlement call.", "ms");
this.consumerLag = meter.createDoubleHistogram("messaging.servicebus.receiver.lag", "Difference between local time when event was received and the local time it was enqueued on broker.", "sec");
this.settledSequenceNumber = this.meter.createLongGauge("messaging.servicebus.settlement.sequence_number", "Last settled message sequence number", "seqNo");
this.settledSequenceNumber = meter.createLongGauge("messaging.servicebus.settlement.sequence_number", "Last settled message sequence number", "seqNo");
}
}

Expand Down

0 comments on commit 94f618a

Please sign in to comment.