From c7617dc06800c15ef5fbe729688c3fcbe2c1c0d4 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Fri, 22 Sep 2023 23:08:14 +0300 Subject: [PATCH] Rework reactor netty context tracking (#9286) --- .../http/HttpClientAttributesExtractor.java | 10 ++++---- .../HttpClientAttributesExtractorTest.java | 4 ++-- ...entAttributesExtractorBothSemconvTest.java | 4 ++-- ...tAttributesExtractorStableSemconvTest.java | 4 ++-- .../v1_0/InstrumentationContexts.java | 24 ++++++++++++++----- 5 files changed, 29 insertions(+), 17 deletions(-) diff --git a/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractor.java b/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractor.java index 23efcb67593e..0723d72d4794 100644 --- a/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractor.java +++ b/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractor.java @@ -113,6 +113,11 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST if (SemconvStability.emitOldHttpSemconv()) { internalSet(attributes, SemanticAttributes.HTTP_URL, fullUrl); } + + int resendCount = resendCountIncrementer.applyAsInt(parentContext); + if (resendCount > 0) { + attributes.put(SemanticAttributes.HTTP_RESEND_COUNT, resendCount); + } } @Override @@ -127,11 +132,6 @@ public void onEnd( internalNetExtractor.onEnd(attributes, request, response); internalNetworkExtractor.onEnd(attributes, request, response); internalServerExtractor.onEnd(attributes, request, response); - - int resendCount = resendCountIncrementer.applyAsInt(context); - if (resendCount > 0) { - attributes.put(SemanticAttributes.HTTP_RESEND_COUNT, resendCount); - } } /** diff --git a/instrumentation-api-semconv/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractorTest.java b/instrumentation-api-semconv/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractorTest.java index 947082de8a50..eb5e9c9b17d0 100644 --- a/instrumentation-api-semconv/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractorTest.java +++ b/instrumentation-api-semconv/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractorTest.java @@ -148,7 +148,8 @@ void normal() { AttributeKey.stringArrayKey("http.request.header.custom_request_header"), asList("123", "456")), entry(SemanticAttributes.NET_PEER_NAME, "github.com"), - entry(SemanticAttributes.NET_PEER_PORT, 123L)); + entry(SemanticAttributes.NET_PEER_PORT, 123L), + entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L)); AttributesBuilder endAttributes = Attributes.builder(); extractor.onEnd(endAttributes, Context.root(), request, response, null); @@ -157,7 +158,6 @@ void normal() { entry(SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH, 10L), entry(SemanticAttributes.HTTP_STATUS_CODE, 202L), entry(SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, 20L), - entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L), entry( AttributeKey.stringArrayKey("http.response.header.custom_response_header"), asList("654", "321")), diff --git a/instrumentation-api-semconv/src/testBothHttpSemconv/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractorBothSemconvTest.java b/instrumentation-api-semconv/src/testBothHttpSemconv/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractorBothSemconvTest.java index 7a98494dad46..00637a71356a 100644 --- a/instrumentation-api-semconv/src/testBothHttpSemconv/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractorBothSemconvTest.java +++ b/instrumentation-api-semconv/src/testBothHttpSemconv/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractorBothSemconvTest.java @@ -145,7 +145,8 @@ void normal() { entry(SemanticAttributes.NET_PEER_NAME, "github.com"), entry(SemanticAttributes.NET_PEER_PORT, 123L), entry(SemanticAttributes.SERVER_ADDRESS, "github.com"), - entry(SemanticAttributes.SERVER_PORT, 123L)); + entry(SemanticAttributes.SERVER_PORT, 123L), + entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L)); AttributesBuilder endAttributes = Attributes.builder(); extractor.onEnd(endAttributes, Context.root(), request, response, null); @@ -157,7 +158,6 @@ void normal() { entry(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 202L), entry(SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, 20L), entry(SemanticAttributes.HTTP_RESPONSE_BODY_SIZE, 20L), - entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L), entry( AttributeKey.stringArrayKey("http.response.header.custom_response_header"), asList("654", "321")), diff --git a/instrumentation-api-semconv/src/testStableHttpSemconv/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractorStableSemconvTest.java b/instrumentation-api-semconv/src/testStableHttpSemconv/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractorStableSemconvTest.java index fa9e6e2eef54..6c6702dfb843 100644 --- a/instrumentation-api-semconv/src/testStableHttpSemconv/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractorStableSemconvTest.java +++ b/instrumentation-api-semconv/src/testStableHttpSemconv/java/io/opentelemetry/instrumentation/api/instrumenter/http/HttpClientAttributesExtractorStableSemconvTest.java @@ -152,7 +152,8 @@ void normal() { AttributeKey.stringArrayKey("http.request.header.custom_request_header"), asList("123", "456")), entry(SemanticAttributes.SERVER_ADDRESS, "github.com"), - entry(SemanticAttributes.SERVER_PORT, 123L)); + entry(SemanticAttributes.SERVER_PORT, 123L), + entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L)); AttributesBuilder endAttributes = Attributes.builder(); extractor.onEnd(endAttributes, Context.root(), request, response, null); @@ -161,7 +162,6 @@ void normal() { entry(SemanticAttributes.HTTP_REQUEST_BODY_SIZE, 10L), entry(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 202L), entry(SemanticAttributes.HTTP_RESPONSE_BODY_SIZE, 20L), - entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L), entry( AttributeKey.stringArrayKey("http.response.header.custom_response_header"), asList("654", "321")), diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java index 7aee2f2cea93..abc2e6820e9a 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java @@ -11,6 +11,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResendCount; import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; import io.opentelemetry.instrumentation.api.internal.Timer; +import io.opentelemetry.instrumentation.api.util.VirtualField; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -19,6 +20,8 @@ import reactor.netty.http.client.HttpClientResponse; final class InstrumentationContexts { + private static final VirtualField requestContextVirtualField = + VirtualField.find(HttpClientRequest.class, Context.class); private static final AtomicReferenceFieldUpdater parentContextUpdater = @@ -56,18 +59,27 @@ Context startClientSpan(HttpClientRequest request) { Context context = null; if (instrumenter().shouldStart(parentContext, request)) { context = instrumenter().start(parentContext, request); + requestContextVirtualField.set(request, context); clientContexts.offer(new RequestAndContext(request, context)); } return context; } - // we are synchronizing here to ensure that spans are ended in the oder they are read from the - // queue - synchronized void endClientSpan( - @Nullable HttpClientResponse response, @Nullable Throwable error) { + void endClientSpan(@Nullable HttpClientResponse response, @Nullable Throwable error) { + HttpClientRequest request = null; + Context context = null; RequestAndContext requestAndContext = clientContexts.poll(); - if (requestAndContext != null) { - instrumenter().end(requestAndContext.context, requestAndContext.request, response, error); + if (response instanceof HttpClientRequest) { + request = (HttpClientRequest) response; + context = requestContextVirtualField.get(request); + } else if (requestAndContext != null) { + // this branch is taken when there was an error (e.g. timeout) and response was null + request = requestAndContext.request; + context = requestAndContext.context; + } + + if (request != null && context != null) { + instrumenter().end(context, request, response, error); } }