Skip to content

Commit

Permalink
Rework reactor netty context tracking (#9286)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Sep 22, 2023
1 parent 6ba2f49 commit c7617dc
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST
if (SemconvStability.emitOldHttpSemconv()) {
internalSet(attributes, SemanticAttributes.HTTP_URL, fullUrl);
}

int resendCount = resendCountIncrementer.applyAsInt(parentContext);
if (resendCount > 0) {
attributes.put(SemanticAttributes.HTTP_RESEND_COUNT, resendCount);
}
}

@Override
Expand All @@ -127,11 +132,6 @@ public void onEnd(
internalNetExtractor.onEnd(attributes, request, response);
internalNetworkExtractor.onEnd(attributes, request, response);
internalServerExtractor.onEnd(attributes, request, response);

int resendCount = resendCountIncrementer.applyAsInt(context);
if (resendCount > 0) {
attributes.put(SemanticAttributes.HTTP_RESEND_COUNT, resendCount);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ void normal() {
AttributeKey.stringArrayKey("http.request.header.custom_request_header"),
asList("123", "456")),
entry(SemanticAttributes.NET_PEER_NAME, "github.com"),
entry(SemanticAttributes.NET_PEER_PORT, 123L));
entry(SemanticAttributes.NET_PEER_PORT, 123L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L));

AttributesBuilder endAttributes = Attributes.builder();
extractor.onEnd(endAttributes, Context.root(), request, response, null);
Expand All @@ -157,7 +158,6 @@ void normal() {
entry(SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH, 10L),
entry(SemanticAttributes.HTTP_STATUS_CODE, 202L),
entry(SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, 20L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L),
entry(
AttributeKey.stringArrayKey("http.response.header.custom_response_header"),
asList("654", "321")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ void normal() {
entry(SemanticAttributes.NET_PEER_NAME, "github.com"),
entry(SemanticAttributes.NET_PEER_PORT, 123L),
entry(SemanticAttributes.SERVER_ADDRESS, "github.com"),
entry(SemanticAttributes.SERVER_PORT, 123L));
entry(SemanticAttributes.SERVER_PORT, 123L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L));

AttributesBuilder endAttributes = Attributes.builder();
extractor.onEnd(endAttributes, Context.root(), request, response, null);
Expand All @@ -157,7 +158,6 @@ void normal() {
entry(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 202L),
entry(SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, 20L),
entry(SemanticAttributes.HTTP_RESPONSE_BODY_SIZE, 20L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L),
entry(
AttributeKey.stringArrayKey("http.response.header.custom_response_header"),
asList("654", "321")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ void normal() {
AttributeKey.stringArrayKey("http.request.header.custom_request_header"),
asList("123", "456")),
entry(SemanticAttributes.SERVER_ADDRESS, "github.com"),
entry(SemanticAttributes.SERVER_PORT, 123L));
entry(SemanticAttributes.SERVER_PORT, 123L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L));

AttributesBuilder endAttributes = Attributes.builder();
extractor.onEnd(endAttributes, Context.root(), request, response, null);
Expand All @@ -161,7 +162,6 @@ void normal() {
entry(SemanticAttributes.HTTP_REQUEST_BODY_SIZE, 10L),
entry(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 202L),
entry(SemanticAttributes.HTTP_RESPONSE_BODY_SIZE, 20L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L),
entry(
AttributeKey.stringArrayKey("http.response.header.custom_response_header"),
asList("654", "321")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResendCount;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
Expand All @@ -19,6 +20,8 @@
import reactor.netty.http.client.HttpClientResponse;

final class InstrumentationContexts {
private static final VirtualField<HttpClientRequest, Context> requestContextVirtualField =
VirtualField.find(HttpClientRequest.class, Context.class);

private static final AtomicReferenceFieldUpdater<InstrumentationContexts, Context>
parentContextUpdater =
Expand Down Expand Up @@ -56,18 +59,27 @@ Context startClientSpan(HttpClientRequest request) {
Context context = null;
if (instrumenter().shouldStart(parentContext, request)) {
context = instrumenter().start(parentContext, request);
requestContextVirtualField.set(request, context);
clientContexts.offer(new RequestAndContext(request, context));
}
return context;
}

// we are synchronizing here to ensure that spans are ended in the oder they are read from the
// queue
synchronized void endClientSpan(
@Nullable HttpClientResponse response, @Nullable Throwable error) {
void endClientSpan(@Nullable HttpClientResponse response, @Nullable Throwable error) {
HttpClientRequest request = null;
Context context = null;
RequestAndContext requestAndContext = clientContexts.poll();
if (requestAndContext != null) {
instrumenter().end(requestAndContext.context, requestAndContext.request, response, error);
if (response instanceof HttpClientRequest) {
request = (HttpClientRequest) response;
context = requestContextVirtualField.get(request);
} else if (requestAndContext != null) {
// this branch is taken when there was an error (e.g. timeout) and response was null
request = requestAndContext.request;
context = requestAndContext.context;
}

if (request != null && context != null) {
instrumenter().end(context, request, response, error);
}
}

Expand Down

0 comments on commit c7617dc

Please sign in to comment.