From d5be71a5f3f75c93eb308c6809d8c8bb2629eb4e Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 11 Oct 2023 12:43:57 +0300 Subject: [PATCH] Generate only consumer span for sqs receive message --- .../api/instrumenter/Instrumenter.java | 11 ++ .../api/internal/InstrumenterAccess.java | 3 + .../api/internal/InstrumenterUtil.java | 5 + .../src/test/groovy/S3TracingTest.groovy | 130 ++---------------- .../src/test/groovy/SnsTracingTest.groovy | 29 +--- .../awssdk/v1_11/TracingRequestHandler.java | 41 ++++++ .../v1_11/AbstractSqsTracingTest.groovy | 93 ++++++++++++- .../v2_2/TracingExecutionInterceptor.java | 62 +++++++-- .../v2_2/AbstractAws2SqsTracingTest.groovy | 111 ++++++++------- .../apachecamel/aws/SqsCamelTest.java | 20 +-- 10 files changed, 275 insertions(+), 230 deletions(-) diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/Instrumenter.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/Instrumenter.java index b56874a2cd81..97f523bea61f 100644 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/Instrumenter.java +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/Instrumenter.java @@ -268,6 +268,17 @@ public Context startAndEnd( return instrumenter.startAndEnd( parentContext, request, response, error, startTime, endTime); } + + @Override + public Context suppressSpan( + Instrumenter instrumenter, + Context parentContext, + REQUEST request) { + SpanKind spanKind = instrumenter.spanKindExtractor.extract(request); + + return instrumenter.spanSuppressor.storeInContext( + parentContext, spanKind, Span.getInvalid()); + } }); } } diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/InstrumenterAccess.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/InstrumenterAccess.java index 2589ea56cfb3..6b899b9f1637 100644 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/InstrumenterAccess.java +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/InstrumenterAccess.java @@ -24,4 +24,7 @@ Context startAndEnd( @Nullable Throwable error, Instant startTime, Instant endTime); + + Context suppressSpan( + Instrumenter instrumenter, Context parentContext, REQUEST request); } diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/InstrumenterUtil.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/InstrumenterUtil.java index 4962d69b9f0e..24648b4b2fb6 100644 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/InstrumenterUtil.java +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/InstrumenterUtil.java @@ -45,6 +45,11 @@ public static Context startAndEnd( instrumenter, parentContext, request, response, error, startTime, endTime); } + public static Context suppressSpan( + Instrumenter instrumenter, Context parentContext, REQUEST request) { + return instrumenterAccess.suppressSpan(instrumenter, parentContext, request); + } + public static Instrumenter buildUpstreamInstrumenter( InstrumenterBuilder builder, TextMapGetter getter, diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy index c192a138daf0..e935535ed540 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy @@ -42,7 +42,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { awsConnector.purgeQueue(queueUrl) then: - assertTraces(12) { + assertTraces(10) { trace(0, 1) { span(0) { @@ -168,31 +168,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - trace(5, 1) { - span(0) { - name "SQS.ReceiveMessage" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "ReceiveMessage" - "aws.queue.url" queueUrl - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "http.method" "POST" - "http.status_code" 200 - "http.url" String - "net.peer.name" String - "$SemanticAttributes.NET_PROTOCOL_NAME" "http" - "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" - "net.peer.port" { it == null || Number } - "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } - "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } - } - } - } - trace(6, 2) { + trace(5, 2) { span(0) { name "S3.PutObject" kind CLIENT @@ -239,36 +215,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - - /** - * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). - * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear - */ - trace(7, 1) { - span(0) { - name "SQS.ReceiveMessage" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "ReceiveMessage" - "aws.queue.url" queueUrl - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "http.method" "POST" - "http.status_code" 200 - "http.url" String - "net.peer.name" String - "$SemanticAttributes.NET_PROTOCOL_NAME" "http" - "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" - "net.peer.port" { it == null || Number } - "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } - "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } - } - } - } - trace(8, 1) { + trace(6, 1) { span(0) { name "S3.ListObjects" kind CLIENT @@ -292,7 +239,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - trace(9, 1) { + trace(7, 1) { span(0) { name "S3.DeleteObject" kind CLIENT @@ -316,7 +263,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - trace(10, 1) { + trace(8, 1) { span(0) { name "S3.DeleteBucket" kind CLIENT @@ -340,7 +287,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - trace(11, 1) { + trace(9, 1) { span(0) { name "SQS.PurgeQueue" kind CLIENT @@ -393,7 +340,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { awsConnector.purgeQueue(queueUrl) then: - assertTraces(16) { + assertTraces(14) { trace(0, 1) { span(0) { name "SQS.CreateQueue" @@ -583,32 +530,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - // test even receive trace(8, 1) { - span(0) { - name "SQS.ReceiveMessage" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "ReceiveMessage" - "aws.queue.url" queueUrl - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "http.method" "POST" - "http.status_code" 200 - "http.url" String - "net.peer.name" String - "$SemanticAttributes.NET_PROTOCOL_NAME" "http" - "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" - "net.peer.port" { it == null || Number } - "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } - "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } - } - } - } - trace(9, 1) { span(0) { name "S3.PutObject" kind CLIENT @@ -632,35 +554,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - /** - * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). - * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear - */ - trace(10, 1) { - span(0) { - name "SQS.ReceiveMessage" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "ReceiveMessage" - "aws.queue.url" queueUrl - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "http.method" "POST" - "http.status_code" 200 - "http.url" String - "net.peer.name" String - "$SemanticAttributes.NET_PROTOCOL_NAME" "http" - "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" - "net.peer.port" { it == null || Number } - "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } - "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } - } - } - } - trace(11, 1) { + trace(9, 1) { span(0) { name "SQS.ReceiveMessage" kind CONSUMER @@ -685,7 +579,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - trace(12, 1) { + trace(10, 1) { span(0) { name "S3.ListObjects" kind CLIENT @@ -709,7 +603,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - trace(13, 1) { + trace(11, 1) { span(0) { name "S3.DeleteObject" kind CLIENT @@ -733,7 +627,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - trace(14, 1) { + trace(12, 1) { span(0) { name "S3.DeleteBucket" kind CLIENT @@ -757,7 +651,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } } - trace(15, 1) { + trace(13, 1) { span(0) { name "SQS.PurgeQueue" kind CLIENT diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy index b6fd2f5d70a8..31760b3b7206 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy @@ -35,7 +35,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification { awsConnector.receiveMessage(queueUrl) then: - assertTraces(7) { + assertTraces(6) { trace(0, 1) { span(0) { @@ -198,33 +198,6 @@ class SnsTracingTest extends AgentInstrumentationSpecification { } } } - /** - * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). - * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear - */ - trace(6, 1) { - span(0) { - name "SQS.ReceiveMessage" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.url" queueUrl - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "rpc.method" "ReceiveMessage" - "http.method" "POST" - "http.status_code" 200 - "http.url" String - "net.peer.name" String - "$SemanticAttributes.NET_PROTOCOL_NAME" "http" - "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" - "net.peer.port" { it == null || Number } - "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long - } - } - } } } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingRequestHandler.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingRequestHandler.java index 11d2f3a6fed3..1e6507e1c110 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingRequestHandler.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingRequestHandler.java @@ -12,8 +12,11 @@ import com.amazonaws.handlers.RequestHandler2; import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextKey; import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; +import java.time.Instant; import javax.annotation.Nullable; /** Tracing Request Handler. */ @@ -21,6 +24,10 @@ final class TracingRequestHandler extends RequestHandler2 { static final HandlerContextKey CONTEXT = new HandlerContextKey<>(Context.class.getName()); + private static final ContextKey REQUEST_START_KEY = + ContextKey.named(TracingRequestHandler.class.getName() + ".RequestStart"); + private static final ContextKey PARENT_CONTEXT_KEY = + ContextKey.named(TracingRequestHandler.class.getName() + ".ParentContext"); private final Instrumenter, Response> requestInstrumenter; private final Instrumenter, Response> consumerInstrumenter; @@ -47,6 +54,21 @@ public void beforeRequest(Request request) { if (!requestInstrumenter.shouldStart(parentContext, request)) { return; } + + // Skip creating request span for AmazonSQSClient.receiveMessage if there is no parent span and + // also suppress the span from the underlying http client. Request/http client span appears in a + // separate trace from message producer/consumer spans if there is no parent span just having + // a trace with only the request/http client span isn't useful. + if (Context.root() == parentContext + && "com.amazonaws.services.sqs.model.ReceiveMessageRequest" + .equals(request.getOriginalRequest().getClass().getName())) { + Context context = InstrumenterUtil.suppressSpan(requestInstrumenter, parentContext, request); + context = context.with(REQUEST_START_KEY, Instant.now()); + context = context.with(PARENT_CONTEXT_KEY, parentContext); + request.addHandlerContext(CONTEXT, context); + return; + } + Context context = requestInstrumenter.start(parentContext, request); AwsXrayPropagator.getInstance().inject(context, request, HeaderSetter.INSTANCE); @@ -81,6 +103,25 @@ private void finish(Request request, Response response, @Nullable Throwabl return; } request.addHandlerContext(CONTEXT, null); + + // see beforeRequest, requestStart is only set when we skip creating request span for sqs + // AmazonSQSClient.receiveMessage calls + Instant requestStart = context.get(REQUEST_START_KEY); + if (requestStart != null) { + // create request span if there was an error + if (error != null) { + InstrumenterUtil.startAndEnd( + requestInstrumenter, + context.get(PARENT_CONTEXT_KEY), + request, + response, + error, + requestStart, + Instant.now()); + } + return; + } + requestInstrumenter.end(context, request, response, error); } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy index 1a26fa81e4fa..781d63014255 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy @@ -59,6 +59,91 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { client.sendMessage(send) client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs") + then: + assertTraces(2) { + trace(0, 1) { + + span(0) { + name "SQS.CreateQueue" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.queue.name" "testSdkSqs" + "rpc.system" "aws-api" + "rpc.service" "AmazonSQS" + "rpc.method" "CreateQueue" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.NET_PROTOCOL_NAME" "http" + "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long + } + } + } + trace(1, 2) { + span(0) { + name "SQS.SendMessage" + kind PRODUCER + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "rpc.system" "aws-api" + "rpc.method" "SendMessage" + "rpc.service" "AmazonSQS" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.NET_PROTOCOL_NAME" "http" + "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long + } + } + span(1) { + name "SQS.ReceiveMessage" + kind CONSUMER + childOf span(0) + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "rpc.method" "ReceiveMessage" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "rpc.system" "aws-api" + "rpc.service" "AmazonSQS" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "$SemanticAttributes.USER_AGENT_ORIGINAL" String + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.NET_PROTOCOL_NAME" "http" + "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long + } + } + } + } + } + + def "simple sqs producer-consumer services with parent span"() { + setup: + client.createQueue("testSdkSqs") + + when: + SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}") + client.sendMessage(send) + runWithSpan("parent") { + client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs") + } + then: assertTraces(3) { trace(0, 1) { @@ -134,11 +219,15 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear */ - trace(2, 1) { + trace(2, 2) { span(0) { + name "parent" + hasNoParent() + } + span(1) { name "SQS.ReceiveMessage" kind CLIENT - hasNoParent() + childOf span(0) attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" "http://localhost:$sqsPort" diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java index 89f6d4cc527a..cf9b58f220fc 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java @@ -15,12 +15,14 @@ import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; import io.opentelemetry.semconv.SemanticAttributes; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.Charset; +import java.time.Instant; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -43,16 +45,18 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor { // the class name is part of the attribute name, so that it will be shaded when used in javaagent // instrumentation, and won't conflict with usage outside javaagent instrumentation - static final ExecutionAttribute CONTEXT_ATTRIBUTE = + private static final ExecutionAttribute CONTEXT_ATTRIBUTE = new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".Context"); - static final ExecutionAttribute SCOPE_ATTRIBUTE = + private static final ExecutionAttribute SCOPE_ATTRIBUTE = new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".Scope"); - static final ExecutionAttribute AWS_SDK_REQUEST_ATTRIBUTE = + private static final ExecutionAttribute AWS_SDK_REQUEST_ATTRIBUTE = new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".AwsSdkRequest"); static final ExecutionAttribute SDK_HTTP_REQUEST_ATTRIBUTE = new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".SdkHttpRequest"); static final ExecutionAttribute SDK_REQUEST_ATTRIBUTE = new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".SdkRequest"); + private static final ExecutionAttribute REQUEST_FINISHER_ATTRIBUTE = + new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".RequestFinisher"); private final Instrumenter requestInstrumenter; private final Instrumenter consumerInstrumenter; @@ -120,9 +124,40 @@ public SdkRequest modifyRequest( return request; } - io.opentelemetry.context.Context otelContext = - requestInstrumenter.start(parentOtelContext, executionAttributes); + RequestSpanFinisher requestFinisher; + io.opentelemetry.context.Context otelContext; + Instant requestStart = Instant.now(); + // Skip creating request span for SqsClient.receiveMessage if there is no parent span and also + // suppress the span from the underlying http client. Request/http client span appears in a + // separate trace from message producer/consumer spans if there is no parent span just having + // a trace with only the request/http client span isn't useful. + if (parentOtelContext == io.opentelemetry.context.Context.root() + && "software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest" + .equals(request.getClass().getName())) { + otelContext = + InstrumenterUtil.suppressSpan( + requestInstrumenter, parentOtelContext, executionAttributes); + requestFinisher = + (otelContext12, executionAttributes12, response, exception) -> { + // generate request span when there was an error + if (exception != null) { + InstrumenterUtil.startAndEnd( + requestInstrumenter, + parentOtelContext, + executionAttributes12, + response, + exception, + requestStart, + Instant.now()); + } + }; + } else { + otelContext = requestInstrumenter.start(parentOtelContext, executionAttributes); + requestFinisher = requestInstrumenter::end; + } + executionAttributes.putAttribute(CONTEXT_ATTRIBUTE, otelContext); + executionAttributes.putAttribute(REQUEST_FINISHER_ATTRIBUTE, requestFinisher); if (executionAttributes .getAttribute(SdkExecutionAttribute.CLIENT_TYPE) .equals(ClientType.SYNC)) { @@ -140,7 +175,7 @@ public SdkRequest modifyRequest( populateRequestAttributes(span, awsSdkRequest, context.request(), executionAttributes); } } catch (Throwable throwable) { - requestInstrumenter.end(otelContext, executionAttributes, null, throwable); + requestFinisher.finish(otelContext, executionAttributes, null, throwable); clearAttributes(executionAttributes); throw throwable; } @@ -290,7 +325,8 @@ public void afterExecution( onHttpResponseAvailable( executionAttributes, otelContext, Span.fromContext(otelContext), httpResponse); - requestInstrumenter.end(otelContext, executionAttributes, httpResponse, null); + RequestSpanFinisher finisher = executionAttributes.getAttribute(REQUEST_FINISHER_ATTRIBUTE); + finisher.finish(otelContext, executionAttributes, httpResponse, null); } clearAttributes(executionAttributes); } @@ -353,7 +389,8 @@ public void onExecutionFailure( Context.FailedExecution context, ExecutionAttributes executionAttributes) { io.opentelemetry.context.Context otelContext = getContext(executionAttributes); if (otelContext != null) { - requestInstrumenter.end(otelContext, executionAttributes, null, context.exception()); + RequestSpanFinisher finisher = executionAttributes.getAttribute(REQUEST_FINISHER_ATTRIBUTE); + finisher.finish(otelContext, executionAttributes, null, context.exception()); } clearAttributes(executionAttributes); } @@ -366,6 +403,7 @@ private static void clearAttributes(ExecutionAttributes executionAttributes) { executionAttributes.putAttribute(CONTEXT_ATTRIBUTE, null); executionAttributes.putAttribute(AWS_SDK_REQUEST_ATTRIBUTE, null); executionAttributes.putAttribute(SDK_HTTP_REQUEST_ATTRIBUTE, null); + executionAttributes.putAttribute(REQUEST_FINISHER_ATTRIBUTE, null); } /** @@ -375,4 +413,12 @@ private static void clearAttributes(ExecutionAttributes executionAttributes) { static io.opentelemetry.context.Context getContext(ExecutionAttributes attributes) { return attributes.getAttribute(CONTEXT_ATTRIBUTE); } + + private interface RequestSpanFinisher { + void finish( + io.opentelemetry.context.Context otelContext, + ExecutionAttributes executionAttributes, + SdkHttpResponse response, + Throwable exception); + } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy index 48ca364715c7..7fd36ed3bd74 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy @@ -110,8 +110,8 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { } } - void assertSqsTraces() { - assertTraces(3) { + void assertSqsTraces(withParent = false) { + assertTraces(2 + (withParent ? 1 : 0)) { trace(0, 1) { span(0) { @@ -179,31 +179,37 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { } } } - /** - * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). - * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear - */ - trace(2, 1) { - span(0) { - name "Sqs.ReceiveMessage" - kind CLIENT - hasNoParent() - hasNoLinks() - attributes { - "aws.agent" "java-aws-sdk" - "aws.requestId" "00000000-0000-0000-0000-000000000000" - "rpc.method" "ReceiveMessage" - "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" - "rpc.system" "aws-api" - "rpc.service" "Sqs" - "http.method" "POST" - "http.status_code" 200 - "http.url" { it.startsWith("http://localhost:$sqsPort") } - "$SemanticAttributes.USER_AGENT_ORIGINAL" String - "net.peer.name" "localhost" - "net.peer.port" sqsPort - "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } - "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } + if (withParent) { + /** + * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). + * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear + */ + trace(2, 2) { + span(0) { + name "parent" + hasNoParent() + } + span(1) { + name "Sqs.ReceiveMessage" + kind CLIENT + childOf span(0) + hasNoLinks() + attributes { + "aws.agent" "java-aws-sdk" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + "rpc.method" "ReceiveMessage" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "rpc.system" "aws-api" + "rpc.service" "Sqs" + "http.method" "POST" + "http.status_code" 200 + "http.url" { it.startsWith("http://localhost:$sqsPort") } + "$SemanticAttributes.USER_AGENT_ORIGINAL" String + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } + } } } } @@ -228,6 +234,26 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { assertSqsTraces() } + def "simple sqs producer-consumer services with parent: sync"() { + setup: + def builder = SqsClient.builder() + configureSdkClient(builder) + def client = builder.build() + + client.createQueue(createQueueRequest) + + when: + client.sendMessage(sendMessageRequest) + + def resp = runWithSpan("parent") { + client.receiveMessage(receiveMessageRequest) + } + + then: + resp.messages().size() == 1 + assertSqsTraces(true) + } + def "simple sqs producer-consumer services: async"() { setup: def builder = SqsAsyncClient.builder() @@ -266,7 +292,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { // +2: 3 messages, 2x traceparent, 1x not injected due to too many attrs totalAttrs == 18 + (sqsAttributeInjectionEnabled ? 2 : 0) - assertTraces(xrayInjectionEnabled ? 3 : 4) { + assertTraces(xrayInjectionEnabled ? 2 : 3) { trace(0, 1) { span(0) { @@ -320,35 +346,8 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { } } } - /** - * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). - * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear - */ - trace(2, 1) { - span(0) { - name "Sqs.ReceiveMessage" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.requestId" "00000000-0000-0000-0000-000000000000" - "rpc.method" "ReceiveMessage" - "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" - "rpc.system" "aws-api" - "rpc.service" "Sqs" - "http.method" "POST" - "http.status_code" 200 - "http.url" { it.startsWith("http://localhost:$sqsPort") } - "$SemanticAttributes.USER_AGENT_ORIGINAL" String - "net.peer.name" "localhost" - "net.peer.port" sqsPort - "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } - "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } - } - } - } if (!xrayInjectionEnabled) { - trace(3, 1) { + trace(2, 1) { span(0) { name "Sqs.ReceiveMessage" kind CONSUMER diff --git a/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/SqsCamelTest.java b/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/SqsCamelTest.java index 0e3617868367..622664815968 100644 --- a/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/SqsCamelTest.java +++ b/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/SqsCamelTest.java @@ -64,9 +64,6 @@ void camelSqsProducerToCamelSqsConsumer() { .hasParent(trace.getSpan(2)), span -> CamelSpanAssertions.sqsConsume(span, queueName).hasParent(trace.getSpan(2))), - trace -> - trace.hasSpansSatisfyingExactly( - span -> AwsSpanAssertions.sqs(span, "SQS.ReceiveMessage", queueUrl).hasNoParent()), trace -> trace.hasSpansSatisfyingExactly( span -> AwsSpanAssertions.sqs(span, "SQS.DeleteMessage", queueUrl).hasNoParent())); @@ -104,13 +101,7 @@ void awsSdkSqsProducerToCamelSqsConsumer() { CamelSpanAssertions.sqsConsume(span, queueName).hasParent(trace.getSpan(0))), trace -> trace.hasSpansSatisfyingExactly( - span -> AwsSpanAssertions.sqs(span, "SQS.ReceiveMessage", queueUrl).hasNoParent()), - trace -> - trace.hasSpansSatisfyingExactly( - span -> AwsSpanAssertions.sqs(span, "SQS.DeleteMessage", queueUrl).hasNoParent()), - trace -> - trace.hasSpansSatisfyingExactly( - span -> AwsSpanAssertions.sqs(span, "SQS.ReceiveMessage", queueUrl).hasNoParent())); + span -> AwsSpanAssertions.sqs(span, "SQS.DeleteMessage", queueUrl).hasNoParent())); camelApp.stop(); } @@ -142,14 +133,7 @@ void camelSqsProducerToAwsSdkSqsConsumer() { span -> AwsSpanAssertions.sqs( span, "SQS.ReceiveMessage", queueUrl, null, SpanKind.CONSUMER) - .hasParent(trace.getSpan(2))), - /* - * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). - * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear - */ - trace -> - trace.hasSpansSatisfyingExactly( - span -> AwsSpanAssertions.sqs(span, "SQS.ReceiveMessage", queueUrl).hasNoParent())); + .hasParent(trace.getSpan(2)))); camelApp.stop(); } }