diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/AzureMonitorTraceExporter.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/AzureMonitorTraceExporter.java index b9486469453b5..d5663394ceb06 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/AzureMonitorTraceExporter.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/AzureMonitorTraceExporter.java @@ -44,6 +44,7 @@ import java.util.regex.Pattern; import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; /** * This class is an implementation of OpenTelemetry {@link SpanExporter} that allows different tracing services to @@ -57,14 +58,14 @@ public final class AzureMonitorTraceExporter implements SpanExporter { private static final Set STANDARD_ATTRIBUTE_PREFIXES; - // this is only used for distributed trace correlation across different ikeys - // and will be obsolete soon (as this will be handled by backend indexing going forward) - private static final AttributeKey AI_SPAN_SOURCE_APP_ID_KEY = AttributeKey.stringKey("applicationinsights.internal.source_app_id"); - private static final AttributeKey AI_SPAN_TARGET_APP_ID_KEY = AttributeKey.stringKey("applicationinsights.internal.target_app_id"); - - // this is only used by the 2.x web interop bridge - // for ThreadContext.getRequestTelemetryContext().getRequestTelemetry().setSource() - private static final AttributeKey AI_SPAN_SOURCE_KEY = AttributeKey.stringKey("applicationinsights.internal.source"); + private static final AttributeKey AZURE_NAMESPACE = + AttributeKey.stringKey("az.namespace"); + private static final AttributeKey AZURE_SDK_PEER_ADDRESS = + AttributeKey.stringKey("peer.address"); + private static final AttributeKey AZURE_SDK_MESSAGE_BUS_DESTINATION = + AttributeKey.stringKey("message_bus.destination"); + private static final AttributeKey AZURE_SDK_ENQUEUED_TIME = + AttributeKey.longKey("x-opt-enqueued-time"); static { Set dbSystems = new HashSet<>(); @@ -216,12 +217,10 @@ private void exportRemoteDependency(SpanData span, boolean inProc, addLinks(remoteDependencyData.getProperties(), span.getLinks()); remoteDependencyData.setName(span.getName()); - Attributes attributes = span.getAttributes(); - if (inProc) { remoteDependencyData.setType("InProc"); } else { - applySemanticConventions(attributes, remoteDependencyData, span.getKind()); + applySemanticConventions(span, remoteDependencyData); } remoteDependencyData.setId(span.getSpanId()); @@ -237,7 +236,7 @@ private void exportRemoteDependency(SpanData span, boolean inProc, remoteDependencyData.setSuccess(span.getStatus().getStatusCode() != StatusCode.ERROR); - setExtraAttributes(telemetryItem, remoteDependencyData.getProperties(), attributes); + setExtraAttributes(telemetryItem, remoteDependencyData.getProperties(), span.getAttributes()); // sampling will not be supported in this exporter Double samplingPercentage = 100.0; @@ -246,7 +245,8 @@ private void exportRemoteDependency(SpanData span, boolean inProc, exportEvents(span, samplingPercentage, telemetryItems); } - private void applySemanticConventions(Attributes attributes, RemoteDependencyData remoteDependencyData, SpanKind spanKind) { + private void applySemanticConventions(SpanData span, RemoteDependencyData remoteDependencyData) { + Attributes attributes = span.getAttributes(); String httpMethod = attributes.get(AttributeKey.stringKey("http.method")); if (httpMethod != null) { applyHttpClientSpan(attributes, remoteDependencyData); @@ -262,9 +262,18 @@ private void applySemanticConventions(Attributes attributes, RemoteDependencyDat applyDatabaseClientSpan(attributes, remoteDependencyData, dbSystem); return; } + String azureNamespace = attributes.get(AZURE_NAMESPACE); + if (azureNamespace != null && azureNamespace.equals("Microsoft.EventHub")) { + applyEventHubsSpan(attributes, remoteDependencyData); + return; + } + if (azureNamespace != null && azureNamespace.equals("Microsoft.ServiceBus")) { + applyServiceBusSpan(attributes, remoteDependencyData); + return; + } String messagingSystem = attributes.get(AttributeKey.stringKey("messaging.system")); if (messagingSystem != null) { - applyMessagingClientSpan(attributes, remoteDependencyData, messagingSystem, spanKind); + applyMessagingClientSpan(attributes, remoteDependencyData, messagingSystem, span.getKind()); return; } } @@ -308,18 +317,8 @@ private void applyHttpClientSpan(Attributes attributes, RemoteDependencyData tel target = "Http"; } - String targetAppId = attributes.get(AI_SPAN_TARGET_APP_ID_KEY); - - if (targetAppId == null) { - telemetry.setType("Http"); - telemetry.setTarget(target); - } else { - // using "Http (tracked component)" is important for dependencies that go cross-component (have an appId in their target field) - // if you use just HTTP, Breeze will remove appid from the target - // TODO (trask) remove this once confirmed by zakima that it is no longer needed - telemetry.setType("Http (tracked component)"); - telemetry.setTarget(target + " | " + targetAppId); - } + telemetry.setType("Http"); + telemetry.setTarget(target); Long httpStatusCode = attributes.get(AttributeKey.longKey("http.status_code")); if (httpStatusCode != null) { @@ -404,6 +403,24 @@ private void applyMessagingClientSpan(Attributes attributes, RemoteDependencyDat } } + // special case needed until Azure SDK moves to OTel semantic conventions + private static void applyEventHubsSpan(Attributes attributes, RemoteDependencyData telemetry) { + telemetry.setType("Microsoft.EventHub"); + telemetry.setTarget(getAzureSdkTargetSource(attributes)); + } + + // special case needed until Azure SDK moves to OTel semantic conventions + private static void applyServiceBusSpan(Attributes attributes, RemoteDependencyData telemetry) { + telemetry.setType("AZURE SERVICE BUS"); + telemetry.setTarget(getAzureSdkTargetSource(attributes)); + } + + private static String getAzureSdkTargetSource(Attributes attributes) { + String peerAddress = attributes.get(AZURE_SDK_PEER_ADDRESS); + String destination = attributes.get(AZURE_SDK_MESSAGE_BUS_DESTINATION); + return peerAddress + "/" + destination; + } + private static int getDefaultPortForDbSystem(String dbSystem) { switch (dbSystem) { // TODO (trask) add these default ports to the OpenTelemetry database semantic conventions spec @@ -435,32 +452,26 @@ private void exportRequest(SpanData span, List telemetryItems) { monitorBase.setBaseType("RequestData"); monitorBase.setBaseData(requestData); - String source = null; Attributes attributes = span.getAttributes(); - String sourceAppId = attributes.get(AI_SPAN_SOURCE_APP_ID_KEY); - - if (sourceAppId != null) { - source = sourceAppId; - } - if (source == null) { - String messagingSystem = attributes.get(AttributeKey.stringKey("messaging.system")); - if (messagingSystem != null) { - // TODO (trask) should this pass default port for messaging.system? - source = nullAwareConcat(getTargetFromPeerAttributes(attributes, 0), - attributes.get(AttributeKey.stringKey("messaging.destination")), "/"); - if (source == null) { - source = messagingSystem; + requestData.setSource(getSource(attributes)); + + if (isAzureQueue(attributes)) { + // TODO(trask): for batch consumer, enqueuedTime should be the average of this attribute + // across all links + Long enqueuedTime = attributes.get(AZURE_SDK_ENQUEUED_TIME); + if (enqueuedTime != null) { + long timeSinceEnqueued = + NANOSECONDS.toMillis(span.getStartEpochNanos()) - SECONDS.toMillis(enqueuedTime); + if (timeSinceEnqueued < 0) { + timeSinceEnqueued = 0; + } + if (requestData.getMeasurements() == null) { + requestData.setMeasurements(new HashMap<>()); } + requestData.getMeasurements().put("timeSinceEnqueued", (double) timeSinceEnqueued); } } - if (source == null) { - // this is only used by the 2.x web interop bridge - // for ThreadContext.getRequestTelemetryContext().getRequestTelemetry().setSource() - - source = attributes.get(AI_SPAN_SOURCE_KEY); - } - requestData.setSource(source); addLinks(requestData.getProperties(), span.getLinks()); Long httpStatusCode = attributes.get(AttributeKey.longKey("http.status_code")); @@ -518,6 +529,35 @@ private void exportRequest(SpanData span, List telemetryItems) { exportEvents(span, samplingPercentage, telemetryItems); } + private static String getSource(Attributes attributes) { + if (isAzureQueue(attributes)) { + return getAzureSdkTargetSource(attributes); + } + String messagingSystem = attributes.get(AttributeKey.stringKey("messaging.system")); + if (messagingSystem != null) { + // TODO (trask) AI mapping: should this pass default port for messaging.system? + String source = + nullAwareConcat( + getTargetFromPeerAttributes(attributes, 0), + attributes.get(AttributeKey.stringKey("messaging.destination")), + "/"); + if (source != null) { + return source; + } + // fallback + return messagingSystem; + } + return null; + } + + private static boolean isAzureQueue(Attributes attributes) { + String azureNamespace = attributes.get(AZURE_NAMESPACE); + if (azureNamespace == null) { + return false; + } + return azureNamespace.equals("Microsoft.EventHub") || azureNamespace.equals("Microsoft.ServiceBus"); + } + private static String nullAwareConcat(String str1, String str2, String separator) { if (str1 == null) { return str2; @@ -666,6 +706,11 @@ private void setExtraAttributes(TelemetryItem telemetry, Map pro if (stringKey.startsWith("applicationinsights.internal.")) { return; } + // TODO (trask) use az.namespace for something? + if (stringKey.equals(AZURE_SDK_MESSAGE_BUS_DESTINATION.getKey()) + || stringKey.equals("az.namespace")) { + return; + } // special case mappings if (key.getKey().equals("enduser.id") && value instanceof String) { telemetry.getTags().put(ContextTagKeys.AI_USER_ID.toString(), (String) value);