Skip to content

Commit

Permalink
incorporating improvements from Azure#33559
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Mar 2, 2023
1 parent bc3aa6f commit 8043c4a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@Isolated
@Isolated("Sets global TracingProvider.")
@Execution(ExecutionMode.SAME_THREAD)
public class TracingIntegrationTests extends IntegrationTestBase {
private static final byte[] CONTENTS_BYTES = "Some-contents".getBytes(StandardCharsets.UTF_8);
Expand All @@ -75,6 +75,14 @@ public TracingIntegrationTests() {
@Override
protected void beforeTest() {
spanProcessor = new TestSpanProcessor(getFullyQualifiedDomainName(), getEventHubName());

// For the first integration test run, the tracing provider may be already set because we import
if (GlobalOpenTelemetry.get() != null) {
logger.info("Global telemetry was not null. Manually resetting.");

GlobalOpenTelemetry.resetForTest();
}

OpenTelemetrySdk.builder()
.setTracerProvider(
SdkTracerProvider.builder()
Expand Down Expand Up @@ -263,7 +271,7 @@ public void sendAndReceiveParallel() throws InterruptedException {
@Test
public void sendBuffered() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
EventHubBufferedProducerAsyncClient bufferedProducer = new EventHubBufferedProducerClientBuilder()
EventHubBufferedProducerAsyncClient bufferedProducer = new EventHubBufferedProducerClientBuilder()
.connectionString(getConnectionString())
.onSendBatchFailed(failed -> fail("Exception occurred while sending messages." + failed.getThrowable()))
.onSendBatchSucceeded(succeeded -> latch.countDown())
Expand All @@ -278,15 +286,15 @@ public void sendBuffered() throws InterruptedException {
bufferedProducer
.getPartitionIds().take(1)
.map(partitionId -> new SendOptions().setPartitionId(partitionId))
.flatMap(sendOpts ->
bufferedProducer.enqueueEvent(event1, sendOpts)
.flatMap(sendOpts ->
bufferedProducer.enqueueEvent(event1, sendOpts)
.then(bufferedProducer.enqueueEvent(event2, sendOpts))))
.expectNextCount(1)
.verifyComplete();

StepVerifier.create(consumer
.receive()
.take(2))
.receive()
.take(2))
.expectNextCount(2)
.verifyComplete();

Expand All @@ -313,7 +321,7 @@ public void syncReceive() {
b.tryAdd(new EventData(CONTENTS_BYTES));
return b;
})
.flatMap(b -> producer.send(b)))
.flatMap(b -> producer.send(b)))
.verifyComplete();

List<PartitionEvent> receivedMessages = consumerSync.receiveFromPartition(PARTITION_ID, 2, EventPosition.fromEnqueuedTime(testStartTime), Duration.ofSeconds(10))
Expand Down Expand Up @@ -627,7 +635,7 @@ static class TestSpanProcessor implements SpanProcessor {
this.entityName = entityName;
}
public List<ReadableSpan> getEndedSpans() {
return spans.stream().collect(toList());
return new ArrayList<>(spans);
}

@Override
Expand All @@ -641,6 +649,9 @@ public boolean isStartRequired() {

@Override
public void onEnd(ReadableSpan readableSpan) {
// Various attribute keys can be found in:
// sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java
// sdk/core/azure-core-tracing-opentelemetry/src/main/java/com/azure/core/tracing/opentelemetry/OpenTelemetryUtils.java
assertEquals("Microsoft.EventHub", readableSpan.getAttribute(AttributeKey.stringKey("az.namespace")));
assertEquals(entityName, readableSpan.getAttribute(AttributeKey.stringKey("messaging.destination.name")));
assertEquals(namespace, readableSpan.getAttribute(AttributeKey.stringKey("net.peer.name")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.DISPOSITION_STATUS_KEY;

/**
* Contains methods to report servicebus metrics.
* Contains methods to report Service Bus metrics.
*/
public class ServiceBusMeter {
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusMeter.class);
private static final String GENERIC_STATUS_KEY = "status";
private static final int DISPOSITION_STATUSES_COUNT = DispositionStatus.values().length;
private static final AutoCloseable NOOP_CLOSEABLE = () -> {
};
private final Meter meter;
private final boolean isEnabled;
private final AtomicReference<CompositeSubscription> lastSeqNoSubscription = new AtomicReference<>(null);

private TelemetryAttributes sendAttributesSuccess;
private TelemetryAttributes sendAttributesFailure;
Expand All @@ -47,7 +47,6 @@ public class ServiceBusMeter {
private TelemetryAttributes[] settleSuccessAttributes;
private TelemetryAttributes[] settleFailureAttributes;

private AtomicReference<CompositeSubscription> lastSeqNoSubscription = new AtomicReference<>(null);
private LongCounter sentMessagesCounter;
private DoubleHistogram consumerLag;
private DoubleHistogram settleMessageDuration;
Expand Down Expand Up @@ -94,7 +93,7 @@ public ServiceBusMeter(Meter meter, String namespace, String entityPath, String
this.sentMessagesCounter = meter.createLongCounter("messaging.servicebus.messages.sent", "Number of sent messages", "messages");
this.settleMessageDuration = meter.createDoubleHistogram("messaging.servicebus.settlement.request.duration", "Duration of settlement call.", "ms");
this.consumerLag = meter.createDoubleHistogram("messaging.servicebus.receiver.lag", "Difference between local time when event was received and the local time it was enqueued on broker.", "sec");
this.settledSequenceNumber = this.meter.createLongGauge("messaging.servicebus.settlement.sequence_number", "Last settled message sequence number", "seqNo");
this.settledSequenceNumber = meter.createLongGauge("messaging.servicebus.settlement.sequence_number", "Last settled message sequence number", "seqNo");
}
}

Expand Down

0 comments on commit 8043c4a

Please sign in to comment.