Skip to content

Commit

Permalink
Merge pull request #27443 from mkouba/vertx-context-reference-factory
Browse files Browse the repository at this point in the history
CDI context propagation improvements for the reactive stack
  • Loading branch information
mkouba authored Sep 8, 2022
2 parents 5677c2f + 2b64b81 commit 79e9f85
Show file tree
Hide file tree
Showing 22 changed files with 983 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public interface Capability {

String REST = QUARKUS_PREFIX + "rest";
String REST_CLIENT = REST + ".client";
String REST_CLIENT_REACTIVE = REST_CLIENT + ".reactive";
String REST_JACKSON = REST + ".jackson";
String REST_JSONB = REST + ".jsonb";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void registerFeature(BuildProducer<FeatureBuildItem> feature, Capabilities capab
"'quarkus-narayana-lra' can only work if 'quarkus-resteasy-jackson' or 'quarkus-resteasy-reactive-jackson' is present");
}

if (!capabilities.isPresent(Capability.REST_CLIENT)) {
if (!capabilities.isCapabilityWithPrefixPresent(Capability.REST_CLIENT)) {
throw new IllegalStateException(
"'quarkus-narayana-lra' can only work if 'quarkus-rest-client' or 'quarkus-rest-client-reactive' is present");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

import org.jboss.jandex.AnnotationInstance;
Expand All @@ -21,6 +20,8 @@
import io.quarkus.arc.processor.AnnotationsTransformer;
import io.quarkus.arc.processor.InterceptorBindingRegistrar;
import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.BuildSteps;
Expand Down Expand Up @@ -53,15 +54,6 @@ public class OpenTelemetryProcessor {
io.opentelemetry.extension.annotations.SpanAttribute.class.getName());;
private static final DotName SPAN_ATTRIBUTE = DotName.createSimple(SpanAttribute.class.getName());

static class RestClientAvailable implements BooleanSupplier {
private static final boolean IS_REST_CLIENT_AVAILABLE = isClassPresent("javax.ws.rs.client.ClientRequestFilter");

@Override
public boolean getAsBoolean() {
return IS_REST_CLIENT_AVAILABLE;
}
}

@BuildStep
AdditionalBeanBuildItem ensureProducerIsRetained() {
return AdditionalBeanBuildItem.builder()
Expand Down Expand Up @@ -138,11 +130,15 @@ public void transform(TransformationContext context) {
}));
}

@BuildStep(onlyIf = RestClientAvailable.class)
void registerProvider(BuildProducer<AdditionalIndexedClassesBuildItem> additionalIndexed,
@BuildStep
void registerRestClientClassicProvider(
Capabilities capabilities,
BuildProducer<AdditionalIndexedClassesBuildItem> additionalIndexed,
BuildProducer<AdditionalBeanBuildItem> additionalBeans) {
additionalIndexed.produce(new AdditionalIndexedClassesBuildItem(OpenTelemetryClientFilter.class.getName()));
additionalBeans.produce(new AdditionalBeanBuildItem(OpenTelemetryClientFilter.class));
if (capabilities.isPresent(Capability.REST_CLIENT) && capabilities.isMissing(Capability.REST_CLIENT_REACTIVE)) {
additionalIndexed.produce(new AdditionalIndexedClassesBuildItem(OpenTelemetryClientFilter.class.getName()));
additionalBeans.produce(new AdditionalBeanBuildItem(OpenTelemetryClientFilter.class));
}
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package io.quarkus.opentelemetry.runtime;

import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe;
import static io.smallrye.common.vertx.VertxContext.getOrCreateDuplicatedContext;
import static io.smallrye.common.vertx.VertxContext.isDuplicatedContext;

import org.jboss.logging.Logger;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextStorage;
import io.opentelemetry.context.Scope;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Vertx;

/**
Expand Down Expand Up @@ -118,8 +118,10 @@ public static Context getContext(io.vertx.core.Context vertxContext) {
*/
public static io.vertx.core.Context getVertxContext() {
io.vertx.core.Context context = Vertx.currentContext();
if (context != null) {
io.vertx.core.Context dc = getOrCreateDuplicatedContext(context);
if (context != null && VertxContext.isOnDuplicatedContext()) {
return context;
} else if (context != null) {
io.vertx.core.Context dc = VertxContext.createNewDuplicatedContext(context);
setContextSafe(dc, true);
return dc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;

/**
* A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data.
* A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data. This is only used
* by RESTEasy Classic, because the handling implementation is provided by RESTEasy. This is not used by RESTEasy
* Reactive because tracing is handled by Vert.x.
*/
@Unremovable
@Provider
Expand Down Expand Up @@ -77,9 +79,19 @@ public void filter(final ClientRequestContext request) {
if (parentContext == null) {
parentContext = io.opentelemetry.context.Context.current();
}

// For each request, we need a new OTel Context from the **current one**
// the parent context needs to be the one from which the call originates.

if (instrumenter.shouldStart(parentContext, request)) {
Context spanContext = instrumenter.start(parentContext, request);
Scope scope = QuarkusContextStorage.INSTANCE.attach(vertxContext, spanContext);
// Create a new scope with an empty termination callback.
Scope scope = new Scope() {
@Override
public void close() {

}
};
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_CONTEXT, spanContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_PARENT_CONTEXT, parentContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE, scope);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package io.quarkus.opentelemetry.runtime.tracing.vertx;

import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe;

import java.util.Map;
import java.util.function.BiConsumer;

import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;
import io.quarkus.opentelemetry.runtime.tracing.vertx.OpenTelemetryVertxTracer.SpanOperation;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.MultiMap;
import io.vertx.core.http.impl.headers.HeadersAdaptor;
Expand Down Expand Up @@ -92,10 +89,14 @@ default <R> SpanOperation sendRequest(
if (instrumenter.shouldStart(parentContext, (REQ) request)) {
io.opentelemetry.context.Context spanContext = instrumenter.start(parentContext,
writableHeaders((REQ) request, headers));
Context duplicatedContext = VertxContext.createNewDuplicatedContext(context);
setContextSafe(duplicatedContext, true);
Scope scope = QuarkusContextStorage.INSTANCE.attach(duplicatedContext, spanContext);
return spanOperation(duplicatedContext, (REQ) request, toMultiMap(headers), spanContext, scope);
// Create a new scope with an empty termination callback.
Scope scope = new Scope() {
@Override
public void close() {

}
};
return spanOperation(context, (REQ) request, toMultiMap(headers), spanContext, scope);
}

return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package io.quarkus.resteasy.reactive.server.test;

import static org.awaitility.Awaitility.await;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;

import org.jboss.shrinkwrap.api.asset.EmptyAsset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;
import io.restassured.response.ResponseBody;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class RequestLeakDetectionTest {

@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar.addClasses(MyRestAPI.class, MyRequestScopeBean.class, Barrier.class, Task.class)
.addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml"));

@Inject
Barrier barrier;

@Test
public void testWithConcurrentCalls() {
List<String> results = new CopyOnWriteArrayList<>();
int count = 100;
barrier.setMaxConcurrency(count);
for (int i = 0; i < count; i++) {
int c = i;
new Thread(() -> {
ResponseBody<?> body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON)
.get("/test/{val}").thenReturn().body();
String value = body.toString();
results.add(value);
}).start();
}
await().until(() -> results.size() == count);
Set<String> asSet = new HashSet<>(results);
Assertions.assertEquals(asSet.size(), count);
}

@Test
public void testWithConcurrentBlockingCalls() {
List<String> results = new CopyOnWriteArrayList<>();
int count = 100;
barrier.setMaxConcurrency(count);
for (int i = 0; i < count; i++) {
int c = i;
new Thread(() -> {
ResponseBody<?> body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON)
.get("/test/blocking/{val}").thenReturn().body();
String value = body.toString();
results.add(value);
}).start();
}
await().until(() -> results.size() == count);
Set<String> asSet = new HashSet<>(results);
Assertions.assertEquals(asSet.size(), count);
}

@ApplicationScoped
@Path("/test")
public static class MyRestAPI {

@Inject
MyRequestScopeBean bean;

@Inject
Barrier barrier;

@GET
@Path("/{val}")
public Uni<Foo> foo(int val) {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
Vertx.currentContext().putLocal("count", val);
bean.setValue(val);

return Uni.createFrom().<Integer> emitter(e -> {
barrier.enqueue(Vertx.currentContext(), () -> {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
int r = Vertx.currentContext().getLocal("count");
Assertions.assertEquals(r, val);
e.complete(bean.getValue());
});
}).map(i -> new Foo(Integer.toString(i)));
}

@GET
@Path("/blocking/{val}")
public Foo blocking(int val) {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
Vertx.currentContext().putLocal("count", val);
bean.setValue(val);

return Uni.createFrom().<Integer> emitter(e -> {
barrier.enqueue(Vertx.currentContext(), () -> {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
int r = Vertx.currentContext().getLocal("count");
Assertions.assertEquals(r, val);
e.complete(bean.getValue());
});
})
.map(i -> new Foo(Integer.toString(i)))
.await().indefinitely();
}
}

@ApplicationScoped
public static class Barrier {

private int level;

public void setMaxConcurrency(int level) {
this.level = level;
}

private final AtomicInteger counter = new AtomicInteger();
private final List<Task> tasks = new CopyOnWriteArrayList<>();

public void enqueue(Context context, Runnable runnable) {
Task task = new Task(context, runnable);
tasks.add(task);
if (counter.incrementAndGet() >= level) {
for (Task tbr : new ArrayList<>(tasks)) {
tbr.run();
tasks.remove(tbr);
}
}
}
}

private static class Task {
private final Context context;
private final Runnable runnable;

private Task(Context context, Runnable runnable) {
this.context = context;
this.runnable = runnable;
}

void run() {
context.runOnContext(x -> runnable.run());
}
}

@RequestScoped
public static class MyRequestScopeBean {

private int value = -1;

public void setValue(int v) {
if (value != -1) {
throw new IllegalStateException("Already initialized");
}
value = v;
}

public int getValue() {
return value;
}

}

public static class Foo {

public final String value;

public Foo(String value) {
this.value = value;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ protected void handleRequestScopeActivation() {
}
}

@Override
protected void requestScopeDeactivated() {
// we intentionally don't call 'CurrentRequestManager.set(null)'
// because there is no need to clear the current request
// as that is backed by a DuplicatedContext and not accessible to other requests anyway
}

protected SecurityContext createSecurityContext() {
return new ResteasyReactiveSecurityContext(context);
}
Expand Down
Loading

0 comments on commit 79e9f85

Please sign in to comment.