Skip to content

Commit

Permalink
ServiceBus metrics (#31283)
Browse files Browse the repository at this point in the history
* Service bus metrics
  • Loading branch information
lmolkova authored Oct 13, 2022
1 parent add7cae commit 2dd991e
Show file tree
Hide file tree
Showing 29 changed files with 1,445 additions and 643 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
<suppress checks="IllegalImport" files=".*[/\\]com[/\\]azure[/\\]monitor[/\\]opentelemetry[/\\]exporter[/\\]*"/>
<suppress checks="IllegalImport" files=".*[/\\]com[/\\]azure[/\\]identity[/\\]*"/>
<suppress checks="IllegalImport" files="com.azure.messaging.servicebus.TracingIntegrationTests.java"/>
<suppress checks="IllegalImport" files="com.azure.messaging.eventhubs.TracingIntegrationTests.java"/>

<!-- Suppress warnings for Event Processor until the usage of "Client" is discussed and resolved:
https://github.com/Azure/azure-sdk/issues/321 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class OpenTelemetryAttributes implements TelemetryAttributes {

private static Map<String, String> getMappings() {
Map<String, String> mappings = new HashMap<>();
// messaging mapping, attributes are defined in com.azure.core.amqp.implementation.ClientConstants
// messaging mapping, attributes are defined in com.azure.core.amqp.implementation.ClientConstants and in EventHubs, ServiceBus
// metric helpers
mappings.put("status", "otel.status_code");
mappings.put("entityName", "messaging.destination");
mappings.put("entityPath", "messaging.az.entity_path");
Expand All @@ -31,6 +32,8 @@ private static Map<String, String> getMappings() {
mappings.put("deliveryState", "amqp.delivery_state");
mappings.put("partitionId", "messaging.eventhubs.partition_id");
mappings.put("consumerGroup", "messaging.eventhubs.consumer_group");
mappings.put("subscriptionName", "messaging.servicebus.subscription_name");
mappings.put("dispositionStatus", "messaging.servicebus.disposition_status");

return Collections.unmodifiableMap(mappings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ public void attributeMappings() {
put("partitionId", 42);
put("status", "error");
put("consumerGroup", "$Default");
put("subscriptionName", "/foo/subscriptions/bar");
put("dispositionStatus", "abandon");
}});

assertEquals(OpenTelemetryAttributes.class, attributeCollection.getClass());
Attributes attributes = ((OpenTelemetryAttributes) attributeCollection).get();

assertEquals(11, attributes.size());
assertEquals(13, attributes.size());
assertEquals("value", attributes.get(AttributeKey.stringKey("foobar")));
assertEquals("host", attributes.get(AttributeKey.stringKey("net.peer.name")));
assertEquals("entity", attributes.get(AttributeKey.stringKey("messaging.destination")));
Expand All @@ -88,7 +90,8 @@ public void attributeMappings() {
assertEquals("no_content", attributes.get(AttributeKey.stringKey("amqp.status_code")));
assertEquals(42, attributes.get(AttributeKey.longKey("messaging.eventhubs.partition_id")));
assertEquals("error", attributes.get(AttributeKey.stringKey("otel.status_code")));
assertEquals("$Default", attributes.get(AttributeKey.stringKey("messaging.eventhubs.consumer_group")));
assertEquals("/foo/subscriptions/bar", attributes.get(AttributeKey.stringKey("messaging.servicebus.subscription_name")));
assertEquals("abandon", attributes.get(AttributeKey.stringKey("messaging.servicebus.disposition_status")));
}

@Test
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Release History

## 7.12.0-beta.1 (Unreleased)

- Enabled distributed tracing for producer and missing sender operations. ([#30508](https://github.com/Azure/azure-sdk-for-java/pull/30508))
- Enabled metrics to track number of sent messages, receiver lag, settlement calls ([#31283](https://github.com/Azure/azure-sdk-for-java/pull/31283))
### Features Added

### Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
package com.azure.messaging.servicebus;

import com.azure.core.util.Context;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiverTracer;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
Expand All @@ -18,27 +19,29 @@
*/
final class FluxTrace extends FluxOperator<ServiceBusMessageContext, ServiceBusMessageContext> {
static final String PROCESS_ERROR_KEY = "process-error";
private final ServiceBusReceiverTracer tracer;
private final ServiceBusReceiverInstrumentation instrumentation;

FluxTrace(Flux<? extends ServiceBusMessageContext> upstream, ServiceBusReceiverTracer tracer) {
FluxTrace(Flux<? extends ServiceBusMessageContext> upstream, ServiceBusReceiverInstrumentation instrumentation) {
super(upstream);
this.tracer = tracer;
this.instrumentation = instrumentation;
}

@Override
public void subscribe(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber) {
Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");

source.subscribe(new TracingSubscriber(coreSubscriber, tracer));
source.subscribe(new TracingSubscriber(coreSubscriber, instrumentation));
}

private static class TracingSubscriber extends BaseSubscriber<ServiceBusMessageContext> {

private final CoreSubscriber<? super ServiceBusMessageContext> downstream;
private final ServiceBusReceiverTracer tracer;
TracingSubscriber(CoreSubscriber<? super ServiceBusMessageContext> downstream, ServiceBusReceiverTracer tracer) {
private final ServiceBusReceiverInstrumentation instrumentation;
private final ServiceBusTracer tracer;
TracingSubscriber(CoreSubscriber<? super ServiceBusMessageContext> downstream, ServiceBusReceiverInstrumentation instrumentation) {
this.downstream = downstream;
this.tracer = tracer;
this.instrumentation = instrumentation;
this.tracer = instrumentation.getTracer();
}

@Override
Expand All @@ -53,13 +56,8 @@ protected void hookOnSubscribe(Subscription subscription) {

@Override
protected void hookOnNext(ServiceBusMessageContext message) {
if (tracer == null || tracer.isSync()) {
downstream.onNext(message);
return;
}

Throwable exception = null;
Context span = tracer.startProcessSpan("ServiceBus.process", message.getMessage(), Context.NONE);
Context span = instrumentation.instrumentProcess("ServiceBus.process", message.getMessage(), Context.NONE);
AutoCloseable scope = tracer.makeSpanCurrent(span);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@
import com.azure.core.util.Configuration;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.metrics.MeterProvider;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiverTracer;
import com.azure.messaging.servicebus.implementation.ServiceBusReactorAmqpConnection;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.ServiceBusSharedKeyCredential;
import com.azure.messaging.servicebus.implementation.ServiceBusTracer;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.messaging.servicebus.models.SubQueue;
Expand Down Expand Up @@ -205,13 +207,14 @@ public final class ServiceBusClientBuilder implements
private static final String NAME_KEY = "name";
private static final String VERSION_KEY = "version";
private static final String UNKNOWN = "UNKNOWN";
private static final String LIBRARY_NAME;
private static final String LIBRARY_VERSION;
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+");
private static final Duration MAX_LOCK_RENEW_DEFAULT_DURATION = Duration.ofMinutes(5);
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusClientBuilder.class);

private final Object connectionLock = new Object();
private final MessageSerializer messageSerializer = new ServiceBusMessageSerializer();

private ClientOptions clientOptions;
private Configuration configuration;
private ServiceBusConnectionProcessor sharedConnection;
Expand All @@ -231,6 +234,12 @@ public final class ServiceBusClientBuilder implements
*/
private final AtomicInteger openClients = new AtomicInteger();

static {
final Map<String, String> properties = CoreUtils.getProperties(SERVICE_BUS_PROPERTIES_FILE);
LIBRARY_NAME = properties.getOrDefault(NAME_KEY, UNKNOWN);
LIBRARY_VERSION = properties.getOrDefault(VERSION_KEY, UNKNOWN);
}

/**
* Creates a new instance with the default transport {@link AmqpTransportType#AMQP}.
*/
Expand Down Expand Up @@ -738,18 +747,15 @@ private ConnectionOptions getConnectionOptions() {
: SslDomain.VerifyMode.VERIFY_PEER_NAME;

final ClientOptions options = clientOptions != null ? clientOptions : new ClientOptions();
final Map<String, String> properties = CoreUtils.getProperties(SERVICE_BUS_PROPERTIES_FILE);
final String product = properties.getOrDefault(NAME_KEY, UNKNOWN);
final String clientVersion = properties.getOrDefault(VERSION_KEY, UNKNOWN);

if (customEndpointAddress == null) {
return new ConnectionOptions(getAndValidateFullyQualifiedNamespace(), credentials, authorizationType,
ServiceBusConstants.AZURE_ACTIVE_DIRECTORY_SCOPE, transport, retryOptions, proxyOptions, scheduler,
options, verificationMode, product, clientVersion);
options, verificationMode, LIBRARY_NAME, LIBRARY_VERSION);
} else {
return new ConnectionOptions(getAndValidateFullyQualifiedNamespace(), credentials, authorizationType,
ServiceBusConstants.AZURE_ACTIVE_DIRECTORY_SCOPE, transport, retryOptions, proxyOptions, scheduler,
options, verificationMode, product, clientVersion, customEndpointAddress.getHost(),
options, verificationMode, LIBRARY_NAME, LIBRARY_VERSION, customEndpointAddress.getHost(),
customEndpointAddress.getPort());
}
}
Expand Down Expand Up @@ -961,9 +967,11 @@ public ServiceBusSenderAsyncClient buildAsyncClient() {
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusSenderTracer tracer = new ServiceBusSenderTracer(ServiceBusTracer.getDefaultTracer(), connectionProcessor.getFullyQualifiedNamespace(), entityName);
final ServiceBusSenderInstrumentation instrumentation = new ServiceBusSenderInstrumentation(ServiceBusTracer.getDefaultTracer(),
createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityName);

return new ServiceBusSenderAsyncClient(entityName, entityType, connectionProcessor, retryOptions,
tracer, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null, clientIdentifier);
instrumentation, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null, clientIdentifier);
}

/**
Expand Down Expand Up @@ -1443,10 +1451,13 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {

final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType,
connectionProcessor, messageSerializer, receiverOptions, clientIdentifier);
final ServiceBusReceiverTracer tracer = new ServiceBusReceiverTracer(ServiceBusTracer.getDefaultTracer(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, false);

final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(
ServiceBusTracer.getDefaultTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(),
entityPath, subscriptionName, false);
return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
tracer, messageSerializer, ServiceBusClientBuilder.this::onClientClose, sessionManager);
instrumentation, messageSerializer, ServiceBusClientBuilder.this::onClientClose, sessionManager);
}

/**
Expand Down Expand Up @@ -1518,10 +1529,10 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusReceiverTracer tracer = new ServiceBusReceiverTracer(ServiceBusTracer.getDefaultTracer(),
connectionProcessor.getFullyQualifiedNamespace(), entityPath, syncConsumer);
final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(ServiceBusTracer.getDefaultTracer(),
createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer);
return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(),
entityPath, entityType, receiverOptions, connectionProcessor, tracer, messageSerializer,
entityPath, entityType, receiverOptions, connectionProcessor, instrumentation, messageSerializer,
ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
}
}
Expand Down Expand Up @@ -1961,11 +1972,11 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, bo
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusReceiverTracer tracer = new ServiceBusReceiverTracer(ServiceBusTracer.getDefaultTracer(),
connectionProcessor.getFullyQualifiedNamespace(), entityPath, syncConsumer);
final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(ServiceBusTracer.getDefaultTracer(),
createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer);
return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
tracer, messageSerializer, ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
instrumentation, messageSerializer, ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
}
}

Expand All @@ -1982,4 +1993,9 @@ private void validateAndThrow(Duration maxLockRenewalDuration) {
"'maxLockRenewalDuration' cannot be negative."));
}
}

private Meter createMeter() {
return MeterProvider.getDefaultProvider().createMeter(LIBRARY_NAME, LIBRARY_VERSION,
clientOptions == null ? null : clientOptions.getMetricsOptions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.core.amqp.implementation.ErrorContextProvider;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import org.apache.qpid.proton.message.Message;

import java.nio.BufferOverflowException;
Expand All @@ -29,21 +30,17 @@ public final class ServiceBusMessageBatch {
private final List<ServiceBusMessage> serviceBusMessageList;
private final byte[] eventBytes;
private int sizeInBytes;
private final ServiceBusSenderTracer tracer;
private final String entityPath;
private final String hostname;
private final ServiceBusTracer tracer;

ServiceBusMessageBatch(int maxMessageSize, ErrorContextProvider contextProvider, ServiceBusSenderTracer tracer,
MessageSerializer serializer, String entityPath, String hostname) {
ServiceBusMessageBatch(int maxMessageSize, ErrorContextProvider contextProvider, ServiceBusTracer tracer,
MessageSerializer serializer) {
this.maxMessageSize = maxMessageSize;
this.contextProvider = contextProvider;
this.serializer = serializer;
this.serviceBusMessageList = new ArrayList<>();
this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB
this.eventBytes = new byte[maxMessageSize];
this.tracer = tracer;
this.entityPath = entityPath;
this.hostname = hostname;
}

/**
Expand Down Expand Up @@ -92,7 +89,7 @@ public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) {
if (serviceBusMessage == null) {
throw LOGGER.logExceptionAsWarning(new NullPointerException("'serviceBusMessage' cannot be null"));
}
tracer.createMessageSpan(serviceBusMessage);
tracer.reportMessageSpan(serviceBusMessage, serviceBusMessage.getContext());

final int size;
try {
Expand Down
Loading

0 comments on commit 2dd991e

Please sign in to comment.