Skip to content

Commit

Permalink
Improve AzMon Trace Exporter support for EventHubs (#21668)
Browse files Browse the repository at this point in the history
* Improve AzMon Trace Exporter support for EventHubs

* And Service Bus too!

* Add source and time enqueued
  • Loading branch information
trask authored Aug 12, 2021
1 parent 0c5173a commit ee858a3
Showing 1 changed file with 92 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.regex.Pattern;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
* This class is an implementation of OpenTelemetry {@link SpanExporter} that allows different tracing services to
Expand All @@ -57,14 +58,14 @@ public final class AzureMonitorTraceExporter implements SpanExporter {

private static final Set<String> STANDARD_ATTRIBUTE_PREFIXES;

// this is only used for distributed trace correlation across different ikeys
// and will be obsolete soon (as this will be handled by backend indexing going forward)
private static final AttributeKey<String> AI_SPAN_SOURCE_APP_ID_KEY = AttributeKey.stringKey("applicationinsights.internal.source_app_id");
private static final AttributeKey<String> AI_SPAN_TARGET_APP_ID_KEY = AttributeKey.stringKey("applicationinsights.internal.target_app_id");

// this is only used by the 2.x web interop bridge
// for ThreadContext.getRequestTelemetryContext().getRequestTelemetry().setSource()
private static final AttributeKey<String> AI_SPAN_SOURCE_KEY = AttributeKey.stringKey("applicationinsights.internal.source");
private static final AttributeKey<String> AZURE_NAMESPACE =
AttributeKey.stringKey("az.namespace");
private static final AttributeKey<String> AZURE_SDK_PEER_ADDRESS =
AttributeKey.stringKey("peer.address");
private static final AttributeKey<String> AZURE_SDK_MESSAGE_BUS_DESTINATION =
AttributeKey.stringKey("message_bus.destination");
private static final AttributeKey<Long> AZURE_SDK_ENQUEUED_TIME =
AttributeKey.longKey("x-opt-enqueued-time");

static {
Set<String> dbSystems = new HashSet<>();
Expand Down Expand Up @@ -216,12 +217,10 @@ private void exportRemoteDependency(SpanData span, boolean inProc,
addLinks(remoteDependencyData.getProperties(), span.getLinks());
remoteDependencyData.setName(span.getName());

Attributes attributes = span.getAttributes();

if (inProc) {
remoteDependencyData.setType("InProc");
} else {
applySemanticConventions(attributes, remoteDependencyData, span.getKind());
applySemanticConventions(span, remoteDependencyData);
}

remoteDependencyData.setId(span.getSpanId());
Expand All @@ -237,7 +236,7 @@ private void exportRemoteDependency(SpanData span, boolean inProc,

remoteDependencyData.setSuccess(span.getStatus().getStatusCode() != StatusCode.ERROR);

setExtraAttributes(telemetryItem, remoteDependencyData.getProperties(), attributes);
setExtraAttributes(telemetryItem, remoteDependencyData.getProperties(), span.getAttributes());

// sampling will not be supported in this exporter
Double samplingPercentage = 100.0;
Expand All @@ -246,7 +245,8 @@ private void exportRemoteDependency(SpanData span, boolean inProc,
exportEvents(span, samplingPercentage, telemetryItems);
}

private void applySemanticConventions(Attributes attributes, RemoteDependencyData remoteDependencyData, SpanKind spanKind) {
private void applySemanticConventions(SpanData span, RemoteDependencyData remoteDependencyData) {
Attributes attributes = span.getAttributes();
String httpMethod = attributes.get(AttributeKey.stringKey("http.method"));
if (httpMethod != null) {
applyHttpClientSpan(attributes, remoteDependencyData);
Expand All @@ -262,9 +262,18 @@ private void applySemanticConventions(Attributes attributes, RemoteDependencyDat
applyDatabaseClientSpan(attributes, remoteDependencyData, dbSystem);
return;
}
String azureNamespace = attributes.get(AZURE_NAMESPACE);
if (azureNamespace != null && azureNamespace.equals("Microsoft.EventHub")) {
applyEventHubsSpan(attributes, remoteDependencyData);
return;
}
if (azureNamespace != null && azureNamespace.equals("Microsoft.ServiceBus")) {
applyServiceBusSpan(attributes, remoteDependencyData);
return;
}
String messagingSystem = attributes.get(AttributeKey.stringKey("messaging.system"));
if (messagingSystem != null) {
applyMessagingClientSpan(attributes, remoteDependencyData, messagingSystem, spanKind);
applyMessagingClientSpan(attributes, remoteDependencyData, messagingSystem, span.getKind());
return;
}
}
Expand Down Expand Up @@ -308,18 +317,8 @@ private void applyHttpClientSpan(Attributes attributes, RemoteDependencyData tel
target = "Http";
}

String targetAppId = attributes.get(AI_SPAN_TARGET_APP_ID_KEY);

if (targetAppId == null) {
telemetry.setType("Http");
telemetry.setTarget(target);
} else {
// using "Http (tracked component)" is important for dependencies that go cross-component (have an appId in their target field)
// if you use just HTTP, Breeze will remove appid from the target
// TODO (trask) remove this once confirmed by zakima that it is no longer needed
telemetry.setType("Http (tracked component)");
telemetry.setTarget(target + " | " + targetAppId);
}
telemetry.setType("Http");
telemetry.setTarget(target);

Long httpStatusCode = attributes.get(AttributeKey.longKey("http.status_code"));
if (httpStatusCode != null) {
Expand Down Expand Up @@ -404,6 +403,24 @@ private void applyMessagingClientSpan(Attributes attributes, RemoteDependencyDat
}
}

// special case needed until Azure SDK moves to OTel semantic conventions
private static void applyEventHubsSpan(Attributes attributes, RemoteDependencyData telemetry) {
telemetry.setType("Microsoft.EventHub");
telemetry.setTarget(getAzureSdkTargetSource(attributes));
}

// special case needed until Azure SDK moves to OTel semantic conventions
private static void applyServiceBusSpan(Attributes attributes, RemoteDependencyData telemetry) {
telemetry.setType("AZURE SERVICE BUS");
telemetry.setTarget(getAzureSdkTargetSource(attributes));
}

private static String getAzureSdkTargetSource(Attributes attributes) {
String peerAddress = attributes.get(AZURE_SDK_PEER_ADDRESS);
String destination = attributes.get(AZURE_SDK_MESSAGE_BUS_DESTINATION);
return peerAddress + "/" + destination;
}

private static int getDefaultPortForDbSystem(String dbSystem) {
switch (dbSystem) {
// TODO (trask) add these default ports to the OpenTelemetry database semantic conventions spec
Expand Down Expand Up @@ -435,32 +452,26 @@ private void exportRequest(SpanData span, List<TelemetryItem> telemetryItems) {
monitorBase.setBaseType("RequestData");
monitorBase.setBaseData(requestData);

String source = null;
Attributes attributes = span.getAttributes();

String sourceAppId = attributes.get(AI_SPAN_SOURCE_APP_ID_KEY);

if (sourceAppId != null) {
source = sourceAppId;
}
if (source == null) {
String messagingSystem = attributes.get(AttributeKey.stringKey("messaging.system"));
if (messagingSystem != null) {
// TODO (trask) should this pass default port for messaging.system?
source = nullAwareConcat(getTargetFromPeerAttributes(attributes, 0),
attributes.get(AttributeKey.stringKey("messaging.destination")), "/");
if (source == null) {
source = messagingSystem;
requestData.setSource(getSource(attributes));

if (isAzureQueue(attributes)) {
// TODO(trask): for batch consumer, enqueuedTime should be the average of this attribute
// across all links
Long enqueuedTime = attributes.get(AZURE_SDK_ENQUEUED_TIME);
if (enqueuedTime != null) {
long timeSinceEnqueued =
NANOSECONDS.toMillis(span.getStartEpochNanos()) - SECONDS.toMillis(enqueuedTime);
if (timeSinceEnqueued < 0) {
timeSinceEnqueued = 0;
}
if (requestData.getMeasurements() == null) {
requestData.setMeasurements(new HashMap<>());
}
requestData.getMeasurements().put("timeSinceEnqueued", (double) timeSinceEnqueued);
}
}
if (source == null) {
// this is only used by the 2.x web interop bridge
// for ThreadContext.getRequestTelemetryContext().getRequestTelemetry().setSource()

source = attributes.get(AI_SPAN_SOURCE_KEY);
}
requestData.setSource(source);

addLinks(requestData.getProperties(), span.getLinks());
Long httpStatusCode = attributes.get(AttributeKey.longKey("http.status_code"));
Expand Down Expand Up @@ -518,6 +529,35 @@ private void exportRequest(SpanData span, List<TelemetryItem> telemetryItems) {
exportEvents(span, samplingPercentage, telemetryItems);
}

private static String getSource(Attributes attributes) {
if (isAzureQueue(attributes)) {
return getAzureSdkTargetSource(attributes);
}
String messagingSystem = attributes.get(AttributeKey.stringKey("messaging.system"));
if (messagingSystem != null) {
// TODO (trask) AI mapping: should this pass default port for messaging.system?
String source =
nullAwareConcat(
getTargetFromPeerAttributes(attributes, 0),
attributes.get(AttributeKey.stringKey("messaging.destination")),
"/");
if (source != null) {
return source;
}
// fallback
return messagingSystem;
}
return null;
}

private static boolean isAzureQueue(Attributes attributes) {
String azureNamespace = attributes.get(AZURE_NAMESPACE);
if (azureNamespace == null) {
return false;
}
return azureNamespace.equals("Microsoft.EventHub") || azureNamespace.equals("Microsoft.ServiceBus");
}

private static String nullAwareConcat(String str1, String str2, String separator) {
if (str1 == null) {
return str2;
Expand Down Expand Up @@ -666,6 +706,11 @@ private void setExtraAttributes(TelemetryItem telemetry, Map<String, String> pro
if (stringKey.startsWith("applicationinsights.internal.")) {
return;
}
// TODO (trask) use az.namespace for something?
if (stringKey.equals(AZURE_SDK_MESSAGE_BUS_DESTINATION.getKey())
|| stringKey.equals("az.namespace")) {
return;
}
// special case mappings
if (key.getKey().equals("enduser.id") && value instanceof String) {
telemetry.getTags().put(ContextTagKeys.AI_USER_ID.toString(), (String) value);
Expand Down

0 comments on commit ee858a3

Please sign in to comment.