Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument akka-http bindAndHandle #8174

Merged
merged 2 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import akka.dispatch.sysmsg.SystemMessage;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
import io.opentelemetry.javaagent.bootstrap.executors.TaskAdviceHelper;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
Expand Down Expand Up @@ -52,6 +53,11 @@ public static void exit(@Advice.Enter Scope scope) {
if (scope != null) {
scope.close();
}
// akka-http instrumentation can leak scopes
// reset the context to clear the leaked scopes
if (Java8BytecodeBridge.currentContext() != Java8BytecodeBridge.rootContext()) {
Java8BytecodeBridge.rootContext().makeCurrent();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ dependencies {
library("com.typesafe.akka:akka-http_2.11:10.0.0")
library("com.typesafe.akka:akka-stream_2.11:2.4.14")

// these instrumentations are not needed for the tests to pass
// they are here to test for context leaks
testInstrumentation(project(":instrumentation:akka:akka-actor-2.3:javaagent"))
testInstrumentation(project(":instrumentation:akka:akka-actor-fork-join-2.5:javaagent"))
testInstrumentation(project(":instrumentation:scala-fork-join-2.8:javaagent"))

latestDepTestLibrary("com.typesafe.akka:akka-http_2.13:+")
latestDepTestLibrary("com.typesafe.akka:akka-stream_2.13:+")
Expand All @@ -48,6 +47,8 @@ tasks.withType<Test>().configureEach {
jvmArgs("--add-exports=java.base/sun.security.util=ALL-UNNAMED")
jvmArgs("-XX:+IgnoreUnrecognizedVMOptions")

jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false")

systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.akkahttp.server;

import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.errorResponse;
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.instrumenter;

import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.Attributes;
import akka.stream.BidiShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.scaladsl.Flow;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.ArrayDeque;
import java.util.Deque;

public class AkkaFlowWrapper
extends GraphStage<BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest>> {
private final Inlet<HttpRequest> requestIn = Inlet.create("otel.requestIn");
private final Outlet<HttpRequest> requestOut = Outlet.create("otel.requestOut");
private final Inlet<HttpResponse> responseIn = Inlet.create("otel.responseIn");
private final Outlet<HttpResponse> responseOut = Outlet.create("otel.responseOut");

private final BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape =
BidiShape.of(responseIn, responseOut, requestIn, requestOut);

public static Flow<HttpRequest, HttpResponse, ?> wrap(
Flow<HttpRequest, HttpResponse, ?> handler) {
return handler.join(new AkkaFlowWrapper());
}

@Override
public BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape() {
return shape;
}

@Override
public GraphStageLogic createLogic(Attributes attributes) {
return new TracingLogic();
}

private class TracingLogic extends GraphStageLogic {
private final Deque<TracingRequest> requests = new ArrayDeque<>();

public TracingLogic() {
super(shape);

// server pulls response, pass response from user code to server
setHandler(
responseOut,
new AbstractOutHandler() {
@Override
public void onPull() {
pull(responseIn);
}

@Override
public void onDownstreamFinish() {
cancel(responseIn);
}
});

// user code pulls request, pass request from server to user code
setHandler(
requestOut,
new AbstractOutHandler() {
@Override
public void onPull() {
pull(requestIn);
}

@Override
public void onDownstreamFinish() {
// Invoked on errors. Don't complete this stage to allow error-capturing
cancel(requestIn);
}
});

// new request from server
setHandler(
requestIn,
new AbstractInHandler() {
@Override
public void onPush() {
HttpRequest request = grab(requestIn);

TracingRequest tracingRequest = TracingRequest.EMPTY;
Context parentContext = currentContext();
if (instrumenter().shouldStart(parentContext, request)) {
Context context = instrumenter().start(parentContext, request);
// scope opened here may leak, actor instrumentation will close it
Scope scope = context.makeCurrent();
tracingRequest = new TracingRequest(context, scope, request);
}
// event if span wasn't started we need to push TracingRequest to match response
// with request
requests.push(tracingRequest);

push(requestOut, request);
}

@Override
public void onUpstreamFinish() {
complete(requestOut);
}

@Override
public void onUpstreamFailure(Throwable exception) {
fail(requestOut, exception);
}
});

// response from user code
setHandler(
responseIn,
new AbstractInHandler() {
@Override
public void onPush() {
HttpResponse response = grab(responseIn);

TracingRequest tracingRequest = requests.poll();
if (tracingRequest != null && tracingRequest != TracingRequest.EMPTY) {
// this may happen on a different thread from the one that opened the scope
// actor instrumentation will take care of the leaked scopes
tracingRequest.scope.close();
instrumenter().end(tracingRequest.context, tracingRequest.request, response, null);
}
push(responseOut, response);
}

@Override
public void onUpstreamFailure(Throwable exception) {
TracingRequest tracingRequest;
while ((tracingRequest = requests.poll()) != null) {
if (tracingRequest == TracingRequest.EMPTY) {
continue;
}
tracingRequest.scope.close();
instrumenter()
.end(
tracingRequest.context, tracingRequest.request, errorResponse(), exception);
}

fail(responseOut, exception);
}

@Override
public void onUpstreamFinish() {
completeStage();
}
});
}
}

private static class TracingRequest {
static final TracingRequest EMPTY = new TracingRequest(null, null, null);
final Context context;
final Scope scope;
final HttpRequest request;

TracingRequest(Context context, Scope scope, HttpRequest request) {
this.context = context;
this.scope = scope;
this.request = request;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,12 @@

package io.opentelemetry.javaagent.instrumentation.akkahttp.server;

import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.errorResponse;
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.instrumenter;
import static java.util.Collections.singletonList;

import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import com.google.auto.service.AutoService;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;
import scala.Function1;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;

@AutoService(InstrumentationModule.class)
public class AkkaHttpServerInstrumentationModule extends InstrumentationModule {
Expand All @@ -33,73 +22,4 @@ public AkkaHttpServerInstrumentationModule() {
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new HttpExtServerInstrumentation());
}

public static class SyncWrapper extends AbstractFunction1<HttpRequest, HttpResponse> {
private final Function1<HttpRequest, HttpResponse> userHandler;

public SyncWrapper(Function1<HttpRequest, HttpResponse> userHandler) {
this.userHandler = userHandler;
}

@Override
public HttpResponse apply(HttpRequest request) {
Context parentContext = currentContext();
if (!instrumenter().shouldStart(parentContext, request)) {
return userHandler.apply(request);
}
Context context = instrumenter().start(parentContext, request);
try (Scope ignored = context.makeCurrent()) {
HttpResponse response = userHandler.apply(request);
instrumenter().end(context, request, response, null);
return response;
} catch (Throwable t) {
instrumenter().end(context, request, errorResponse(), t);
throw t;
}
}
}

public static class AsyncWrapper extends AbstractFunction1<HttpRequest, Future<HttpResponse>> {
private final Function1<HttpRequest, Future<HttpResponse>> userHandler;
private final ExecutionContext executionContext;

public AsyncWrapper(
Function1<HttpRequest, Future<HttpResponse>> userHandler,
ExecutionContext executionContext) {
this.userHandler = userHandler;
this.executionContext = executionContext;
}

@Override
public Future<HttpResponse> apply(HttpRequest request) {
Context parentContext = currentContext();
if (!instrumenter().shouldStart(parentContext, request)) {
return userHandler.apply(request);
}
Context context = instrumenter().start(parentContext, request);
try (Scope ignored = context.makeCurrent()) {
return userHandler
.apply(request)
.transform(
new AbstractFunction1<HttpResponse, HttpResponse>() {
@Override
public HttpResponse apply(HttpResponse response) {
instrumenter().end(context, request, response, null);
return response;
}
},
new AbstractFunction1<Throwable, Throwable>() {
@Override
public Throwable apply(Throwable t) {
instrumenter().end(context, request, errorResponse(), t);
return t;
}
},
executionContext);
} catch (Throwable t) {
instrumenter().end(context, request, null, t);
throw t;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@

import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.Materializer;
import akka.stream.scaladsl.Flow;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import scala.Function1;
import scala.concurrent.Future;

public class HttpExtServerInstrumentation implements TypeInstrumentation {
@Override
Expand All @@ -27,42 +25,18 @@ public ElementMatcher<TypeDescription> typeMatcher() {

@Override
public void transform(TypeTransformer transformer) {
// Instrumenting akka-streams bindAndHandle api was previously attempted.
// This proved difficult as there was no clean way to close the async scope
// in the graph logic after the user's request handler completes.
//
// Instead, we're instrumenting the bindAndHandle function helpers by
// wrapping the scala functions with our own handlers.
transformer.applyAdviceToMethod(
named("bindAndHandleSync").and(takesArgument(0, named("scala.Function1"))),
this.getClass().getName() + "$AkkaHttpSyncAdvice");
transformer.applyAdviceToMethod(
named("bindAndHandleAsync").and(takesArgument(0, named("scala.Function1"))),
this.getClass().getName() + "$AkkaHttpAsyncAdvice");
}

@SuppressWarnings("unused")
public static class AkkaHttpSyncAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapHandler(
@Advice.Argument(value = 0, readOnly = false)
Function1<HttpRequest, HttpResponse> handler) {
handler = new AkkaHttpServerInstrumentationModule.SyncWrapper(handler);
}
named("bindAndHandle").and(takesArgument(0, named("akka.stream.scaladsl.Flow"))),
this.getClass().getName() + "$AkkaBindAndHandleAdvice");
}

@SuppressWarnings("unused")
public static class AkkaHttpAsyncAdvice {
public static class AkkaBindAndHandleAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapHandler(
@Advice.Argument(value = 0, readOnly = false)
Function1<HttpRequest, Future<HttpResponse>> handler,
@Advice.Argument(7) Materializer materializer) {
handler =
new AkkaHttpServerInstrumentationModule.AsyncWrapper(
handler, materializer.executionContext());
@Advice.Argument(value = 0, readOnly = false) Flow<HttpRequest, HttpResponse, ?> handler) {
handler = AkkaFlowWrapper.wrap(handler);
}
}
}
Loading