diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Helpers.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Helpers.java index 4eed55e13751..6f00d4cd69d9 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Helpers.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Helpers.java @@ -18,12 +18,11 @@ import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys; -import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; +import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts; import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientTracingHandler; import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerTracingHandler; import io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyHttpServerResponseBeforeCommitHandler; import io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyServerSingletons; -import java.util.Deque; public final class Helpers { @@ -42,9 +41,8 @@ protected void initChannel(C channel) throws Exception { // the parent channel is the original http/1.1 channel and has the contexts stored in it; // we assign to this new channel as the old one will not be evaluated in the upgraded h2c // chain - Deque serverContexts = - channel.parent().attr(AttributeKeys.SERVER_CONTEXT).get(); - channel.attr(AttributeKeys.SERVER_CONTEXT).set(serverContexts); + ServerContexts serverContexts = ServerContexts.get(channel.parent()); + channel.attr(AttributeKeys.SERVER_CONTEXTS).set(serverContexts); // todo add way to propagate the protocol version override up to the netty instrumentation; // why: the netty instrumentation extracts the http protocol version from the HttpRequest diff --git a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/AbstractChannelHandlerContextInstrumentation.java b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/AbstractChannelHandlerContextInstrumentation.java index ae5f3e0f43eb..474701939f06 100644 --- a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/AbstractChannelHandlerContextInstrumentation.java +++ b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/AbstractChannelHandlerContextInstrumentation.java @@ -18,9 +18,9 @@ import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel; import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; +import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import java.util.Deque; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -61,8 +61,7 @@ public static void onEnter( instrumenter().end(clientContext, request, null, throwable); return; } - Deque serverContexts = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).get(); - ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null; + ServerContext serverContext = ServerContexts.peekFirst(ctx.channel()); if (serverContext != null) { NettyErrorHolder.set(serverContext.context(), throwable); } diff --git a/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/AttributeKeys.java b/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/AttributeKeys.java index f18bd38bc89a..ea9947441b89 100644 --- a/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/AttributeKeys.java +++ b/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/AttributeKeys.java @@ -7,7 +7,6 @@ import io.netty.util.AttributeKey; import io.opentelemetry.context.Context; -import java.util.Deque; /** * This class is internal and is hence not for public use. Its APIs are unstable and can change at @@ -17,9 +16,9 @@ public final class AttributeKeys { // this is the context that has the server span // - // note: this attribute key is also used by ratpack instrumentation - public static final AttributeKey> SERVER_CONTEXT = - AttributeKey.valueOf(AttributeKeys.class, "server-context"); + // note: this attribute key is also used by finagle instrumentation + public static final AttributeKey SERVER_CONTEXTS = + AttributeKey.valueOf(AttributeKeys.class, "server-contexts"); public static final AttributeKey CLIENT_CONTEXT = AttributeKey.valueOf(AttributeKeys.class, "client-context"); diff --git a/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/ServerContexts.java b/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/ServerContexts.java new file mode 100644 index 000000000000..34697a006200 --- /dev/null +++ b/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/ServerContexts.java @@ -0,0 +1,79 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.netty.v4_1.internal; + +import io.netty.channel.Channel; +import io.netty.util.Attribute; +import java.util.ArrayDeque; +import java.util.Deque; + +/** + * A helper class for keeping track of incoming requests and spans associated with them. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class ServerContexts { + private static final int PIPELINING_LIMIT = 1000; + // With http pipelining multiple requests can be sent on the same connection. Responses should be + // sent in the same order the requests came in. We use this deque to store the request context + // and pop elements as responses are sent. + private final Deque serverContexts = new ArrayDeque<>(); + private volatile boolean broken = false; + + private ServerContexts() {} + + public static ServerContexts get(Channel channel) { + return channel.attr(AttributeKeys.SERVER_CONTEXTS).get(); + } + + public static ServerContexts getOrCreate(Channel channel) { + Attribute attribute = channel.attr(AttributeKeys.SERVER_CONTEXTS); + ServerContexts result = attribute.get(); + if (result == null) { + result = new ServerContexts(); + attribute.set(result); + } + return result; + } + + public static ServerContext peekFirst(Channel channel) { + ServerContexts serverContexts = get(channel); + return serverContexts != null ? serverContexts.peekFirst() : null; + } + + public ServerContext peekFirst() { + return serverContexts.peekFirst(); + } + + public ServerContext peekLast() { + return serverContexts.peekFirst(); + } + + public ServerContext pollFirst() { + return serverContexts.pollFirst(); + } + + public ServerContext pollLast() { + return serverContexts.pollLast(); + } + + public void addLast(ServerContext context) { + if (broken) { + return; + } + // If the pipelining limit is exceeded we'll stop tracing and mark the channel as broken. + // Exceeding the limit indicates that there is good chance that server context are not removed + // from the deque and there could be a memory leak. This could happen when http server decides + // not to send response to some requests, for example see + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/11942 + if (serverContexts.size() > PIPELINING_LIMIT) { + broken = true; + serverContexts.clear(); + } + serverContexts.addLast(context); + } +} diff --git a/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerRequestTracingHandler.java b/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerRequestTracingHandler.java index 14618125bd8f..8f5b9a56fe84 100644 --- a/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerRequestTracingHandler.java +++ b/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerRequestTracingHandler.java @@ -10,16 +10,12 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; -import io.netty.util.Attribute; -import io.netty.util.AttributeKey; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel; -import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; -import java.util.ArrayDeque; -import java.util.Deque; +import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts; /** * This class is internal and is hence not for public use. Its APIs are unstable and can change at @@ -37,7 +33,7 @@ public HttpServerRequestTracingHandler( @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Channel channel = ctx.channel(); - Deque serverContexts = getOrCreate(channel, AttributeKeys.SERVER_CONTEXT); + ServerContexts serverContexts = ServerContexts.getOrCreate(channel); if (!(msg instanceof HttpRequest)) { ServerContext serverContext = serverContexts.peekLast(); @@ -66,8 +62,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception // the span is ended normally in HttpServerResponseTracingHandler } catch (Throwable throwable) { // make sure to remove the server context on end() call - ServerContext serverContext = serverContexts.removeLast(); - instrumenter.end(serverContext.context(), serverContext.request(), null, throwable); + ServerContext serverContext = serverContexts.pollLast(); + if (serverContext != null) { + instrumenter.end(serverContext.context(), serverContext.request(), null, throwable); + } throw throwable; } } @@ -75,8 +73,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // connection was closed, close all remaining requests - Attribute> contextAttr = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT); - Deque serverContexts = contextAttr.get(); + ServerContexts serverContexts = ServerContexts.get(ctx.channel()); if (serverContexts == null) { super.channelInactive(ctx); @@ -89,14 +86,4 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } super.channelInactive(ctx); } - - private static Deque getOrCreate(Channel channel, AttributeKey> key) { - Attribute> attribute = channel.attr(key); - Deque deque = attribute.get(); - if (deque == null) { - deque = new ArrayDeque<>(); - attribute.set(deque); - } - return deque; - } } diff --git a/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerResponseTracingHandler.java b/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerResponseTracingHandler.java index 4bdcf99a777e..2c89e310c6e5 100644 --- a/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerResponseTracingHandler.java +++ b/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerResponseTracingHandler.java @@ -13,18 +13,16 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.netty.common.internal.NettyErrorHolder; import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel; -import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys; import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolEventHandler; import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolSpecificEvent; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; -import java.util.Deque; +import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts; import javax.annotation.Nullable; /** @@ -51,12 +49,8 @@ public HttpServerResponseTracingHandler( @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) throws Exception { - Attribute> serverContextAttr = - ctx.channel().attr(AttributeKeys.SERVER_CONTEXT); - - Deque serverContexts = serverContextAttr.get(); + ServerContexts serverContexts = ServerContexts.get(ctx.channel()); ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null; - if (serverContext == null) { super.write(ctx, msg, prm); return; @@ -86,7 +80,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) thr } else { // Headers and body all sent together, we have the response information in the msg. beforeCommitHandler.handle(serverContext.context(), (HttpResponse) msg); - serverContexts.removeFirst(); + serverContexts.pollFirst(); writePromise.addListener( future -> end( @@ -102,7 +96,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) thr // Body sent after headers. We stored the response information in the context when // encountering HttpResponse (which was not FullHttpResponse since it's not // LastHttpContent). - serverContexts.removeFirst(); + serverContexts.pollFirst(); HttpResponse response = ctx.channel().attr(HTTP_SERVER_RESPONSE).getAndSet(null); writePromise.addListener( future -> @@ -130,7 +124,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) thr try (Scope ignored = serverContext.context().makeCurrent()) { super.write(ctx, msg, writePromise); } catch (Throwable throwable) { - serverContexts.removeFirst(); + serverContexts.pollFirst(); end(serverContext.context(), serverContext.request(), null, throwable); throw throwable; } diff --git a/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/TracingHandler.java b/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/TracingHandler.java index eb61bbf5f57c..cdfd48657a56 100644 --- a/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/TracingHandler.java +++ b/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/TracingHandler.java @@ -9,12 +9,10 @@ import static io.opentelemetry.javaagent.instrumentation.ratpack.RatpackSingletons.updateServerSpanName; import static io.opentelemetry.javaagent.instrumentation.ratpack.RatpackSingletons.updateSpanNames; -import io.netty.util.Attribute; import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; +import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts; import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; -import java.util.Deque; import ratpack.handling.Context; import ratpack.handling.Handler; @@ -26,10 +24,8 @@ public final class TracingHandler implements Handler { @Override public void handle(Context ctx) { - Attribute> serverContextAttribute = - ctx.getDirectChannelAccess().getChannel().attr(AttributeKeys.SERVER_CONTEXT); - Deque serverContexts = serverContextAttribute.get(); - ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null; + ServerContext serverContext = + ServerContexts.peekFirst(ctx.getDirectChannelAccess().getChannel()); // Must use context from channel, as executor instrumentation is not accurate - Ratpack // internally queues events and then drains them in batches, causing executor instrumentation to diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/ContextHandlerInstrumentation.java b/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/ContextHandlerInstrumentation.java index aa51160e5fc0..f1d6af9e73d6 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/ContextHandlerInstrumentation.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/ContextHandlerInstrumentation.java @@ -10,11 +10,10 @@ import io.netty.channel.Channel; import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; +import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import java.util.Deque; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -39,8 +38,7 @@ public static class CreateOperationsAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static Scope onEnter(@Advice.Argument(0) Channel channel) { // set context to the first unprocessed request - Deque serverContextx = channel.attr(AttributeKeys.SERVER_CONTEXT).get(); - ServerContext serverContext = serverContextx != null ? serverContextx.peekFirst() : null; + ServerContext serverContext = ServerContexts.peekFirst(channel); if (serverContext != null) { return serverContext.context().makeCurrent(); } diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/HttpTrafficHandlerInstrumentation.java b/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/HttpTrafficHandlerInstrumentation.java index efe171194b18..3567a8524644 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/HttpTrafficHandlerInstrumentation.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/reactornetty/HttpTrafficHandlerInstrumentation.java @@ -10,11 +10,10 @@ import io.netty.channel.ChannelHandlerContext; import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; +import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import java.util.Deque; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -40,9 +39,7 @@ public static class RunAdvice { public static Scope onEnter( @Advice.FieldValue("ctx") ChannelHandlerContext channelHandlerContext) { // set context to the first unprocessed request - Deque serverContexts = - channelHandlerContext.channel().attr(AttributeKeys.SERVER_CONTEXT).get(); - ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null; + ServerContext serverContext = ServerContexts.peekFirst(channelHandlerContext.channel()); if (serverContext != null) { return serverContext.context().makeCurrent(); }