Skip to content

Commit

Permalink
CDI context propagation improvements for the reactive stack
Browse files Browse the repository at this point in the history
== ArC
Introduce the VertxCurrentContextFactory so that the Vert.x duplicated
context can be used to store the "current context" for normal scopes.

== resteasy-reactive
Don't clear our CDI current request when suspending.

This used to be done in order to prevent subsequent
requests running on the same thread as a suspended
request from accessing the former's data.
With the advent of DuplicatedContext backed
storage, there is no longer any chance of mixing
data so there is no need to clear it out.

Furthermore, by not clearing out current request,
code that accesses the request scoped CurrentVertxRequest
that is executed while the request is suspended, can now
work even if context propagation is not in play.

== Tests
Add leak detection tests for resteasy reactive, graphql and reactive rest client. Also improve the OpenTelemetry reactive tests.

== OpenTelemetry
Only register the OpenTelemetryClientFilter for RESTEasy Client Classic. Use Capabilities to determine when to register the OpenTelemetryClientFilter.

Co-authored-by: Georgios Andrianakis <[email protected]>
Co-authored-by: Clement Escoffier <[email protected]>
Co-authored-by: Roberto Cortez <[email protected]>
Co-authored-by: brunobat <[email protected]>
Co-authored-by: Martin Kouba <[email protected]>
  • Loading branch information
6 people committed Sep 7, 2022
1 parent 96183c6 commit 00ce709
Show file tree
Hide file tree
Showing 19 changed files with 939 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public interface Capability {

String REST = QUARKUS_PREFIX + "rest";
String REST_CLIENT = REST + ".client";
String REST_CLIENT_REACTIVE = REST_CLIENT + ".reactive";
String REST_JACKSON = REST + ".jackson";
String REST_JSONB = REST + ".jsonb";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

import org.jboss.jandex.AnnotationInstance;
Expand All @@ -21,6 +20,8 @@
import io.quarkus.arc.processor.AnnotationsTransformer;
import io.quarkus.arc.processor.InterceptorBindingRegistrar;
import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.BuildSteps;
Expand Down Expand Up @@ -53,15 +54,6 @@ public class OpenTelemetryProcessor {
io.opentelemetry.extension.annotations.SpanAttribute.class.getName());;
private static final DotName SPAN_ATTRIBUTE = DotName.createSimple(SpanAttribute.class.getName());

static class RestClientAvailable implements BooleanSupplier {
private static final boolean IS_REST_CLIENT_AVAILABLE = isClassPresent("javax.ws.rs.client.ClientRequestFilter");

@Override
public boolean getAsBoolean() {
return IS_REST_CLIENT_AVAILABLE;
}
}

@BuildStep
AdditionalBeanBuildItem ensureProducerIsRetained() {
return AdditionalBeanBuildItem.builder()
Expand Down Expand Up @@ -138,11 +130,15 @@ public void transform(TransformationContext context) {
}));
}

@BuildStep(onlyIf = RestClientAvailable.class)
void registerProvider(BuildProducer<AdditionalIndexedClassesBuildItem> additionalIndexed,
@BuildStep
void registerRestClientClassicProvider(
Capabilities capabilities,
BuildProducer<AdditionalIndexedClassesBuildItem> additionalIndexed,
BuildProducer<AdditionalBeanBuildItem> additionalBeans) {
additionalIndexed.produce(new AdditionalIndexedClassesBuildItem(OpenTelemetryClientFilter.class.getName()));
additionalBeans.produce(new AdditionalBeanBuildItem(OpenTelemetryClientFilter.class));
if (capabilities.isPresent(Capability.REST_CLIENT) && capabilities.isMissing(Capability.REST_CLIENT_REACTIVE)) {
additionalIndexed.produce(new AdditionalIndexedClassesBuildItem(OpenTelemetryClientFilter.class.getName()));
additionalBeans.produce(new AdditionalBeanBuildItem(OpenTelemetryClientFilter.class));
}
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package io.quarkus.opentelemetry.runtime;

import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe;
import static io.smallrye.common.vertx.VertxContext.getOrCreateDuplicatedContext;
import static io.smallrye.common.vertx.VertxContext.isDuplicatedContext;

import org.jboss.logging.Logger;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextStorage;
import io.opentelemetry.context.Scope;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Vertx;

/**
Expand Down Expand Up @@ -118,8 +118,10 @@ public static Context getContext(io.vertx.core.Context vertxContext) {
*/
public static io.vertx.core.Context getVertxContext() {
io.vertx.core.Context context = Vertx.currentContext();
if (context != null) {
io.vertx.core.Context dc = getOrCreateDuplicatedContext(context);
if (context != null && VertxContext.isOnDuplicatedContext()) {
return context;
} else if (context != null) {
io.vertx.core.Context dc = VertxContext.createNewDuplicatedContext(context);
setContextSafe(dc, true);
return dc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;

/**
* A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data.
* A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data. This is only used
* by RESTEast Classic, because the handling implementation is provided by RESTEasy. This is not used by RESTEasy
* Reactive because tracing is handled by Vert.x.
*/
@Unremovable
@Provider
Expand Down Expand Up @@ -77,9 +79,19 @@ public void filter(final ClientRequestContext request) {
if (parentContext == null) {
parentContext = io.opentelemetry.context.Context.current();
}

// For each request, we need a new OTel Context from the **current one**
// the parent context needs to be the one from which the call originates.

if (instrumenter.shouldStart(parentContext, request)) {
Context spanContext = instrumenter.start(parentContext, request);
Scope scope = QuarkusContextStorage.INSTANCE.attach(vertxContext, spanContext);
// Create a new scope with an empty termination callback.
Scope scope = new Scope() {
@Override
public void close() {

}
};
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_CONTEXT, spanContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_PARENT_CONTEXT, parentContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE, scope);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package io.quarkus.opentelemetry.runtime.tracing.vertx;

import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe;

import java.util.Map;
import java.util.function.BiConsumer;

import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;
import io.quarkus.opentelemetry.runtime.tracing.vertx.OpenTelemetryVertxTracer.SpanOperation;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.MultiMap;
import io.vertx.core.http.impl.headers.HeadersAdaptor;
Expand Down Expand Up @@ -92,10 +89,14 @@ default <R> SpanOperation sendRequest(
if (instrumenter.shouldStart(parentContext, (REQ) request)) {
io.opentelemetry.context.Context spanContext = instrumenter.start(parentContext,
writableHeaders((REQ) request, headers));
Context duplicatedContext = VertxContext.createNewDuplicatedContext(context);
setContextSafe(duplicatedContext, true);
Scope scope = QuarkusContextStorage.INSTANCE.attach(duplicatedContext, spanContext);
return spanOperation(duplicatedContext, (REQ) request, toMultiMap(headers), spanContext, scope);
// Create a new scope with an empty termination callback.
Scope scope = new Scope() {
@Override
public void close() {

}
};
return spanOperation(context, (REQ) request, toMultiMap(headers), spanContext, scope);
}

return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package io.quarkus.resteasy.reactive.server.test;

import static org.awaitility.Awaitility.await;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;

import org.jboss.shrinkwrap.api.asset.EmptyAsset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;
import io.restassured.response.ResponseBody;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class RequestLeakDetectionTest {

@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar.addClasses(MyRestAPI.class, MyRequestScopeBean.class, Barrier.class, Task.class)
.addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml"));

@Inject
Barrier barrier;

@Test
public void testWithConcurrentCalls() {
List<String> results = new CopyOnWriteArrayList<>();
int count = 100;
barrier.setMaxConcurrency(count);
for (int i = 0; i < count; i++) {
int c = i;
new Thread(() -> {
ResponseBody<?> body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON)
.get("/test/{val}").thenReturn().body();
String value = body.toString();
results.add(value);
}).start();
}
await().until(() -> results.size() == count);
Set<String> asSet = new HashSet<>(results);
Assertions.assertEquals(asSet.size(), count);
}

@Test
public void testWithConcurrentBlockingCalls() {
List<String> results = new CopyOnWriteArrayList<>();
int count = 100;
barrier.setMaxConcurrency(count);
for (int i = 0; i < count; i++) {
int c = i;
new Thread(() -> {
ResponseBody<?> body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON)
.get("/test/blocking/{val}").thenReturn().body();
String value = body.toString();
results.add(value);
}).start();
}
await().until(() -> results.size() == count);
Set<String> asSet = new HashSet<>(results);
Assertions.assertEquals(asSet.size(), count);
}

@ApplicationScoped
@Path("/test")
public static class MyRestAPI {

@Inject
MyRequestScopeBean bean;

@Inject
Barrier barrier;

@GET
@Path("/{val}")
public Uni<Foo> foo(int val) {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
Vertx.currentContext().putLocal("count", val);
bean.setValue(val);

return Uni.createFrom().<Integer> emitter(e -> {
barrier.enqueue(Vertx.currentContext(), () -> {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
int r = Vertx.currentContext().getLocal("count");
Assertions.assertEquals(r, val);
e.complete(bean.getValue());
});
}).map(i -> new Foo(Integer.toString(i)));
}

@GET
@Path("/blocking/{val}")
public Foo blocking(int val) {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
Vertx.currentContext().putLocal("count", val);
bean.setValue(val);

return Uni.createFrom().<Integer> emitter(e -> {
barrier.enqueue(Vertx.currentContext(), () -> {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
int r = Vertx.currentContext().getLocal("count");
Assertions.assertEquals(r, val);
e.complete(bean.getValue());
});
})
.map(i -> new Foo(Integer.toString(i)))
.await().indefinitely();
}
}

@ApplicationScoped
public static class Barrier {

private int level;

public void setMaxConcurrency(int level) {
this.level = level;
}

private final AtomicInteger counter = new AtomicInteger();
private final List<Task> tasks = new CopyOnWriteArrayList<>();

public void enqueue(Context context, Runnable runnable) {
Task task = new Task(context, runnable);
tasks.add(task);
if (counter.incrementAndGet() >= level) {
for (Task tbr : new ArrayList<>(tasks)) {
tbr.run();
tasks.remove(tbr);
}
}
}
}

private static class Task {
private final Context context;
private final Runnable runnable;

private Task(Context context, Runnable runnable) {
this.context = context;
this.runnable = runnable;
}

void run() {
context.runOnContext(x -> runnable.run());
}
}

@RequestScoped
public static class MyRequestScopeBean {

private int value = -1;

public void setValue(int v) {
if (value != -1) {
throw new IllegalStateException("Already initialized");
}
value = v;
}

public int getValue() {
return value;
}

}

public static class Foo {

public final String value;

public Foo(String value) {
this.value = value;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ protected void handleRequestScopeActivation() {
}
}

@Override
protected void requestScopeDeactivated() {
// we intentionally don't call 'CurrentRequestManager.set(null)'
// because there is no need to clear the current request
// as that is backed by a DuplicatedContext and not accessible to other requests anyway
}

protected SecurityContext createSecurityContext() {
return new ResteasyReactiveSecurityContext(context);
}
Expand Down
Loading

0 comments on commit 00ce709

Please sign in to comment.