Skip to content

Commit

Permalink
Fix netty memory leak (#12003)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Aug 16, 2024
1 parent b231d34 commit 888accf
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientTracingHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerTracingHandler;
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyHttpServerResponseBeforeCommitHandler;
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyServerSingletons;
import java.util.Deque;

public final class Helpers {

Expand All @@ -42,9 +41,8 @@ protected void initChannel(C channel) throws Exception {
// the parent channel is the original http/1.1 channel and has the contexts stored in it;
// we assign to this new channel as the old one will not be evaluated in the upgraded h2c
// chain
Deque<ServerContext> serverContexts =
channel.parent().attr(AttributeKeys.SERVER_CONTEXT).get();
channel.attr(AttributeKeys.SERVER_CONTEXT).set(serverContexts);
ServerContexts serverContexts = ServerContexts.get(channel.parent());
channel.attr(AttributeKeys.SERVER_CONTEXTS).set(serverContexts);

// todo add way to propagate the protocol version override up to the netty instrumentation;
// why: the netty instrumentation extracts the http protocol version from the HttpRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Deque;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -61,8 +61,7 @@ public static void onEnter(
instrumenter().end(clientContext, request, null, throwable);
return;
}
Deque<ServerContext> serverContexts = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).get();
ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null;
ServerContext serverContext = ServerContexts.peekFirst(ctx.channel());
if (serverContext != null) {
NettyErrorHolder.set(serverContext.context(), throwable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import io.netty.util.AttributeKey;
import io.opentelemetry.context.Context;
import java.util.Deque;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
Expand All @@ -17,9 +16,9 @@ public final class AttributeKeys {

// this is the context that has the server span
//
// note: this attribute key is also used by ratpack instrumentation
public static final AttributeKey<Deque<ServerContext>> SERVER_CONTEXT =
AttributeKey.valueOf(AttributeKeys.class, "server-context");
// note: this attribute key is also used by finagle instrumentation
public static final AttributeKey<ServerContexts> SERVER_CONTEXTS =
AttributeKey.valueOf(AttributeKeys.class, "server-contexts");

public static final AttributeKey<Context> CLIENT_CONTEXT =
AttributeKey.valueOf(AttributeKeys.class, "client-context");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.netty.v4_1.internal;

import io.netty.channel.Channel;
import io.netty.util.Attribute;
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * A helper class for keeping track of incoming requests and spans associated with them.
 *
 * <p>An instance is stored on a netty {@link Channel} under {@link
 * AttributeKeys#SERVER_CONTEXTS} and wraps a deque of {@link ServerContext}s: the first element
 * belongs to the oldest request that has not been answered yet, the last element to the most
 * recently received request.
 *
 * <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
 * at any time.
 */
public final class ServerContexts {
  // Number of queued contexts above which the channel is considered broken, see addLast.
  private static final int PIPELINING_LIMIT = 1000;
  // With http pipelining multiple requests can be sent on the same connection. Responses should be
  // sent in the same order the requests came in. We use this deque to store the request context
  // and pop elements as responses are sent.
  private final Deque<ServerContext> serverContexts = new ArrayDeque<>();
  // Once set, addLast ignores every further context for this channel.
  private volatile boolean broken = false;

  private ServerContexts() {}

  /**
   * Returns the {@link ServerContexts} stored on the given channel.
   *
   * @param channel the channel to read the attribute from
   * @return the stored instance, or {@code null} when nothing has been stored on the channel
   */
  public static ServerContexts get(Channel channel) {
    return channel.attr(AttributeKeys.SERVER_CONTEXTS).get();
  }

  /**
   * Returns the {@link ServerContexts} stored on the given channel, storing a new empty instance
   * first when the channel does not have one yet.
   *
   * @param channel the channel to read the attribute from
   * @return the existing or newly created instance, never {@code null}
   */
  public static ServerContexts getOrCreate(Channel channel) {
    Attribute<ServerContexts> attribute = channel.attr(AttributeKeys.SERVER_CONTEXTS);
    ServerContexts result = attribute.get();
    if (result == null) {
      // NOTE(review): get-then-set is not atomic; presumably this is only called from the
      // channel's event loop - confirm against callers.
      result = new ServerContexts();
      attribute.set(result);
    }
    return result;
  }

  /**
   * Null-safe shortcut for {@code get(channel).peekFirst()}.
   *
   * @param channel the channel to read the attribute from
   * @return the context of the oldest unanswered request, or {@code null} when the channel has no
   *     {@link ServerContexts} or it is empty
   */
  public static ServerContext peekFirst(Channel channel) {
    ServerContexts serverContexts = get(channel);
    return serverContexts != null ? serverContexts.peekFirst() : null;
  }

  /**
   * Returns, without removing it, the context of the oldest unanswered request.
   *
   * @return the first element, or {@code null} when empty
   */
  public ServerContext peekFirst() {
    return serverContexts.peekFirst();
  }

  /**
   * Returns, without removing it, the context of the most recently added request.
   *
   * @return the last element, or {@code null} when empty
   */
  public ServerContext peekLast() {
    // Must look at the tail of the deque; peekFirst() would hand back the oldest pipelined
    // request instead of the one that is currently being read.
    return serverContexts.peekLast();
  }

  /**
   * Removes and returns the context of the oldest unanswered request.
   *
   * @return the removed first element, or {@code null} when empty
   */
  public ServerContext pollFirst() {
    return serverContexts.pollFirst();
  }

  /**
   * Removes and returns the context of the most recently added request.
   *
   * @return the removed last element, or {@code null} when empty
   */
  public ServerContext pollLast() {
    return serverContexts.pollLast();
  }

  /**
   * Appends the context of a newly received request.
   *
   * <p>The call is ignored once the channel has been marked as broken. The channel is marked as
   * broken by the call that finds more than {@code PIPELINING_LIMIT} contexts already queued; that
   * call discards all queued contexts and then still appends {@code context}.
   *
   * @param context the context of the request that was just received
   */
  public void addLast(ServerContext context) {
    if (broken) {
      return;
    }
    // If the pipelining limit is exceeded we'll stop tracing and mark the channel as broken.
    // Exceeding the limit indicates that there is a good chance that server contexts are not
    // removed from the deque and there could be a memory leak. This could happen when the http
    // server decides not to send a response to some requests, for example see
    // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/11942
    if (serverContexts.size() > PIPELINING_LIMIT) {
      broken = true;
      serverContexts.clear();
    }
    // NOTE(review): the context that trips the limit is still queued after the clear; confirm
    // this is intended rather than returning right after marking the channel as broken.
    serverContexts.addLast(context);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,12 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import java.util.ArrayDeque;
import java.util.Deque;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
Expand All @@ -37,7 +33,7 @@ public HttpServerRequestTracingHandler(
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();
Deque<ServerContext> serverContexts = getOrCreate(channel, AttributeKeys.SERVER_CONTEXT);
ServerContexts serverContexts = ServerContexts.getOrCreate(channel);

if (!(msg instanceof HttpRequest)) {
ServerContext serverContext = serverContexts.peekLast();
Expand Down Expand Up @@ -66,17 +62,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
// the span is ended normally in HttpServerResponseTracingHandler
} catch (Throwable throwable) {
// make sure to remove the server context on end() call
ServerContext serverContext = serverContexts.removeLast();
instrumenter.end(serverContext.context(), serverContext.request(), null, throwable);
ServerContext serverContext = serverContexts.pollLast();
if (serverContext != null) {
instrumenter.end(serverContext.context(), serverContext.request(), null, throwable);
}
throw throwable;
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// connection was closed, close all remaining requests
Attribute<Deque<ServerContext>> contextAttr = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT);
Deque<ServerContext> serverContexts = contextAttr.get();
ServerContexts serverContexts = ServerContexts.get(ctx.channel());

if (serverContexts == null) {
super.channelInactive(ctx);
Expand All @@ -89,14 +86,4 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
super.channelInactive(ctx);
}

private static <T> Deque<T> getOrCreate(Channel channel, AttributeKey<Deque<T>> key) {
Attribute<Deque<T>> attribute = channel.attr(key);
Deque<T> deque = attribute.get();
if (deque == null) {
deque = new ArrayDeque<>();
attribute.set(deque);
}
return deque;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,16 @@
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.common.internal.NettyErrorHolder;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolEventHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolSpecificEvent;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import java.util.Deque;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import javax.annotation.Nullable;

/**
Expand All @@ -51,12 +49,8 @@ public HttpServerResponseTracingHandler(

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) throws Exception {
Attribute<Deque<ServerContext>> serverContextAttr =
ctx.channel().attr(AttributeKeys.SERVER_CONTEXT);

Deque<ServerContext> serverContexts = serverContextAttr.get();
ServerContexts serverContexts = ServerContexts.get(ctx.channel());
ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null;

if (serverContext == null) {
super.write(ctx, msg, prm);
return;
Expand Down Expand Up @@ -86,7 +80,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) thr
} else {
// Headers and body all sent together, we have the response information in the msg.
beforeCommitHandler.handle(serverContext.context(), (HttpResponse) msg);
serverContexts.removeFirst();
serverContexts.pollFirst();
writePromise.addListener(
future ->
end(
Expand All @@ -102,7 +96,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) thr
// Body sent after headers. We stored the response information in the context when
// encountering HttpResponse (which was not FullHttpResponse since it's not
// LastHttpContent).
serverContexts.removeFirst();
serverContexts.pollFirst();
HttpResponse response = ctx.channel().attr(HTTP_SERVER_RESPONSE).getAndSet(null);
writePromise.addListener(
future ->
Expand Down Expand Up @@ -130,7 +124,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) thr
try (Scope ignored = serverContext.context().makeCurrent()) {
super.write(ctx, msg, writePromise);
} catch (Throwable throwable) {
serverContexts.removeFirst();
serverContexts.pollFirst();
end(serverContext.context(), serverContext.request(), null, throwable);
throw throwable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@
import static io.opentelemetry.javaagent.instrumentation.ratpack.RatpackSingletons.updateServerSpanName;
import static io.opentelemetry.javaagent.instrumentation.ratpack.RatpackSingletons.updateSpanNames;

import io.netty.util.Attribute;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import java.util.Deque;
import ratpack.handling.Context;
import ratpack.handling.Handler;

Expand All @@ -26,10 +24,8 @@ public final class TracingHandler implements Handler {

@Override
public void handle(Context ctx) {
Attribute<Deque<ServerContext>> serverContextAttribute =
ctx.getDirectChannelAccess().getChannel().attr(AttributeKeys.SERVER_CONTEXT);
Deque<ServerContext> serverContexts = serverContextAttribute.get();
ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null;
ServerContext serverContext =
ServerContexts.peekFirst(ctx.getDirectChannelAccess().getChannel());

// Must use context from channel, as executor instrumentation is not accurate - Ratpack
// internally queues events and then drains them in batches, causing executor instrumentation to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@

import io.netty.channel.Channel;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Deque;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -39,8 +38,7 @@ public static class CreateOperationsAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope onEnter(@Advice.Argument(0) Channel channel) {
// set context to the first unprocessed request
Deque<ServerContext> serverContextx = channel.attr(AttributeKeys.SERVER_CONTEXT).get();
ServerContext serverContext = serverContextx != null ? serverContextx.peekFirst() : null;
ServerContext serverContext = ServerContexts.peekFirst(channel);
if (serverContext != null) {
return serverContext.context().makeCurrent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@

import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Deque;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -40,9 +39,7 @@ public static class RunAdvice {
public static Scope onEnter(
@Advice.FieldValue("ctx") ChannelHandlerContext channelHandlerContext) {
// set context to the first unprocessed request
Deque<ServerContext> serverContexts =
channelHandlerContext.channel().attr(AttributeKeys.SERVER_CONTEXT).get();
ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null;
ServerContext serverContext = ServerContexts.peekFirst(channelHandlerContext.channel());
if (serverContext != null) {
return serverContext.context().makeCurrent();
}
Expand Down

0 comments on commit 888accf

Please sign in to comment.