Skip to content

Commit

Permalink
Implement capturing message headers for aws2 sqs spans
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed Nov 10, 2023
1 parent 4a9b965 commit 792daa5
Show file tree
Hide file tree
Showing 19 changed files with 406 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ plugins {
base.archivesName.set("${base.archivesName.get()}-autoconfigure")

dependencies {
compileOnly(project(":javaagent-extension-api"))

implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:library"))

library("software.amazon.awssdk:aws-core:2.2.0")
Expand All @@ -29,5 +31,6 @@ tasks {
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", true)
systemProperty("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", true)
systemProperty("otel.instrumentation.aws-sdk.experimental-record-individual-http-error", true)
systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "test-message-header")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,75 @@

package io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure;

import static java.util.Collections.emptyList;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
import io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkTelemetry;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.List;

public final class AwsSdkSingletons {

private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-span-attributes", false);
private static final boolean HAS_INSTRUMENTATION_CONFIG = hasAgentConfiguration();
private static final AwsSdkTelemetry TELEMETRY =
AwsSdkTelemetry.builder(GlobalOpenTelemetry.get())
.setCapturedHeaders(getCapturedHeaders())
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes())
.setMessagingReceiveInstrumentationEnabled(messagingReceiveInstrumentationEnabled())
.setUseConfiguredPropagatorForMessaging(useMessagingPropagator())
.setRecordIndividualHttpError(recordIndividualHttpError())
.build();

private static final boolean USE_MESSAGING_PROPAGATOR =
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false);
private static boolean hasAgentConfiguration() {
try {
Class.forName("io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig");
return true;
} catch (ClassNotFoundException e) {
return false;
}
}

private static final boolean RECORD_INDIVIDUAL_HTTP_ERROR =
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-record-individual-http-error", false);
private static List<String> getCapturedHeaders() {
if (HAS_INSTRUMENTATION_CONFIG) {
return ExperimentalConfig.get().getMessagingHeaders();
} else {
return ConfigPropertiesUtil.getList(
"otel.instrumentation.messaging.experimental.capture-headers", emptyList());
}
}

private static boolean captureExperimentalSpanAttributes() {
return getBoolean("otel.instrumentation.aws-sdk.experimental-span-attributes", false);
}

private static final boolean RECEIVE_TELEMETRY_ENABLED =
ConfigPropertiesUtil.getBoolean(
private static boolean messagingReceiveInstrumentationEnabled() {
if (HAS_INSTRUMENTATION_CONFIG) {
return ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();
} else {
return ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false);
}
}

private static final AwsSdkTelemetry TELEMETRY =
AwsSdkTelemetry.builder(GlobalOpenTelemetry.get())
.setCaptureExperimentalSpanAttributes(CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES)
.setMessagingReceiveInstrumentationEnabled(RECEIVE_TELEMETRY_ENABLED)
.setUseConfiguredPropagatorForMessaging(USE_MESSAGING_PROPAGATOR)
.setRecordIndividualHttpError(RECORD_INDIVIDUAL_HTTP_ERROR)
.build();
private static boolean useMessagingPropagator() {
return getBoolean(
"otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false);
}

private static boolean recordIndividualHttpError() {
return getBoolean(
"otel.instrumentation.aws-sdk.experimental-record-individual-http-error", false);
}

private static boolean getBoolean(String name, boolean defaultValue) {
if (HAS_INSTRUMENTATION_CONFIG) {
return InstrumentationConfig.get().getBoolean(name, defaultValue);
} else {
return ConfigPropertiesUtil.getBoolean(name, defaultValue);
}
}

public static AwsSdkTelemetry telemetry() {
return TELEMETRY;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.awssdk.v2_2;

import software.amazon.awssdk.core.interceptor.ExecutionAttributes;

abstract class AbstractSqsRequest {

public abstract ExecutionAttributes getRequest();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package io.opentelemetry.instrumentation.awssdk.v2_2;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
Expand All @@ -18,11 +21,13 @@
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
Expand Down Expand Up @@ -63,38 +68,73 @@ final class AwsSdkInstrumenterFactory {
Arrays.asList(
rpcAttributesExtractor, httpAttributesExtractor, experimentalAttributesExtractor);

static Instrumenter<ExecutionAttributes, Response> requestInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
private final OpenTelemetry openTelemetry;
@Nullable private final TextMapPropagator messagingPropagator;
private final List<String> capturedHeaders;
private final boolean captureExperimentalSpanAttributes;
private final boolean messagingReceiveInstrumentationEnabled;
private final boolean useXrayPropagator;

AwsSdkInstrumenterFactory(
OpenTelemetry openTelemetry,
@Nullable TextMapPropagator messagingPropagator,
List<String> capturedHeaders,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled,
boolean useXrayPropagator) {
this.openTelemetry = openTelemetry;
this.messagingPropagator = messagingPropagator;
this.capturedHeaders = capturedHeaders;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled;
this.useXrayPropagator = useXrayPropagator;
}

Instrumenter<ExecutionAttributes, Response> requestInstrumenter() {
return createInstrumenter(
openTelemetry,
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors,
AwsSdkInstrumenterFactory::spanName,
SpanKindExtractor.alwaysClient(),
attributesExtractors(),
emptyList(),
true);
}

static Instrumenter<ExecutionAttributes, Response> consumerReceiveInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled) {
return sqsInstrumenter(
private List<AttributesExtractor<ExecutionAttributes, Response>> attributesExtractors() {
return captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors;
}

private List<AttributesExtractor<ExecutionAttributes, Response>> consumerAttributesExtractors() {
return captureExperimentalSpanAttributes
? extendedConsumerAttributesExtractors
: defaultConsumerAttributesExtractors;
}

private <REQUEST, RESPONSE> AttributesExtractor<REQUEST, RESPONSE> messagingAttributesExtractor(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
}

Instrumenter<SqsReceiveRequest, Response> consumerReceiveInstrumenter() {
MessageOperation operation = MessageOperation.RECEIVE;
SqsReceiveRequestAttributesGetter getter = SqsReceiveRequestAttributesGetter.INSTANCE;
AttributesExtractor<SqsReceiveRequest, Response> messagingAttributeExtractor =
messagingAttributesExtractor(getter, operation);

return createInstrumenter(
openTelemetry,
MessageOperation.RECEIVE,
captureExperimentalSpanAttributes
? extendedConsumerAttributesExtractors
: defaultConsumerAttributesExtractors,
MessagingSpanNameExtractor.create(getter, operation),
SpanKindExtractor.alwaysConsumer(),
toSqsRequestExtractors(consumerAttributesExtractors(), Function.identity()),
singletonList(messagingAttributeExtractor),
messagingReceiveInstrumentationEnabled);
}

static Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter(
OpenTelemetry openTelemetry,
TextMapPropagator messagingPropagator,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled,
boolean shouldUseXrayPropagator) {
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter() {
MessageOperation operation = MessageOperation.PROCESS;
SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE;

Expand All @@ -104,96 +144,83 @@ static Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter(
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractors(
toProcessRequestExtractors(
captureExperimentalSpanAttributes
? extendedConsumerAttributesExtractors
: defaultConsumerAttributesExtractors))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(getter, operation).build());
toSqsRequestExtractors(consumerAttributesExtractors(), unused -> null))
.addAttributesExtractor(messagingAttributesExtractor(getter, operation));

if (messagingReceiveInstrumentationEnabled) {
builder.addSpanLinksExtractor(
(spanLinks, parentContext, request) -> {
Context extracted =
SqsParentContext.ofMessage(
request.getMessage(), messagingPropagator, shouldUseXrayPropagator);
request.getMessage(), messagingPropagator, useXrayPropagator);
spanLinks.addLink(Span.fromContext(extracted).getSpanContext());
});
}
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

private static List<AttributesExtractor<SqsProcessRequest, Void>> toProcessRequestExtractors(
List<AttributesExtractor<ExecutionAttributes, Response>> extractors) {
List<AttributesExtractor<SqsProcessRequest, Void>> result = new ArrayList<>();
private static <RESPONSE>
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> toSqsRequestExtractors(
List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
Function<RESPONSE, Response> responseConverter) {
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> result = new ArrayList<>();
for (AttributesExtractor<ExecutionAttributes, Response> extractor : extractors) {
result.add(
new AttributesExtractor<SqsProcessRequest, Void>() {
new AttributesExtractor<AbstractSqsRequest, RESPONSE>() {
@Override
public void onStart(
AttributesBuilder attributes,
Context parentContext,
SqsProcessRequest sqsProcessRequest) {
extractor.onStart(attributes, parentContext, sqsProcessRequest.getRequest());
AbstractSqsRequest sqsRequest) {
extractor.onStart(attributes, parentContext, sqsRequest.getRequest());
}

@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
SqsProcessRequest sqsProcessRequest,
@Nullable Void unused,
AbstractSqsRequest sqsRequest,
@Nullable RESPONSE response,
@Nullable Throwable error) {
extractor.onEnd(attributes, context, sqsProcessRequest.getRequest(), null, error);
extractor.onEnd(
attributes,
context,
sqsRequest.getRequest(),
responseConverter.apply(response),
error);
}
});
}
return result;
}

static Instrumenter<ExecutionAttributes, Response> producerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
return sqsInstrumenter(
openTelemetry,
MessageOperation.PUBLISH,
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors,
true);
}

private static Instrumenter<ExecutionAttributes, Response> sqsInstrumenter(
OpenTelemetry openTelemetry,
MessageOperation operation,
List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
boolean enabled) {
Instrumenter<ExecutionAttributes, Response> producerInstrumenter() {
MessageOperation operation = MessageOperation.PUBLISH;
SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE;
AttributesExtractor<ExecutionAttributes, Response> messagingAttributeExtractor =
MessagingAttributesExtractor.builder(getter, operation).build();
List<AttributesExtractor<ExecutionAttributes, Response>> newExtractors =
new ArrayList<>(extractors);
newExtractors.add(messagingAttributeExtractor);
messagingAttributesExtractor(getter, operation);

return createInstrumenter(
openTelemetry,
newExtractors,
MessagingSpanNameExtractor.create(getter, operation),
operation == MessageOperation.PUBLISH
? SpanKindExtractor.alwaysProducer()
: SpanKindExtractor.alwaysConsumer(),
enabled);
SpanKindExtractor.alwaysProducer(),
attributesExtractors(),
singletonList(messagingAttributeExtractor),
true);
}

private static Instrumenter<ExecutionAttributes, Response> createInstrumenter(
private static <REQUEST, RESPONSE> Instrumenter<REQUEST, RESPONSE> createInstrumenter(
OpenTelemetry openTelemetry,
List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
SpanNameExtractor<ExecutionAttributes> spanNameExtractor,
SpanKindExtractor<ExecutionAttributes> spanKindExtractor,
SpanNameExtractor<REQUEST> spanNameExtractor,
SpanKindExtractor<REQUEST> spanKindExtractor,
List<? extends AttributesExtractor<? super REQUEST, ? super RESPONSE>> attributeExtractors,
List<AttributesExtractor<REQUEST, RESPONSE>> additionalAttributeExtractors,
boolean enabled) {

return Instrumenter.<ExecutionAttributes, Response>builder(
return Instrumenter.<REQUEST, RESPONSE>builder(
openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractors(extractors)
.addAttributesExtractors(attributeExtractors)
.addAttributesExtractors(additionalAttributeExtractors)
.setEnabled(enabled)
.buildInstrumenter(spanKindExtractor);
}
Expand All @@ -203,6 +230,4 @@ private static String spanName(ExecutionAttributes attributes) {
String awsOperation = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
return awsServiceName + "." + awsOperation;
}

private AwsSdkInstrumenterFactory() {}
}
Loading

0 comments on commit 792daa5

Please sign in to comment.