From 3332095692464c3d4f40fce454a181cb4548b899 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Tue, 7 Nov 2023 18:25:50 +0200 Subject: [PATCH] Add process spans to aws-1 sqs instrumentation (#9796) --- .../aws-sdk-1.11/javaagent/build.gradle.kts | 23 +- .../awssdk/v1_11/TracingRequestHandler.java | 3 + .../src/test/groovy/S3TracingTest.groovy | 43 ++- .../src/test/groovy/SnsTracingTest.groovy | 21 +- .../v1_11/SqsSuppressReceiveSpansTest.groovy | 17 ++ .../library-autoconfigure/build.gradle.kts | 24 +- .../autoconfigure/TracingRequestHandler.java | 3 + .../SqsSuppressReceiveSpansTest.groovy | 17 ++ .../v1_11/AwsSdkInstrumenterFactory.java | 91 +++++- .../awssdk/v1_11/AwsSdkTelemetry.java | 26 +- .../awssdk/v1_11/AwsSdkTelemetryBuilder.java | 17 +- .../awssdk/v1_11/SqsAccess.java | 10 +- .../instrumentation/awssdk/v1_11/SqsImpl.java | 78 ++++- .../awssdk/v1_11/SqsMessage.java | 17 ++ .../awssdk/v1_11/SqsMessageImpl.java | 27 ++ .../awssdk/v1_11/SqsProcessRequest.java | 30 ++ .../SqsProcessRequestAttributesGetter.java | 64 ++++ .../awssdk/v1_11/TracingIterator.java | 94 ++++++ .../awssdk/v1_11/TracingList.java | 82 +++++ .../awssdk/v1_11/TracingRequestHandler.java | 57 +++- .../v1_11/SqsSuppressReceiveSpansTest.groovy | 20 ++ .../awssdk/v1_11/SqsTracingTest.groovy | 1 + ...AbstractSqsSuppressReceiveSpansTest.groovy | 283 ++++++++++++++++++ .../v1_11/AbstractSqsTracingTest.groovy | 111 +++++-- .../apachecamel/aws/AwsConnector.java | 6 +- .../apachecamel/aws/AwsSpanAssertions.java | 29 +- .../apachecamel/aws/SqsCamelTest.java | 6 +- 27 files changed, 1088 insertions(+), 112 deletions(-) create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/testSqsNoReceiveTelemetry/groovy/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/SqsSuppressReceiveSpansTest.groovy create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/instrumentor/SqsSuppressReceiveSpansTest.groovy create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessage.java create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessageImpl.java create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequest.java create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequestAttributesGetter.java create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingIterator.java create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingList.java create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/SqsSuppressReceiveSpansTest.groovy create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsSuppressReceiveSpansTest.groovy diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/build.gradle.kts b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/build.gradle.kts index e04a0ce1b5a0..341cbc8c9329 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/build.gradle.kts +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/build.gradle.kts @@ -96,6 +96,22 @@ testing { implementation("com.amazonaws:aws-java-sdk-sqs:1.11.106") } + + targets { + all { + testTask.configure { + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + } + } + } + } + + val testSqsNoReceiveTelemetry by registering(JvmTestSuite::class) { + dependencies { + implementation(project(":instrumentation:aws-sdk:aws-sdk-1.11:testing")) + + implementation("com.amazonaws:aws-java-sdk-sqs:1.11.106") + } } } } @@ -105,14 +121,19 @@ tasks { check { dependsOn(testing.suites) } + } else { + check { + dependsOn(testing.suites.named("testSqs"), testing.suites.named("testSqsNoReceiveTelemetry")) + } } test { - systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) } withType().configureEach { // TODO run tests both with and without experimental span attributes jvmArgs("-Dotel.instrumentation.aws-sdk.experimental-span-attributes=true") + systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java index 0e31a1f29c16..8891b9072164 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java @@ -14,6 +14,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.awssdk.v1_11.AwsSdkTelemetry; +import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; /** @@ -37,6 +38,8 @@ public class TracingRequestHandler extends RequestHandler2 { .setCaptureExperimentalSpanAttributes( InstrumentationConfig.get() .getBoolean("otel.instrumentation.aws-sdk.experimental-span-attributes", false)) + .setMessagingReceiveInstrumentationEnabled( + ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) .build() .newRequestHandler(); diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy index 1a66b2f6c494..e1caeb51748e 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy @@ -36,7 +36,11 @@ class S3TracingTest extends AgentInstrumentationSpecification { awsConnector.receiveMessage(queueUrl) awsConnector.putSampleData(bucketName) // traced message - awsConnector.receiveMessage(queueUrl) + def receiveMessageResult = awsConnector.receiveMessage(queueUrl) + receiveMessageResult.messages.each {message -> + runWithSpan("process child") {} + } + // cleanup awsConnector.deleteBucket(bucketName) awsConnector.purgeQueue(queueUrl) @@ -168,7 +172,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - trace(5, 2) { + trace(5, 3) { span(0) { name "S3.PutObject" kind CLIENT @@ -192,7 +196,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } span(1) { - name "s3ToSqsTestQueue receive" + name "s3ToSqsTestQueue process" kind CONSUMER childOf span(0) attributes { @@ -203,17 +207,18 @@ class S3TracingTest extends AgentInstrumentationSpecification { "rpc.system" "aws-api" "rpc.service" "AmazonSQS" "http.method" "POST" - "http.status_code" 200 "http.url" String "net.peer.name" String - "$SemanticAttributes.NET_PROTOCOL_NAME" "http" - "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" "net.peer.port" { it == null || Number } "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSqsTestQueue" - "$SemanticAttributes.MESSAGING_OPERATION" "receive" - "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } - "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } + "$SemanticAttributes.MESSAGING_OPERATION" "process" + } + } + span(2) { + name "process child" + childOf span(1) + attributes { } } } @@ -336,7 +341,10 @@ class S3TracingTest extends AgentInstrumentationSpecification { awsConnector.receiveMessage(queueUrl) awsConnector.putSampleData(bucketName) // traced message - awsConnector.receiveMessage(queueUrl) + def receiveMessageResult = awsConnector.receiveMessage(queueUrl) + receiveMessageResult.messages.each {message -> + runWithSpan("process child") {} + } // cleanup awsConnector.deleteBucket(bucketName) awsConnector.purgeQueue(queueUrl) @@ -556,9 +564,9 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - trace(9, 1) { + trace(9, 2) { span(0) { - name "s3ToSnsToSqsTestQueue receive" + name "s3ToSnsToSqsTestQueue process" kind CONSUMER hasNoParent() attributes { @@ -569,19 +577,22 @@ class S3TracingTest extends AgentInstrumentationSpecification { "rpc.system" "aws-api" "rpc.service" "AmazonSQS" "http.method" "POST" - "http.status_code" 200 "http.url" String "net.peer.name" String - "$SemanticAttributes.NET_PROTOCOL_NAME" "http" - "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" "net.peer.port" { it == null || Number } "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSnsToSqsTestQueue" - "$SemanticAttributes.MESSAGING_OPERATION" "receive" + "$SemanticAttributes.MESSAGING_OPERATION" "process" "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } } } + span(1) { + name "process child" + childOf span(0) + attributes { + } + } } trace(10, 1) { span(0) { diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy index e702c04fec4b..32ce42e1f2bc 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy @@ -32,7 +32,10 @@ class SnsTracingTest extends AgentInstrumentationSpecification { when: awsConnector.publishSampleNotification(topicArn) - awsConnector.receiveMessage(queueUrl) + def receiveMessageResult = awsConnector.receiveMessage(queueUrl) + receiveMessageResult.messages.each {message -> + runWithSpan("process child") {} + } then: assertTraces(6) { @@ -154,7 +157,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification { } } } - trace(5, 2) { + trace(5, 3) { span(0) { name "SNS.Publish" kind CLIENT @@ -176,7 +179,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification { } } span(1) { - name "snsToSqsTestQueue receive" + name "snsToSqsTestQueue process" kind CONSUMER childOf span(0) attributes { @@ -187,16 +190,18 @@ class SnsTracingTest extends AgentInstrumentationSpecification { "rpc.service" "AmazonSQS" "rpc.method" "ReceiveMessage" "http.method" "POST" - "http.status_code" 200 "http.url" String "net.peer.name" String - "$SemanticAttributes.NET_PROTOCOL_NAME" "http" - "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" "net.peer.port" { it == null || Number } "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "snsToSqsTestQueue" - "$SemanticAttributes.MESSAGING_OPERATION" "receive" - "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long + "$SemanticAttributes.MESSAGING_OPERATION" "process" + } + } + span(2) { + name "process child" + childOf span(1) + attributes { } } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/testSqsNoReceiveTelemetry/groovy/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/SqsSuppressReceiveSpansTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/testSqsNoReceiveTelemetry/groovy/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/SqsSuppressReceiveSpansTest.groovy new file mode 100644 index 000000000000..05c3db6ca7d9 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/testSqsNoReceiveTelemetry/groovy/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/SqsSuppressReceiveSpansTest.groovy @@ -0,0 +1,17 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11 + +import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder +import io.opentelemetry.instrumentation.awssdk.v1_11.AbstractSqsSuppressReceiveSpansTest +import io.opentelemetry.instrumentation.test.AgentTestTrait + +class SqsSuppressReceiveSpansTest extends AbstractSqsSuppressReceiveSpansTest implements AgentTestTrait { + @Override + AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) { + return client + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/build.gradle.kts b/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/build.gradle.kts index 8ca4a2fd6226..d8f590e52e8b 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/build.gradle.kts +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/build.gradle.kts @@ -20,6 +20,26 @@ dependencies { testLibrary("com.amazonaws:aws-java-sdk-sqs:1.11.106") } -tasks.test { - systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", "true") +tasks { + withType().configureEach { + systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", "true") + } + + val testReceiveSpansDisabled by registering(Test::class) { + filter { + includeTestsMatching("SqsSuppressReceiveSpansTest") + } + include("**/SqsSuppressReceiveSpansTest.*") + } + + test { + filter { + excludeTestsMatching("SqsSuppressReceiveSpansTest") + } + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + } + + check { + dependsOn(testReceiveSpansDisabled) + } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/autoconfigure/TracingRequestHandler.java b/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/autoconfigure/TracingRequestHandler.java index fa0fbe8d699d..904fefe56268 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/autoconfigure/TracingRequestHandler.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/autoconfigure/TracingRequestHandler.java @@ -23,6 +23,9 @@ public class TracingRequestHandler extends RequestHandler2 { .setCaptureExperimentalSpanAttributes( ConfigPropertiesUtil.getBoolean( "otel.instrumentation.aws-sdk.experimental-span-attributes", false)) + .setMessagingReceiveInstrumentationEnabled( + ConfigPropertiesUtil.getBoolean( + "otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false)) .build() .newRequestHandler(); diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/instrumentor/SqsSuppressReceiveSpansTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/instrumentor/SqsSuppressReceiveSpansTest.groovy new file mode 100644 index 000000000000..9a33658390b2 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/instrumentor/SqsSuppressReceiveSpansTest.groovy @@ -0,0 +1,17 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v1_11.instrumentor + +import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder +import io.opentelemetry.instrumentation.awssdk.v1_11.AbstractSqsSuppressReceiveSpansTest +import io.opentelemetry.instrumentation.test.LibraryTestTrait + +class SqsSuppressReceiveSpansTest extends AbstractSqsSuppressReceiveSpansTest implements LibraryTestTrait { + @Override + AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) { + return client + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkInstrumenterFactory.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkInstrumenterFactory.java index dcf595ac8cb4..bf12e58a7775 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkInstrumenterFactory.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkInstrumenterFactory.java @@ -11,8 +11,12 @@ import com.amazonaws.Request; import com.amazonaws.Response; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor; @@ -20,8 +24,10 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import javax.annotation.Nullable; final class AwsSdkInstrumenterFactory { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.aws-sdk-1.11"; @@ -49,25 +55,91 @@ static Instrumenter, Response> requestInstrumenter( captureExperimentalSpanAttributes, spanName, SpanKindExtractor.alwaysClient(), - emptyList()); + emptyList(), + true); } - static Instrumenter, Response> consumerInstrumenter( - OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { + static Instrumenter, Response> consumerReceiveInstrumenter( + OpenTelemetry openTelemetry, + boolean captureExperimentalSpanAttributes, + boolean messagingReceiveInstrumentationEnabled) { return sqsInstrumenter( - openTelemetry, MessageOperation.RECEIVE, captureExperimentalSpanAttributes); + openTelemetry, + MessageOperation.RECEIVE, + captureExperimentalSpanAttributes, + messagingReceiveInstrumentationEnabled); + } + + static Instrumenter consumerProcessInstrumenter( + OpenTelemetry openTelemetry, + boolean captureExperimentalSpanAttributes, + boolean messagingReceiveInstrumentationEnabled) { + MessageOperation operation = MessageOperation.PROCESS; + SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE; + + InstrumenterBuilder builder = + Instrumenter.builder( + openTelemetry, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractors( + toProcessRequestExtractors( + captureExperimentalSpanAttributes + ? extendedAttributesExtractors + : defaultAttributesExtractors)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder(getter, operation).build()); + + if (messagingReceiveInstrumentationEnabled) { + builder.addSpanLinksExtractor( + (spanLinks, parentContext, request) -> { + Context extracted = + SqsParentContext.ofSystemAttributes(request.getMessage().getAttributes()); + spanLinks.addLink(Span.fromContext(extracted).getSpanContext()); + }); + } + return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + } + + private static List> toProcessRequestExtractors( + List, Response>> extractors) { + List> result = new ArrayList<>(); + for (AttributesExtractor, Response> extractor : extractors) { + result.add( + new AttributesExtractor() { + @Override + public void onStart( + AttributesBuilder attributes, + Context parentContext, + SqsProcessRequest sqsProcessRequest) { + extractor.onStart(attributes, parentContext, sqsProcessRequest.getRequest()); + } + + @Override + public void onEnd( + AttributesBuilder attributes, + Context context, + SqsProcessRequest sqsProcessRequest, + @Nullable Void unused, + @Nullable Throwable error) { + extractor.onEnd(attributes, context, sqsProcessRequest.getRequest(), null, error); + } + }); + } + return result; } static Instrumenter, Response> producerInstrumenter( OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { return sqsInstrumenter( - openTelemetry, MessageOperation.PUBLISH, captureExperimentalSpanAttributes); + openTelemetry, MessageOperation.PUBLISH, captureExperimentalSpanAttributes, true); } private static Instrumenter, Response> sqsInstrumenter( OpenTelemetry openTelemetry, MessageOperation operation, - boolean captureExperimentalSpanAttributes) { + boolean captureExperimentalSpanAttributes, + boolean enabled) { SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE; AttributesExtractor, Response> messagingAttributeExtractor = MessagingAttributesExtractor.builder(getter, operation).build(); @@ -79,7 +151,8 @@ private static Instrumenter, Response> sqsInstrumenter( operation == MessageOperation.PUBLISH ? SpanKindExtractor.alwaysProducer() : SpanKindExtractor.alwaysConsumer(), - singletonList(messagingAttributeExtractor)); + singletonList(messagingAttributeExtractor), + enabled); } private static Instrumenter, Response> createInstrumenter( @@ -87,7 +160,8 @@ private static Instrumenter, Response> createInstrumenter( boolean captureExperimentalSpanAttributes, SpanNameExtractor> spanNameExtractor, SpanKindExtractor> spanKindExtractor, - List, Response>> additionalAttributeExtractors) { + List, Response>> additionalAttributeExtractors, + boolean enabled) { return Instrumenter., Response>builder( openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor) .addAttributesExtractors( @@ -95,6 +169,7 @@ private static Instrumenter, Response> createInstrumenter( ? extendedAttributesExtractors : defaultAttributesExtractors) .addAttributesExtractors(additionalAttributeExtractors) + .setEnabled(enabled) .buildInstrumenter(spanKindExtractor); } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetry.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetry.java index e5d50f379303..b41ad3147f75 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetry.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetry.java @@ -45,16 +45,27 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) { } private final Instrumenter, Response> requestInstrumenter; - private final Instrumenter, Response> consumerInstrumenter; + private final Instrumenter, Response> consumerReceiveInstrumenter; + private final Instrumenter consumerProcessInstrumenter; private final Instrumenter, Response> producerInstrumenter; - AwsSdkTelemetry(OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { + AwsSdkTelemetry( + OpenTelemetry openTelemetry, + boolean captureExperimentalSpanAttributes, + boolean messagingReceiveInstrumentationEnabled) { requestInstrumenter = AwsSdkInstrumenterFactory.requestInstrumenter( openTelemetry, captureExperimentalSpanAttributes); - consumerInstrumenter = - AwsSdkInstrumenterFactory.consumerInstrumenter( - openTelemetry, captureExperimentalSpanAttributes); + consumerReceiveInstrumenter = + AwsSdkInstrumenterFactory.consumerReceiveInstrumenter( + openTelemetry, + captureExperimentalSpanAttributes, + messagingReceiveInstrumentationEnabled); + consumerProcessInstrumenter = + AwsSdkInstrumenterFactory.consumerProcessInstrumenter( + openTelemetry, + captureExperimentalSpanAttributes, + messagingReceiveInstrumentationEnabled); producerInstrumenter = AwsSdkInstrumenterFactory.producerInstrumenter( openTelemetry, captureExperimentalSpanAttributes); @@ -66,6 +77,9 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) { */ public RequestHandler2 newRequestHandler() { return new TracingRequestHandler( - requestInstrumenter, consumerInstrumenter, producerInstrumenter); + requestInstrumenter, + consumerReceiveInstrumenter, + consumerProcessInstrumenter, + producerInstrumenter); } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetryBuilder.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetryBuilder.java index 7ecb5341c8d2..daadf8e97d69 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetryBuilder.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetryBuilder.java @@ -14,6 +14,7 @@ public class AwsSdkTelemetryBuilder { private final OpenTelemetry openTelemetry; private boolean captureExperimentalSpanAttributes; + private boolean messagingReceiveInstrumentationEnabled; AwsSdkTelemetryBuilder(OpenTelemetry openTelemetry) { this.openTelemetry = openTelemetry; @@ -31,10 +32,24 @@ public AwsSdkTelemetryBuilder setCaptureExperimentalSpanAttributes( return this; } + /** + * Set whether to capture the consumer message receive telemetry in messaging instrumentation. + * + *

Note that this will cause the consumer side to start a new trace, with only a span link + * connecting it to the producer trace. + */ + @CanIgnoreReturnValue + public AwsSdkTelemetryBuilder setMessagingReceiveInstrumentationEnabled( + boolean messagingReceiveInstrumentationEnabled) { + this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled; + return this; + } + /** * Returns a new {@link AwsSdkTelemetry} with the settings of this {@link AwsSdkTelemetryBuilder}. */ public AwsSdkTelemetry build() { - return new AwsSdkTelemetry(openTelemetry, captureExperimentalSpanAttributes); + return new AwsSdkTelemetry( + openTelemetry, captureExperimentalSpanAttributes, messagingReceiveInstrumentationEnabled); } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsAccess.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsAccess.java index 205bfeeb0653..0bfef4012a52 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsAccess.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsAccess.java @@ -8,7 +8,8 @@ import com.amazonaws.AmazonWebServiceRequest; import com.amazonaws.Request; import com.amazonaws.Response; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle; import java.util.Collections; import java.util.Map; @@ -22,8 +23,11 @@ private SqsAccess() {} static boolean afterResponse( Request request, Response response, - Instrumenter, Response> consumerInstrumenter) { - return enabled && SqsImpl.afterResponse(request, response, consumerInstrumenter); + Timer timer, + Context parentContext, + TracingRequestHandler requestHandler) { + return enabled + && SqsImpl.afterResponse(request, response, timer, parentContext, requestHandler); } @NoMuzzle diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsImpl.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsImpl.java index e4feddbb7466..e0b4b93ee49c 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsImpl.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsImpl.java @@ -9,13 +9,15 @@ import com.amazonaws.Request; import com.amazonaws.Response; import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; import com.amazonaws.services.sqs.model.SendMessageRequest; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; +import io.opentelemetry.instrumentation.api.internal.Timer; +import java.lang.reflect.Field; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -33,33 +35,81 @@ private SqsImpl() {} static boolean afterResponse( Request request, Response response, - Instrumenter, Response> consumerInstrumenter) { + Timer timer, + Context parentContext, + TracingRequestHandler requestHandler) { if (response.getAwsResponse() instanceof ReceiveMessageResult) { - afterConsumerResponse(request, response, consumerInstrumenter); + afterConsumerResponse(request, response, timer, parentContext, requestHandler); return true; } return false; } - /** Create and close CONSUMER span for each message consumed. */ private static void afterConsumerResponse( Request request, Response response, - Instrumenter, Response> consumerInstrumenter) { + Timer timer, + Context parentContext, + TracingRequestHandler requestHandler) { ReceiveMessageResult receiveMessageResult = (ReceiveMessageResult) response.getAwsResponse(); - for (Message message : receiveMessageResult.getMessages()) { - createConsumerSpan(message, request, response, consumerInstrumenter); + if (receiveMessageResult.getMessages().isEmpty()) { + return; } + + Instrumenter, Response> consumerReceiveInstrumenter = + requestHandler.getConsumerReceiveInstrumenter(); + Instrumenter consumerProcessInstrumenter = + requestHandler.getConsumerProcessInstrumenter(); + + Context receiveContext = null; + if (timer != null && consumerReceiveInstrumenter.shouldStart(parentContext, request)) { + receiveContext = + InstrumenterUtil.startAndEnd( + consumerReceiveInstrumenter, + parentContext, + request, + response, + null, + timer.startTime(), + timer.now()); + } + + addTracing(receiveMessageResult, request, consumerProcessInstrumenter, receiveContext); } - private static void createConsumerSpan( - Message message, + private static final Field messagesField = getMessagesField(); + + private static Field getMessagesField() { + try { + Field field = ReceiveMessageResult.class.getDeclaredField("messages"); + field.setAccessible(true); + return field; + } catch (Exception e) { + return null; + } + } + + private static void addTracing( + ReceiveMessageResult receiveMessageResult, Request request, - Response response, - Instrumenter, Response> consumerInstrumenter) { - Context parentContext = SqsParentContext.ofSystemAttributes(message.getAttributes()); - Context context = consumerInstrumenter.start(parentContext, request); - consumerInstrumenter.end(context, request, response, null); + Instrumenter consumerProcessInstrumenter, + Context receiveContext) { + if (messagesField == null) { + return; + } + // replace Messages list inside ReceiveMessageResult with a tracing list that creates process + // spans as the list is iterated + try { + messagesField.set( + receiveMessageResult, + TracingList.wrap( + receiveMessageResult.getMessages(), + consumerProcessInstrumenter, + request, + receiveContext)); + } catch (IllegalAccessException ignored) { + // should not happen, we call setAccessible on the field + } } static boolean beforeMarshalling(AmazonWebServiceRequest rawRequest) { diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessage.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessage.java new file mode 100644 index 000000000000..9083f7293de5 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessage.java @@ -0,0 +1,17 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v1_11; + +import java.util.Map; + +/** + * A wrapper interface for {@link com.amazonaws.services.sqs.model.Message}. Using this wrapper + * avoids muzzle failure when sqs classes are not present. + */ +interface SqsMessage { + + Map getAttributes(); +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessageImpl.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessageImpl.java new file mode 100644 index 000000000000..83ef26fc0912 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessageImpl.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v1_11; + +import com.amazonaws.services.sqs.model.Message; +import java.util.Map; + +final class SqsMessageImpl implements SqsMessage { + + private final Message message; + + private SqsMessageImpl(Message message) { + this.message = message; + } + + static SqsMessage wrap(Message message) { + return new SqsMessageImpl(message); + } + + @Override + public Map getAttributes() { + return message.getAttributes(); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequest.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequest.java new file mode 100644 index 000000000000..f537c9cf3b87 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequest.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v1_11; + +import com.amazonaws.Request; + +final class SqsProcessRequest { + private final Request request; + private final SqsMessage message; + + private SqsProcessRequest(Request request, SqsMessage message) { + this.request = request; + this.message = message; + } + + public static SqsProcessRequest create(Request request, SqsMessage message) { + return new SqsProcessRequest(request, message); + } + + public Request getRequest() { + return request; + } + + public SqsMessage getMessage() { + return message; + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequestAttributesGetter.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequestAttributesGetter.java new file mode 100644 index 000000000000..564110e6a2f9 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequestAttributesGetter.java @@ -0,0 +1,64 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v1_11; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; + +enum SqsProcessRequestAttributesGetter + implements MessagingAttributesGetter { + INSTANCE; + + @Override + public String getSystem(SqsProcessRequest request) { + return "AmazonSQS"; + } + + @Override + public String getDestination(SqsProcessRequest request) { + Object originalRequest = request.getRequest().getOriginalRequest(); + String queueUrl = RequestAccess.getQueueUrl(originalRequest); + int i = queueUrl.lastIndexOf('/'); + return i > 0 ? queueUrl.substring(i + 1) : null; + } + + @Override + public boolean isTemporaryDestination(SqsProcessRequest request) { + return false; + } + + @Override + @Nullable + public String getConversationId(SqsProcessRequest request) { + return null; + } + + @Override + @Nullable + public Long getMessagePayloadSize(SqsProcessRequest request) { + return null; + } + + @Override + @Nullable + public Long getMessagePayloadCompressedSize(SqsProcessRequest request) { + return null; + } + + @Override + @Nullable + public String getMessageId(SqsProcessRequest request, @Nullable Void response) { + return null; + } + + @Override + public List getMessageHeader(SqsProcessRequest request, String name) { + String value = SqsAccess.getMessageAttributes(request.getRequest()).get(name); + return value != null ? Collections.singletonList(value) : Collections.emptyList(); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingIterator.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingIterator.java new file mode 100644 index 000000000000..6c476f38adb0 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingIterator.java @@ -0,0 +1,94 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v1_11; + +import com.amazonaws.Request; +import com.amazonaws.services.sqs.model.Message; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.Iterator; +import javax.annotation.Nullable; + +class TracingIterator implements Iterator { + + private final Iterator delegateIterator; + private final Instrumenter instrumenter; + private final Request request; + private final Context receiveContext; + + /* + * Note: this may potentially create problems if this iterator is used from different threads. But + * at the moment we cannot do much about this. + */ + @Nullable private SqsProcessRequest currentRequest; + @Nullable private Context currentContext; + @Nullable private Scope currentScope; + + private TracingIterator( + Iterator delegateIterator, + Instrumenter instrumenter, + Request request, + Context receiveContext) { + this.delegateIterator = delegateIterator; + this.instrumenter = instrumenter; + this.request = request; + this.receiveContext = receiveContext; + } + + public static Iterator wrap( + Iterator delegateIterator, + Instrumenter instrumenter, + Request request, + Context receiveContext) { + return new TracingIterator(delegateIterator, instrumenter, request, receiveContext); + } + + @Override + public boolean hasNext() { + closeScopeAndEndSpan(); + return delegateIterator.hasNext(); + } + + @Override + public Message next() { + // in case they didn't call hasNext()... + closeScopeAndEndSpan(); + + // it's important not to suppress consumer span creation here using Instrumenter.shouldStart() + // because this instrumentation can leak the context and so there may be a leaked consumer span + // in the context, in which case it's important to overwrite the leaked span instead of + // suppressing the correct span + // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) + Message next = delegateIterator.next(); + if (next != null) { + Context parentContext = receiveContext; + if (parentContext == null) { + parentContext = SqsParentContext.ofSystemAttributes(next.getAttributes()); + } + + currentRequest = SqsProcessRequest.create(request, SqsMessageImpl.wrap(next)); + currentContext = instrumenter.start(parentContext, currentRequest); + currentScope = currentContext.makeCurrent(); + } + return next; + } + + private void closeScopeAndEndSpan() { + if (currentScope != null) { + currentScope.close(); + instrumenter.end(currentContext, currentRequest, null, null); + currentScope = null; + currentRequest = null; + currentContext = null; + } + } + + @Override + public void remove() { + delegateIterator.remove(); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingList.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingList.java new file mode 100644 index 000000000000..769a52d8954d --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingList.java @@ -0,0 +1,82 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v1_11; + +import com.amazonaws.Request; +import com.amazonaws.internal.SdkInternalList; +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.Message; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.Iterator; +import java.util.List; + +class TracingList extends SdkInternalList { + private static final long serialVersionUID = 1L; + + private final transient Instrumenter instrumenter; + private final transient Request request; + private final transient Context receiveContext; + private boolean firstIterator = true; + + private TracingList( + List list, + Instrumenter instrumenter, + Request request, + Context receiveContext) { + super(list); + this.instrumenter = instrumenter; + this.request = request; + this.receiveContext = receiveContext; + } + + public static SdkInternalList wrap( + List list, + Instrumenter instrumenter, + Request request, + Context receiveContext) { + return new TracingList(list, instrumenter, request, receiveContext); + } + + @Override + public Iterator iterator() { + Iterator it; + // We should only return one iterator with tracing. + // However, this is not thread-safe, but usually the first (hopefully only) traversal of + // List is performed in the same thread that called receiveMessage() + if (firstIterator && !inAwsClient()) { + it = TracingIterator.wrap(super.iterator(), instrumenter, request, receiveContext); + firstIterator = false; + } else { + it = super.iterator(); + } + + return it; + } + + private static boolean inAwsClient() { + for (Class caller : CallerClass.INSTANCE.getClassContext()) { + if (AmazonSQSClient.class == caller) { + return true; + } + } + return false; + } + + private Object writeReplace() { + // serialize this object to SdkInternalList + return new SdkInternalList<>(this); + } + + private static class CallerClass extends SecurityManager { + public static final CallerClass INSTANCE = new CallerClass(); + + @Override + public Class[] getClassContext() { + return super.getClassContext(); + } + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingRequestHandler.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingRequestHandler.java index ca559825fc3a..edf2747181bc 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingRequestHandler.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingRequestHandler.java @@ -11,12 +11,13 @@ import com.amazonaws.handlers.HandlerContextKey; import com.amazonaws.handlers.RequestHandler2; import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.context.ContextKey; import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; -import java.time.Instant; +import io.opentelemetry.instrumentation.api.internal.Timer; import javax.annotation.Nullable; /** Tracing Request Handler. */ @@ -24,21 +25,26 @@ final class TracingRequestHandler extends RequestHandler2 { static final HandlerContextKey CONTEXT = new HandlerContextKey<>(Context.class.getName()); - private static final ContextKey REQUEST_START_KEY = - ContextKey.named(TracingRequestHandler.class.getName() + ".RequestStart"); private static final ContextKey PARENT_CONTEXT_KEY = ContextKey.named(TracingRequestHandler.class.getName() + ".ParentContext"); + private static final ContextKey REQUEST_TIMER_KEY = + ContextKey.named(TracingRequestHandler.class.getName() + ".Timer"); + private static final ContextKey REQUEST_SPAN_SUPPRESSED_KEY = + ContextKey.named(TracingRequestHandler.class.getName() + ".RequestSpanSuppressed"); private final Instrumenter, Response> requestInstrumenter; - private final Instrumenter, Response> consumerInstrumenter; + private final Instrumenter, Response> consumerReceiveInstrumenter; + private final Instrumenter consumerProcessInstrumenter; private final Instrumenter, Response> producerInstrumenter; TracingRequestHandler( Instrumenter, Response> requestInstrumenter, - Instrumenter, Response> consumerInstrumenter, + Instrumenter, Response> consumerReceiveInstrumenter, + Instrumenter consumerProcessInstrumenter, Instrumenter, Response> producerInstrumenter) { this.requestInstrumenter = requestInstrumenter; - this.consumerInstrumenter = consumerInstrumenter; + this.consumerReceiveInstrumenter = consumerReceiveInstrumenter; + this.consumerProcessInstrumenter = consumerProcessInstrumenter; this.producerInstrumenter = producerInstrumenter; } @@ -64,17 +70,20 @@ public void beforeRequest(Request request) { // also suppress the span from the underlying http client. Request/http client span appears in a // separate trace from message producer/consumer spans if there is no parent span just having // a trace with only the request/http client span isn't useful. - if (Context.root() == parentContext + if (Span.fromContextOrNull(parentContext) == null && "com.amazonaws.services.sqs.model.ReceiveMessageRequest" .equals(request.getOriginalRequest().getClass().getName())) { Context context = InstrumenterUtil.suppressSpan(instrumenter, parentContext, request); - context = context.with(REQUEST_START_KEY, Instant.now()); + context = context.with(REQUEST_TIMER_KEY, Timer.start()); context = context.with(PARENT_CONTEXT_KEY, parentContext); + context = context.with(REQUEST_SPAN_SUPPRESSED_KEY, Boolean.TRUE); request.addHandlerContext(CONTEXT, context); return; } Context context = instrumenter.start(parentContext, request); + context = context.with(REQUEST_TIMER_KEY, Timer.start()); + context = context.with(PARENT_CONTEXT_KEY, parentContext); AwsXrayPropagator.getInstance().inject(context, request, HeaderSetter.INSTANCE); @@ -90,9 +99,26 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request return request; } + Instrumenter, Response> getConsumerReceiveInstrumenter() { + return consumerReceiveInstrumenter; + } + + Instrumenter getConsumerProcessInstrumenter() { + return consumerProcessInstrumenter; + } + @Override public void afterResponse(Request request, Response response) { - SqsAccess.afterResponse(request, response, consumerInstrumenter); + Context context = request.getHandlerContext(CONTEXT); + if (context == null) { + return; + } + Timer timer = context.get(REQUEST_TIMER_KEY); + // javaagent instrumentation activates scope for the request span, we need to use the context + // we stored before creating the request span to avoid making request span the parent of the + // sqs receive span + Context parentContext = context.get(PARENT_CONTEXT_KEY); + SqsAccess.afterResponse(request, response, timer, parentContext, this); finish(request, response, null); } @@ -111,15 +137,18 @@ private void finish(Request request, Response response, @Nullable Throwabl Instrumenter, Response> instrumenter = getInstrumenter(request); - // see beforeRequest, requestStart is only set when we skip creating request span for sqs + // see beforeRequest, request suppressed is only set when we skip creating request span for sqs // AmazonSQSClient.receiveMessage calls - Instant requestStart = context.get(REQUEST_START_KEY); - if (requestStart != null) { + if (Boolean.TRUE.equals(context.get(REQUEST_SPAN_SUPPRESSED_KEY))) { Context parentContext = context.get(PARENT_CONTEXT_KEY); + Timer timer = context.get(REQUEST_TIMER_KEY); // create request span if there was an error - if (error != null && requestInstrumenter.shouldStart(parentContext, request)) { + if (error != null + && parentContext != null + && timer != null + && requestInstrumenter.shouldStart(parentContext, request)) { InstrumenterUtil.startAndEnd( - instrumenter, parentContext, request, response, error, requestStart, Instant.now()); + instrumenter, parentContext, request, response, error, timer.startTime(), timer.now()); } return; } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/SqsSuppressReceiveSpansTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/SqsSuppressReceiveSpansTest.groovy new file mode 100644 index 000000000000..471ffb571408 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/SqsSuppressReceiveSpansTest.groovy @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v1_11 + +import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder +import io.opentelemetry.instrumentation.test.LibraryTestTrait + +class SqsSuppressReceiveSpansTest extends AbstractSqsSuppressReceiveSpansTest implements LibraryTestTrait { + @Override + AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) { + return client.withRequestHandlers( + AwsSdkTelemetry.builder(getOpenTelemetry()) + .setCaptureExperimentalSpanAttributes(true) + .build() + .newRequestHandler()) + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/SqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/SqsTracingTest.groovy index 8f1083b796e7..d069411b15ad 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/SqsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/SqsTracingTest.groovy @@ -14,6 +14,7 @@ class SqsTracingTest extends AbstractSqsTracingTest implements LibraryTestTrait return client.withRequestHandlers( AwsSdkTelemetry.builder(getOpenTelemetry()) .setCaptureExperimentalSpanAttributes(true) + .setMessagingReceiveInstrumentationEnabled(true) .build() .newRequestHandler()) } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsSuppressReceiveSpansTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsSuppressReceiveSpansTest.groovy new file mode 100644 index 000000000000..782412c5ad2b --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsSuppressReceiveSpansTest.groovy @@ -0,0 +1,283 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v1_11 + +import com.amazonaws.auth.AWSStaticCredentialsProvider +import com.amazonaws.auth.BasicAWSCredentials +import com.amazonaws.client.builder.AwsClientBuilder +import com.amazonaws.services.sqs.AmazonSQSAsyncClient +import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder +import com.amazonaws.services.sqs.model.ReceiveMessageRequest +import com.amazonaws.services.sqs.model.SendMessageRequest +import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import io.opentelemetry.instrumentation.test.utils.PortUtils +import io.opentelemetry.semconv.SemanticAttributes +import org.elasticmq.rest.sqs.SQSRestServerBuilder +import spock.lang.Shared + +import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.CONSUMER +import static io.opentelemetry.api.trace.SpanKind.PRODUCER + +abstract class AbstractSqsSuppressReceiveSpansTest extends InstrumentationSpecification { + + abstract AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) + + @Shared + def sqs + @Shared + AmazonSQSAsyncClient client + @Shared + int sqsPort + + def setupSpec() { + + sqsPort = PortUtils.findOpenPort() + sqs = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start() + println getClass().name + " SQS server started at: localhost:$sqsPort/" + + def credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x")) + def endpointConfiguration = new AwsClientBuilder.EndpointConfiguration("http://localhost:" + sqsPort, "elasticmq") + client = configureClient(AmazonSQSAsyncClient.asyncBuilder()).withCredentials(credentials).withEndpointConfiguration(endpointConfiguration).build() + } + + def cleanupSpec() { + if (sqs != null) { + sqs.stopAndWait() + } + } + + def "simple sqs producer-consumer services"() { + setup: + client.createQueue("testSdkSqs") + + when: + SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}") + client.sendMessage(send) + def receiveMessageResult = client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs") + receiveMessageResult.messages.each {message -> runWithSpan("process child") {}} + + then: + assertTraces(2) { + trace(0, 1) { + + span(0) { + name "SQS.CreateQueue" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.queue.name" "testSdkSqs" + "rpc.system" "aws-api" + "rpc.service" "AmazonSQS" + "rpc.method" "CreateQueue" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.NET_PROTOCOL_NAME" "http" + "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long + } + } + } + trace(1, 3) { + span(0) { + name "testSdkSqs publish" + kind PRODUCER + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "rpc.system" "aws-api" + "rpc.method" "SendMessage" + "rpc.service" "AmazonSQS" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" + "$SemanticAttributes.MESSAGING_OPERATION" "publish" + "$SemanticAttributes.NET_PROTOCOL_NAME" "http" + "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long + } + } + span(1) { + name "testSdkSqs process" + kind CONSUMER + childOf span(0) + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "rpc.method" "ReceiveMessage" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "rpc.system" "aws-api" + "rpc.service" "AmazonSQS" + "http.method" "POST" + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" + "$SemanticAttributes.MESSAGING_OPERATION" "process" + } + } + span(2) { + name "process child" + childOf span(1) + attributes { + } + } + } + } + } + + def "simple sqs producer-consumer services with parent span"() { + setup: + client.createQueue("testSdkSqs") + + when: + SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}") + client.sendMessage(send) + runWithSpan("parent") { + def receiveMessageResult = client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs") + receiveMessageResult.messages.each {message -> runWithSpan("process child") {}} + } + + then: + assertTraces(3) { + trace(0, 1) { + + span(0) { + name "SQS.CreateQueue" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.queue.name" "testSdkSqs" + "rpc.system" "aws-api" + "rpc.service" "AmazonSQS" + "rpc.method" "CreateQueue" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.NET_PROTOCOL_NAME" "http" + "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long + } + } + } + trace(1, 3) { + span(0) { + name "testSdkSqs publish" + kind PRODUCER + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "rpc.system" "aws-api" + "rpc.method" "SendMessage" + "rpc.service" "AmazonSQS" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" + "$SemanticAttributes.MESSAGING_OPERATION" "publish" + "$SemanticAttributes.NET_PROTOCOL_NAME" "http" + "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long + } + } + span(1) { + name "testSdkSqs process" + kind CONSUMER + childOf span(0) + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "rpc.method" "ReceiveMessage" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "rpc.system" "aws-api" + "rpc.service" "AmazonSQS" + "http.method" "POST" + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" + "$SemanticAttributes.MESSAGING_OPERATION" "process" + } + } + span(2) { + name "process child" + childOf span(1) + attributes { + } + } + } + /** + * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). + * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear + */ + trace(2, 2) { + span(0) { + name "parent" + hasNoParent() + } + span(1) { + name "SQS.ReceiveMessage" + kind CLIENT + childOf span(0) + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "rpc.method" "ReceiveMessage" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "rpc.system" "aws-api" + "rpc.service" "AmazonSQS" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.NET_PROTOCOL_NAME" "http" + "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long + } + } + } + } + } + + def "only adds attribute name once when request reused"() { + setup: + client.createQueue("testSdkSqs2") + + when: + SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs2", "{\"type\": \"hello\"}") + client.sendMessage(send) + ReceiveMessageRequest receive = new ReceiveMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs2") + client.receiveMessage(receive) + client.sendMessage(send) + client.receiveMessage(receive) + + then: + receive.getAttributeNames() == ["AWSTraceHeader"] + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy index 0db2fe15eaf7..da70dd13740e 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy @@ -14,6 +14,7 @@ import com.amazonaws.services.sqs.model.ReceiveMessageRequest import com.amazonaws.services.sqs.model.SendMessageRequest import io.opentelemetry.instrumentation.test.InstrumentationSpecification import io.opentelemetry.instrumentation.test.utils.PortUtils +import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.semconv.SemanticAttributes import org.elasticmq.rest.sqs.SQSRestServerBuilder import spock.lang.Shared @@ -57,10 +58,14 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { when: SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}") client.sendMessage(send) - client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs") + def receiveMessageResult = client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs") + receiveMessageResult.messages.each {message -> + runWithSpan("process child") {} + } then: - assertTraces(2) { + assertTraces(3) { + SpanData publishSpan trace(0, 1) { span(0) { @@ -85,7 +90,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { } } } - trace(1, 2) { + trace(1, 1) { span(0) { name "testSdkSqs publish" kind PRODUCER @@ -110,10 +115,13 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long } } - span(1) { + publishSpan = span(0) + } + trace(2, 3) { + span(0) { name "testSdkSqs receive" kind CONSUMER - childOf span(0) + hasNoParent() attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" "http://localhost:$sqsPort" @@ -134,6 +142,33 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long } } + span(1) { + name "testSdkSqs process" + kind CONSUMER + childOf span(0) + hasLink(publishSpan) + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "rpc.method" "ReceiveMessage" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "rpc.system" "aws-api" + "rpc.service" "AmazonSQS" + "http.method" "POST" + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" + "$SemanticAttributes.MESSAGING_OPERATION" "process" + } + } + span(2) { + name "process child" + childOf span(1) + attributes { + } + } } } } @@ -146,11 +181,13 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}") client.sendMessage(send) runWithSpan("parent") { - client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs") + def receiveMessageResult = client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs") + receiveMessageResult.messages.each {message -> runWithSpan("process child") {}} } then: assertTraces(3) { + SpanData publishSpan trace(0, 1) { span(0) { @@ -175,7 +212,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { } } } - trace(1, 2) { + trace(1, 1) { span(0) { name "testSdkSqs publish" kind PRODUCER @@ -200,9 +237,16 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long } } + publishSpan = span(0) + } + trace(2, 5) { + span(0) { + name "parent" + hasNoParent() + } span(1) { - name "testSdkSqs receive" - kind CONSUMER + name "SQS.ReceiveMessage" + kind CLIENT childOf span(0) attributes { "aws.agent" "java-aws-sdk" @@ -216,27 +260,14 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { "http.url" "http://localhost:$sqsPort" "net.peer.name" "localhost" "net.peer.port" sqsPort - "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" - "$SemanticAttributes.MESSAGING_OPERATION" "receive" "$SemanticAttributes.NET_PROTOCOL_NAME" "http" "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long } } - } - /** - * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). - * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear - */ - trace(2, 2) { - span(0) { - name "parent" - hasNoParent() - } - span(1) { - name "SQS.ReceiveMessage" - kind CLIENT + span(2) { + name "testSdkSqs receive" + kind CONSUMER childOf span(0) attributes { "aws.agent" "java-aws-sdk" @@ -250,11 +281,41 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { "http.url" "http://localhost:$sqsPort" "net.peer.name" "localhost" "net.peer.port" sqsPort + "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" + "$SemanticAttributes.MESSAGING_OPERATION" "receive" "$SemanticAttributes.NET_PROTOCOL_NAME" "http" "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long } } + span(3) { + name "testSdkSqs process" + kind CONSUMER + childOf span(2) + hasLink(publishSpan) + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "rpc.method" "ReceiveMessage" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "rpc.system" "aws-api" + "rpc.service" "AmazonSQS" + "http.method" "POST" + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" + "$SemanticAttributes.MESSAGING_OPERATION" "process" + } + } + span(4) { + name "process child" + childOf span(3) + attributes { + } + } } } } diff --git a/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/AwsConnector.java b/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/AwsConnector.java index 962fac44f601..6743c4f60567 100644 --- a/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/AwsConnector.java +++ b/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/AwsConnector.java @@ -20,8 +20,10 @@ import com.amazonaws.services.sns.model.CreateTopicResult; import com.amazonaws.services.sqs.AmazonSQSAsyncClient; import com.amazonaws.services.sqs.model.GetQueueAttributesRequest; +import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.PurgeQueueRequest; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; import com.amazonaws.services.sqs.model.SendMessageRequest; import io.opentelemetry.instrumentation.test.utils.PortUtils; import java.util.Collections; @@ -146,7 +148,9 @@ void sendSampleMessage(String queueUrl) { void receiveMessage(String queueUrl) { logger.info("Receive message from queue {}", queueUrl); - sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20)); + ReceiveMessageResult receiveMessageResult = + sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20)); + for (Message ignored : receiveMessageResult.getMessages()) {} } void disconnect() { diff --git a/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/AwsSpanAssertions.java b/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/AwsSpanAssertions.java index 7001f7fcbc3f..ff7ecc69e471 100644 --- a/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/AwsSpanAssertions.java +++ b/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/AwsSpanAssertions.java @@ -44,7 +44,7 @@ static SpanDataAssert sqs( String rpcMethod; if (spanName.startsWith("SQS.")) { rpcMethod = spanName.substring(4); - } else if (spanName.endsWith("receive")) { + } else if (spanName.endsWith("process")) { rpcMethod = "ReceiveMessage"; } else if (spanName.endsWith("publish")) { rpcMethod = "SendMessage"; @@ -57,7 +57,6 @@ static SpanDataAssert sqs( Arrays.asList( equalTo(stringKey("aws.agent"), "java-aws-sdk"), satisfies(stringKey("aws.endpoint"), val -> val.isInstanceOf(String.class)), - equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200), satisfies( stringKey("aws.queue.name"), val -> @@ -69,11 +68,6 @@ static SpanDataAssert sqs( val.satisfiesAnyOf( v -> assertThat(v).isEqualTo(queueUrl), v -> assertThat(v).isNull())), equalTo(SemanticAttributes.HTTP_METHOD, "POST"), - satisfies( - SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, - val -> - val.satisfiesAnyOf( - v -> assertThat(v).isNull(), v -> assertThat(v).isInstanceOf(Long.class))), satisfies( SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH, val -> @@ -89,19 +83,34 @@ static SpanDataAssert sqs( val.satisfiesAnyOf( v -> assertThat(v).isNull(), v -> assertThat(v).isInstanceOf(Number.class))), - equalTo(SemanticAttributes.NET_PROTOCOL_NAME, "http"), - equalTo(SemanticAttributes.NET_PROTOCOL_VERSION, "1.1"), equalTo(stringKey("rpc.system"), "aws-api"), satisfies(stringKey("rpc.method"), stringAssert -> stringAssert.isEqualTo(rpcMethod)), equalTo(stringKey("rpc.service"), "AmazonSQS"))); - if (spanName.endsWith("receive") || spanName.endsWith("publish")) { + if (!spanName.endsWith("process")) { + attributeAssertions.addAll( + Arrays.asList( + equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200), + satisfies( + SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isInstanceOf(Long.class))), + equalTo(SemanticAttributes.NET_PROTOCOL_NAME, "http"), + equalTo(SemanticAttributes.NET_PROTOCOL_VERSION, "1.1"))); + } + if (spanName.endsWith("receive") + || spanName.endsWith("process") + || spanName.endsWith("publish")) { attributeAssertions.addAll( Arrays.asList( equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, queueName), equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS"))); if (spanName.endsWith("receive")) { attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")); + } else if (spanName.endsWith("process")) { + attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")); } else if (spanName.endsWith("publish")) { attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish")); } diff --git a/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/SqsCamelTest.java b/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/SqsCamelTest.java index 3e0849a22460..b47bcae7f807 100644 --- a/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/SqsCamelTest.java +++ b/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/SqsCamelTest.java @@ -60,7 +60,7 @@ void camelSqsProducerToCamelSqsConsumer() { .hasParent(trace.getSpan(1)), span -> AwsSpanAssertions.sqs( - span, "sqsCamelTest receive", queueUrl, queueName, SpanKind.CONSUMER) + span, "sqsCamelTest process", queueUrl, queueName, SpanKind.CONSUMER) .hasParent(trace.getSpan(2)), span -> CamelSpanAssertions.sqsConsume(span, queueName).hasParent(trace.getSpan(2))), @@ -95,7 +95,7 @@ void awsSdkSqsProducerToCamelSqsConsumer() { .hasNoParent(), span -> AwsSpanAssertions.sqs( - span, "sqsCamelTest receive", queueUrl, queueName, SpanKind.CONSUMER) + span, "sqsCamelTest process", queueUrl, queueName, SpanKind.CONSUMER) .hasParent(trace.getSpan(0)), span -> CamelSpanAssertions.sqsConsume(span, queueName).hasParent(trace.getSpan(0))), @@ -137,7 +137,7 @@ void camelSqsProducerToAwsSdkSqsConsumer() { span -> AwsSpanAssertions.sqs( span, - "sqsCamelTestSdkConsumer receive", + "sqsCamelTestSdkConsumer process", queueUrl, queueName, SpanKind.CONSUMER)