Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve AzMon Trace Exporter support for EventHubs #21668

Merged
merged 9 commits into from
Aug 12, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.regex.Pattern;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
* This class is an implementation of OpenTelemetry {@link SpanExporter} that allows different tracing services to
Expand All @@ -57,14 +58,14 @@ public final class AzureMonitorTraceExporter implements SpanExporter {

private static final Set<String> STANDARD_ATTRIBUTE_PREFIXES;

// this is only used for distributed trace correlation across different ikeys
// and will be obsolete soon (as this will be handled by backend indexing going forward)
private static final AttributeKey<String> AI_SPAN_SOURCE_APP_ID_KEY = AttributeKey.stringKey("applicationinsights.internal.source_app_id");
private static final AttributeKey<String> AI_SPAN_TARGET_APP_ID_KEY = AttributeKey.stringKey("applicationinsights.internal.target_app_id");

// this is only used by the 2.x web interop bridge
// for ThreadContext.getRequestTelemetryContext().getRequestTelemetry().setSource()
private static final AttributeKey<String> AI_SPAN_SOURCE_KEY = AttributeKey.stringKey("applicationinsights.internal.source");
private static final AttributeKey<String> AZURE_NAMESPACE =
AttributeKey.stringKey("az.namespace");
private static final AttributeKey<String> AZURE_SDK_PEER_ADDRESS =
AttributeKey.stringKey("peer.address");
private static final AttributeKey<String> AZURE_SDK_MESSAGE_BUS_DESTINATION =
AttributeKey.stringKey("message_bus.destination");
private static final AttributeKey<Long> AZURE_SDK_ENQUEUED_TIME =
AttributeKey.longKey("x-opt-enqueued-time");

static {
Set<String> dbSystems = new HashSet<>();
Expand Down Expand Up @@ -216,12 +217,10 @@ private void exportRemoteDependency(SpanData span, boolean inProc,
addLinks(remoteDependencyData.getProperties(), span.getLinks());
remoteDependencyData.setName(span.getName());

Attributes attributes = span.getAttributes();

if (inProc) {
remoteDependencyData.setType("InProc");
} else {
applySemanticConventions(attributes, remoteDependencyData, span.getKind());
applySemanticConventions(span, remoteDependencyData);
}

remoteDependencyData.setId(span.getSpanId());
Expand All @@ -237,7 +236,7 @@ private void exportRemoteDependency(SpanData span, boolean inProc,

remoteDependencyData.setSuccess(span.getStatus().getStatusCode() != StatusCode.ERROR);

setExtraAttributes(telemetryItem, remoteDependencyData.getProperties(), attributes);
setExtraAttributes(telemetryItem, remoteDependencyData.getProperties(), span.getAttributes());

// sampling will not be supported in this exporter
Double samplingPercentage = 100.0;
Expand All @@ -246,7 +245,8 @@ private void exportRemoteDependency(SpanData span, boolean inProc,
exportEvents(span, samplingPercentage, telemetryItems);
}

private void applySemanticConventions(Attributes attributes, RemoteDependencyData remoteDependencyData, SpanKind spanKind) {
private void applySemanticConventions(SpanData span, RemoteDependencyData remoteDependencyData) {
Attributes attributes = span.getAttributes();
String httpMethod = attributes.get(AttributeKey.stringKey("http.method"));
if (httpMethod != null) {
applyHttpClientSpan(attributes, remoteDependencyData);
Expand All @@ -262,9 +262,18 @@ private void applySemanticConventions(Attributes attributes, RemoteDependencyDat
applyDatabaseClientSpan(attributes, remoteDependencyData, dbSystem);
return;
}
String azureNamespace = attributes.get(AZURE_NAMESPACE);
if (azureNamespace != null && azureNamespace.equals("Microsoft.EventHub")) {
applyEventHubsSpan(attributes, remoteDependencyData);
return;
}
if (azureNamespace != null && azureNamespace.equals("Microsoft.ServiceBus")) {
applyServiceBusSpan(attributes, remoteDependencyData);
return;
}
String messagingSystem = attributes.get(AttributeKey.stringKey("messaging.system"));
if (messagingSystem != null) {
applyMessagingClientSpan(attributes, remoteDependencyData, messagingSystem, spanKind);
applyMessagingClientSpan(attributes, remoteDependencyData, messagingSystem, span.getKind());
return;
}
}
Expand Down Expand Up @@ -308,18 +317,8 @@ private void applyHttpClientSpan(Attributes attributes, RemoteDependencyData tel
target = "Http";
}

String targetAppId = attributes.get(AI_SPAN_TARGET_APP_ID_KEY);

if (targetAppId == null) {
telemetry.setType("Http");
telemetry.setTarget(target);
} else {
// using "Http (tracked component)" is important for dependencies that go cross-component (have an appId in their target field)
// if you use just HTTP, Breeze will remove appid from the target
// TODO (trask) remove this once confirmed by zakima that it is no longer needed
telemetry.setType("Http (tracked component)");
telemetry.setTarget(target + " | " + targetAppId);
}
telemetry.setType("Http");
telemetry.setTarget(target);

Long httpStatusCode = attributes.get(AttributeKey.longKey("http.status_code"));
if (httpStatusCode != null) {
Expand Down Expand Up @@ -404,6 +403,24 @@ private void applyMessagingClientSpan(Attributes attributes, RemoteDependencyDat
}
}

// special case needed until Azure SDK moves to OTel semantic conventions
private static void applyEventHubsSpan(Attributes attributes, RemoteDependencyData telemetry) {
telemetry.setType("Microsoft.EventHub");
telemetry.setTarget(getAzureSdkTargetSource(attributes));
}

// special case needed until Azure SDK moves to OTel semantic conventions
private static void applyServiceBusSpan(Attributes attributes, RemoteDependencyData telemetry) {
telemetry.setType("AZURE SERVICE BUS");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why EventHubs is Microsoft.EventHub and Service Bus is AZURE SERVICE BUS ? I belive they are different generations of telemetry types, just wonder if ServiceBus can also move to Azure resource provider namespace

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya, i'll raise this with U/X team to support Microsoft.ServiceBus 👍

telemetry.setTarget(getAzureSdkTargetSource(attributes));
}

private static String getAzureSdkTargetSource(Attributes attributes) {
String peerAddress = attributes.get(AZURE_SDK_PEER_ADDRESS);
String destination = attributes.get(AZURE_SDK_MESSAGE_BUS_DESTINATION);
return peerAddress + "/" + destination;
}

private static int getDefaultPortForDbSystem(String dbSystem) {
switch (dbSystem) {
// TODO (trask) add these default ports to the OpenTelemetry database semantic conventions spec
Expand Down Expand Up @@ -435,32 +452,26 @@ private void exportRequest(SpanData span, List<TelemetryItem> telemetryItems) {
monitorBase.setBaseType("RequestData");
monitorBase.setBaseData(requestData);

String source = null;
Attributes attributes = span.getAttributes();

String sourceAppId = attributes.get(AI_SPAN_SOURCE_APP_ID_KEY);

if (sourceAppId != null) {
source = sourceAppId;
}
if (source == null) {
String messagingSystem = attributes.get(AttributeKey.stringKey("messaging.system"));
if (messagingSystem != null) {
// TODO (trask) should this pass default port for messaging.system?
source = nullAwareConcat(getTargetFromPeerAttributes(attributes, 0),
attributes.get(AttributeKey.stringKey("messaging.destination")), "/");
if (source == null) {
source = messagingSystem;
requestData.setSource(getSource(attributes));

if (isAzureQueue(attributes)) {
// TODO(trask): for batch consumer, enqueuedTime should be the average of this attribute
// across all links
Long enqueuedTime = attributes.get(AZURE_SDK_ENQUEUED_TIME);
if (enqueuedTime != null) {
long timeSinceEnqueued =
NANOSECONDS.toMillis(span.getStartEpochNanos()) - SECONDS.toMillis(enqueuedTime);
if (timeSinceEnqueued < 0) {
timeSinceEnqueued = 0;
}
if (requestData.getMeasurements() == null) {
requestData.setMeasurements(new HashMap<>());
}
requestData.getMeasurements().put("timeSinceEnqueued", (double) timeSinceEnqueued);
}
}
if (source == null) {
// this is only used by the 2.x web interop bridge
// for ThreadContext.getRequestTelemetryContext().getRequestTelemetry().setSource()

source = attributes.get(AI_SPAN_SOURCE_KEY);
}
requestData.setSource(source);

addLinks(requestData.getProperties(), span.getLinks());
Long httpStatusCode = attributes.get(AttributeKey.longKey("http.status_code"));
Expand Down Expand Up @@ -518,6 +529,35 @@ private void exportRequest(SpanData span, List<TelemetryItem> telemetryItems) {
exportEvents(span, samplingPercentage, telemetryItems);
}

private static String getSource(Attributes attributes) {
if (isAzureQueue(attributes)) {
return getAzureSdkTargetSource(attributes);
}
String messagingSystem = attributes.get(AttributeKey.stringKey("messaging.system"));
if (messagingSystem != null) {
// TODO (trask) AI mapping: should this pass default port for messaging.system?
String source =
nullAwareConcat(
getTargetFromPeerAttributes(attributes, 0),
attributes.get(AttributeKey.stringKey("messaging.destination")),
"/");
if (source != null) {
return source;
}
// fallback
return messagingSystem;
}
return null;
}

private static boolean isAzureQueue(Attributes attributes) {
String azureNamespace = attributes.get(AZURE_NAMESPACE);
if (azureNamespace == null) {
return false;
}
return azureNamespace.equals("Microsoft.EventHub") || azureNamespace.equals("Microsoft.ServiceBus");
}

private static String nullAwareConcat(String str1, String str2, String separator) {
if (str1 == null) {
return str2;
Expand Down Expand Up @@ -666,6 +706,11 @@ private void setExtraAttributes(TelemetryItem telemetry, Map<String, String> pro
if (stringKey.startsWith("applicationinsights.internal.")) {
return;
}
// TODO (trask) use az.namespace for something?
if (stringKey.equals(AZURE_SDK_MESSAGE_BUS_DESTINATION.getKey())
|| stringKey.equals("az.namespace")) {
return;
}
// special case mappings
if (key.getKey().equals("enduser.id") && value instanceof String) {
telemetry.getTags().put(ContextTagKeys.AI_USER_ID.toString(), (String) value);
Expand Down