From 9cd24c4e44e549f7dbed6aa62cde483c4962cc7e Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Tue, 21 Dec 2021 10:53:40 +1100 Subject: [PATCH] CDI context propagation improvements for the reactive stack == ArC Introduce the VertxCurrentContextFactory so that the Vert.x duplicated context can be used to store the "current context" for normal scopes. == resteasy-reactive Don't clear our CDI current request when suspending. This used to be done in order to prevent subsequent requests running on the same thread as a suspended request from accessing the former's data. With the advent of DuplicatedContext backed storage, there is no longer any chance of mixing data so there is no need to clear it out. Furthermore, by not clearing out current request, code that accesses the request scoped CurrentVertxRequest that is executed while the request is suspended, can now work even if context propagation is not in play. == Tests Add leak detection tests for resteasy reactive, graphql and reactive rest client. Also improve the OpenTelemetry reactive tests. == OpenTelemetry Only register the OpenTelemetryClientFilter for RESTEasy Client Classic. Use Capabilities to determine when to register the OpenTelemetryClientFilter. Co-authored-by: Georgios Andrianakis Co-authored-by: Clement Escoffier Co-authored-by: Roberto Cortez Co-authored-by: brunobat Co-authored-by: Martin Kouba --- .../io/quarkus/deployment/Capability.java | 1 + .../deployment/OpenTelemetryProcessor.java | 24 +- .../runtime/QuarkusContextStorage.java | 8 +- .../restclient/OpenTelemetryClientFilter.java | 16 +- .../vertx/InstrumenterVertxTracer.java | 15 +- .../server/test/RequestLeakDetectionTest.java | 192 +++++++++++ ...QuarkusResteasyReactiveRequestContext.java | 7 + .../reactive/RequestLeakDetectionTest.java | 309 ++++++++++++++++++ .../rest-client-reactive/runtime/pom.xml | 2 +- .../smallrye-graphql/deployment/pom.xml | 2 +- .../deployment/RequestLeakDetectionTest.java | 185 +++++++++++ .../vertx/deployment/VertxProcessor.java | 7 + .../runtime/VertxCurrentContextFactory.java | 58 ++++ .../quarkus/vertx/runtime/VertxRecorder.java | 5 + .../client/impl/ClientRequestContextImpl.java | 11 +- .../client/impl/RestClientRequestContext.java | 5 + .../core/AbstractResteasyReactiveContext.java | 6 +- .../reactive/ReactiveResource.java | 15 +- .../reactive/OpenTelemetryReactiveTest.java | 112 ++++++- 19 files changed, 939 insertions(+), 41 deletions(-) create mode 100644 extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/RequestLeakDetectionTest.java create mode 100644 extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java create mode 100644 extensions/smallrye-graphql/deployment/src/test/java/io/quarkus/smallrye/graphql/deployment/RequestLeakDetectionTest.java create mode 100644 extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java index 5af46ef4d0b2fc..e5463d19ed91ac 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java @@ -39,6 +39,7 @@ public interface Capability { String REST = QUARKUS_PREFIX + "rest"; String REST_CLIENT = REST + ".client"; + String REST_CLIENT_REACTIVE = REST_CLIENT + ".reactive"; String REST_JACKSON = REST + ".jackson"; String REST_JSONB = REST + ".jsonb"; diff --git a/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java b/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java index 004bc8cc6dc208..4a54dc1c8ea00e 100644 --- a/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java +++ b/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java @@ -3,7 +3,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.function.BooleanSupplier; import java.util.stream.Collectors; import org.jboss.jandex.AnnotationInstance; @@ -21,6 +20,8 @@ import io.quarkus.arc.processor.AnnotationsTransformer; import io.quarkus.arc.processor.InterceptorBindingRegistrar; import io.quarkus.bootstrap.classloading.QuarkusClassLoader; +import io.quarkus.deployment.Capabilities; +import io.quarkus.deployment.Capability; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.annotations.BuildSteps; @@ -53,15 +54,6 @@ public class OpenTelemetryProcessor { io.opentelemetry.extension.annotations.SpanAttribute.class.getName());; private static final DotName SPAN_ATTRIBUTE = DotName.createSimple(SpanAttribute.class.getName()); - static class RestClientAvailable implements BooleanSupplier { - private static final boolean IS_REST_CLIENT_AVAILABLE = isClassPresent("javax.ws.rs.client.ClientRequestFilter"); - - @Override - public boolean getAsBoolean() { - return IS_REST_CLIENT_AVAILABLE; - } - } - @BuildStep AdditionalBeanBuildItem ensureProducerIsRetained() { return AdditionalBeanBuildItem.builder() @@ -138,11 +130,15 @@ public void transform(TransformationContext context) { })); } - @BuildStep(onlyIf = RestClientAvailable.class) - void registerProvider(BuildProducer additionalIndexed, + @BuildStep + void registerRestClientClassicProvider( + Capabilities capabilities, + BuildProducer additionalIndexed, BuildProducer additionalBeans) { - additionalIndexed.produce(new AdditionalIndexedClassesBuildItem(OpenTelemetryClientFilter.class.getName())); - additionalBeans.produce(new AdditionalBeanBuildItem(OpenTelemetryClientFilter.class)); + if (capabilities.isPresent(Capability.REST_CLIENT) && capabilities.isMissing(Capability.REST_CLIENT_REACTIVE)) { + additionalIndexed.produce(new AdditionalIndexedClassesBuildItem(OpenTelemetryClientFilter.class.getName())); + additionalBeans.produce(new AdditionalBeanBuildItem(OpenTelemetryClientFilter.class)); + } } @BuildStep diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java index 7452f6dcfcf8a0..e37a29e68ff84c 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java @@ -1,7 +1,6 @@ package io.quarkus.opentelemetry.runtime; import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe; -import static io.smallrye.common.vertx.VertxContext.getOrCreateDuplicatedContext; import static io.smallrye.common.vertx.VertxContext.isDuplicatedContext; import org.jboss.logging.Logger; @@ -9,6 +8,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.ContextStorage; import io.opentelemetry.context.Scope; +import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Vertx; /** @@ -118,8 +118,10 @@ public static Context getContext(io.vertx.core.Context vertxContext) { */ public static io.vertx.core.Context getVertxContext() { io.vertx.core.Context context = Vertx.currentContext(); - if (context != null) { - io.vertx.core.Context dc = getOrCreateDuplicatedContext(context); + if (context != null && VertxContext.isOnDuplicatedContext()) { + return context; + } else if (context != null) { + io.vertx.core.Context dc = VertxContext.createNewDuplicatedContext(context); setContextSafe(dc, true); return dc; } diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java index 2da1d59d039f2e..f7e051e15fb51b 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java @@ -30,7 +30,9 @@ import io.quarkus.opentelemetry.runtime.QuarkusContextStorage; /** - * A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data. + * A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data. This is only used + * by RESTEast Classic, because the handling implementation is provided by RESTEasy. This is not used by RESTEasy + * Reactive because tracing is handled by Vert.x. */ @Unremovable @Provider @@ -77,9 +79,19 @@ public void filter(final ClientRequestContext request) { if (parentContext == null) { parentContext = io.opentelemetry.context.Context.current(); } + + // For each request, we need a new OTel Context from the **current one** + // the parent context needs to be the one from which the call originates. + if (instrumenter.shouldStart(parentContext, request)) { Context spanContext = instrumenter.start(parentContext, request); - Scope scope = QuarkusContextStorage.INSTANCE.attach(vertxContext, spanContext); + // Create a new scope with an empty termination callback. + Scope scope = new Scope() { + @Override + public void close() { + + } + }; request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_CONTEXT, spanContext); request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_PARENT_CONTEXT, parentContext); request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE, scope); diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/InstrumenterVertxTracer.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/InstrumenterVertxTracer.java index 272c1a83bb9fda..6fe1bb57cdb3fa 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/InstrumenterVertxTracer.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/InstrumenterVertxTracer.java @@ -1,7 +1,5 @@ package io.quarkus.opentelemetry.runtime.tracing.vertx; -import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe; - import java.util.Map; import java.util.function.BiConsumer; @@ -9,7 +7,6 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.quarkus.opentelemetry.runtime.QuarkusContextStorage; import io.quarkus.opentelemetry.runtime.tracing.vertx.OpenTelemetryVertxTracer.SpanOperation; -import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.MultiMap; import io.vertx.core.http.impl.headers.HeadersAdaptor; @@ -92,10 +89,14 @@ default SpanOperation sendRequest( if (instrumenter.shouldStart(parentContext, (REQ) request)) { io.opentelemetry.context.Context spanContext = instrumenter.start(parentContext, writableHeaders((REQ) request, headers)); - Context duplicatedContext = VertxContext.createNewDuplicatedContext(context); - setContextSafe(duplicatedContext, true); - Scope scope = QuarkusContextStorage.INSTANCE.attach(duplicatedContext, spanContext); - return spanOperation(duplicatedContext, (REQ) request, toMultiMap(headers), spanContext, scope); + // Create a new scope with an empty termination callback. + Scope scope = new Scope() { + @Override + public void close() { + + } + }; + return spanOperation(context, (REQ) request, toMultiMap(headers), spanContext, scope); } return null; diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/RequestLeakDetectionTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/RequestLeakDetectionTest.java new file mode 100644 index 00000000000000..b62fe45b9a2f8f --- /dev/null +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/RequestLeakDetectionTest.java @@ -0,0 +1,192 @@ +package io.quarkus.resteasy.reactive.server.test; + +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.MediaType; + +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.restassured.response.ResponseBody; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class RequestLeakDetectionTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar.addClasses(MyRestAPI.class, MyRequestScopeBean.class, Barrier.class, Task.class) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml")); + + @Inject + Barrier barrier; + + @Test + public void testWithConcurrentCalls() { + List results = new CopyOnWriteArrayList<>(); + int count = 100; + barrier.setMaxConcurrency(count); + for (int i = 0; i < count; i++) { + int c = i; + new Thread(() -> { + ResponseBody body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON) + .get("/test/{val}").thenReturn().body(); + String value = body.toString(); + results.add(value); + }).start(); + } + await().until(() -> results.size() == count); + Set asSet = new HashSet<>(results); + Assertions.assertEquals(asSet.size(), count); + } + + @Test + public void testWithConcurrentBlockingCalls() { + List results = new CopyOnWriteArrayList<>(); + int count = 100; + barrier.setMaxConcurrency(count); + for (int i = 0; i < count; i++) { + int c = i; + new Thread(() -> { + ResponseBody body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON) + .get("/test/blocking/{val}").thenReturn().body(); + String value = body.toString(); + results.add(value); + }).start(); + } + await().until(() -> results.size() == count); + Set asSet = new HashSet<>(results); + Assertions.assertEquals(asSet.size(), count); + } + + @ApplicationScoped + @Path("/test") + public static class MyRestAPI { + + @Inject + MyRequestScopeBean bean; + + @Inject + Barrier barrier; + + @GET + @Path("/{val}") + public Uni foo(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(r, val); + e.complete(bean.getValue()); + }); + }).map(i -> new Foo(Integer.toString(i))); + } + + @GET + @Path("/blocking/{val}") + public Foo blocking(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(r, val); + e.complete(bean.getValue()); + }); + }) + .map(i -> new Foo(Integer.toString(i))) + .await().indefinitely(); + } + } + + @ApplicationScoped + public static class Barrier { + + private int level; + + public void setMaxConcurrency(int level) { + this.level = level; + } + + private final AtomicInteger counter = new AtomicInteger(); + private final List tasks = new CopyOnWriteArrayList<>(); + + public void enqueue(Context context, Runnable runnable) { + Task task = new Task(context, runnable); + tasks.add(task); + if (counter.incrementAndGet() >= level) { + for (Task tbr : new ArrayList<>(tasks)) { + tbr.run(); + tasks.remove(tbr); + } + } + } + } + + private static class Task { + private final Context context; + private final Runnable runnable; + + private Task(Context context, Runnable runnable) { + this.context = context; + this.runnable = runnable; + } + + void run() { + context.runOnContext(x -> runnable.run()); + } + } + + @RequestScoped + public static class MyRequestScopeBean { + + private int value = -1; + + public void setValue(int v) { + if (value != -1) { + throw new IllegalStateException("Already initialized"); + } + value = v; + } + + public int getValue() { + return value; + } + + } + + public static class Foo { + + public final String value; + + public Foo(String value) { + this.value = value; + } + } + +} diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java index fb47a5faf78e4a..9ca38e98e49421 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java @@ -45,6 +45,13 @@ protected void handleRequestScopeActivation() { } } + @Override + protected void requestScopeDeactivated() { + // we intentionally don't call 'CurrentRequestManager.set(null)' + // because there is no need to clear the current request + // as that is backed by a DuplicatedContext and not accessible to other requests anyway + } + protected SecurityContext createSecurityContext() { return new ResteasyReactiveSecurityContext(context); } diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java new file mode 100644 index 00000000000000..d1d2b403783a5e --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java @@ -0,0 +1,309 @@ +package io.quarkus.rest.client.reactive; + +import static io.quarkus.rest.client.reactive.RestClientTestUtil.setUrlForClass; +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.restassured.response.ResponseBody; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class RequestLeakDetectionTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(MyRestAPI.class, MyRequestScopeBean.class, Barrier.class, Task.class, RemoteClient.class, + RemoteService.class) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml") + .addAsResource( + new StringAsset(setUrlForClass(RemoteClient.class)), "application.properties")); + + @Inject + Barrier barrier; + + @ParameterizedTest + @ValueSource(strings = { + "reactive-server-and-client", + "blocking-server-and-reactive-client", + "blocking-server-and-client", + "reactive-server-and-blocking-client" + }) + public void testWithConcurrentCallsWithReactiveClientAndServer(String path) { + List results = new CopyOnWriteArrayList<>(); + int count = 100; + barrier.setMaxConcurrency(count); + for (int i = 0; i < count; i++) { + int c = i; + new Thread(() -> { + ResponseBody body = RestAssured.given().pathParam("val", c).contentType(MediaType.APPLICATION_JSON) + .get("/test/" + path + "/{val}").thenReturn().body(); + String value = body.toString(); + results.add(value); + }).start(); + } + await().until(() -> results.size() == count); + Set asSet = new HashSet<>(results); + Assertions.assertEquals(asSet.size(), count); + } + + @ApplicationScoped + @Path("/test") + public static class MyRestAPI { + + @Inject + MyRequestScopeBean bean; + + @Inject + Barrier barrier; + + @RestClient + RemoteClient client; + + @GET + @Path("/reactive-server-and-client/{val}") + public Uni reactiveServerAndClient(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Assertions.assertEquals(bean.getValue(), val); + int rBefore = Vertx.currentContext().getLocal("count"); + client.invokeReactive(Integer.toString(val)) + .invoke(s -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rAfter = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(s, "hello " + rAfter); + Assertions.assertEquals(rBefore, rAfter); + Assertions.assertEquals(rAfter, val); + Assertions.assertEquals(bean.getValue(), val); + }).subscribe().with(x -> e.complete(val), e::fail); + }); + }).map(i -> { + Assertions.assertEquals(bean.getValue(), val); + return new Foo(Integer.toString(i)); + }); + } + + @GET + @Path("/blocking-server-and-reactive-client/{val}") + public Foo blockingServerWithReactiveClient(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rBefore = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(bean.getValue(), val); + client.invokeReactive(Integer.toString(val)) + .invoke(s -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rAfter = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(s, "hello " + rAfter); + Assertions.assertEquals(rBefore, rAfter); + Assertions.assertEquals(rAfter, val); + Assertions.assertEquals(bean.getValue(), val); + }).subscribe().with(x -> e.complete(val), e::fail); + }); + }).map(i -> { + Assertions.assertEquals(bean.getValue(), val); + return new Foo(Integer.toString(i)); + }) + .await().indefinitely(); + } + + @GET + @Path("/blocking-server-and-client/{val}") + public Foo blockingServerAndBlockingClient(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rBefore = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(bean.getValue(), val); + String s = client.invokeBlocking(Integer.toString(val)); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rAfter = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(s, "hello " + rAfter); + Assertions.assertEquals(rBefore, rAfter); + Assertions.assertEquals(rAfter, val); + Assertions.assertEquals(bean.getValue(), val); + e.complete(val); + }, true); + }).map(i -> { + Assertions.assertEquals(bean.getValue(), val); + return new Foo(Integer.toString(i)); + }) + .await().indefinitely(); + } + + @GET + @Path("/reactive-server-and-blocking-client/{val}") + public Uni reactiveServerWithBlockingClient(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rBefore = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(bean.getValue(), val); + String s = client.invokeBlocking(Integer.toString(val)); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int rAfter = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(s, "hello " + rAfter); + Assertions.assertEquals(rBefore, rAfter); + Assertions.assertEquals(rAfter, val); + Assertions.assertEquals(bean.getValue(), val); + e.complete(val); + }, true); + }).map(i -> { + Assertions.assertEquals(bean.getValue(), val); + return new Foo(Integer.toString(i)); + }); + } + } + + @Path("/remote") + public static class RemoteService { + + @GET + @Path("/reactive/{name}") + public Uni hello(String name) { + return Uni.createFrom().item("hello " + name); + } + + @GET + @Path("/blocking/{name}") + public String helloBlocking(String name) { + return "hello " + name; + } + } + + @RegisterRestClient + @Path("/remote") + public interface RemoteClient { + @GET + @Path("/reactive/{name}") + Uni invokeReactive(String name); + + @GET + @Path("/blocking/{name}") + String invokeBlocking(String name); + } + + @ApplicationScoped + public static class Barrier { + + private int level; + + public void setMaxConcurrency(int level) { + this.level = level; + } + + private final AtomicInteger counter = new AtomicInteger(); + private final List tasks = new CopyOnWriteArrayList<>(); + + public void enqueue(Context context, Runnable runnable) { + enqueue(context, runnable, false); + } + + public void enqueue(Context context, Runnable runnable, boolean blocking) { + Task task = new Task(context, runnable, blocking); + tasks.add(task); + if (counter.incrementAndGet() >= level) { + for (Task tbr : new ArrayList<>(tasks)) { + tbr.run(); + tasks.remove(tbr); + } + } + } + } + + private static class Task { + private final Context context; + private final Runnable runnable; + + private final boolean blocking; + + private Task(Context context, Runnable runnable, boolean blocking) { + this.context = context; + this.runnable = runnable; + this.blocking = blocking; + } + + void run() { + if (blocking) { + context.executeBlocking(p -> { + runnable.run(); + p.complete(); + }); + } else { + context.runOnContext(x -> runnable.run()); + } + } + } + + @RequestScoped + public static class MyRequestScopeBean { + + private int value = -1; + + public void setValue(int v) { + if (value != -1) { + throw new IllegalStateException("Already initialized"); + } + value = v; + } + + public int getValue() { + return value; + } + + } + + public static class Foo { + + public final String value; + + public Foo(String value) { + this.value = value; + } + } + +} diff --git a/extensions/resteasy-reactive/rest-client-reactive/runtime/pom.xml b/extensions/resteasy-reactive/rest-client-reactive/runtime/pom.xml index b80c852a476bbd..fb0ff6936531a9 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/runtime/pom.xml +++ b/extensions/resteasy-reactive/rest-client-reactive/runtime/pom.xml @@ -79,7 +79,7 @@ quarkus-extension-maven-plugin - io.quarkus.rest.client + io.quarkus.rest.client.reactive diff --git a/extensions/smallrye-graphql/deployment/pom.xml b/extensions/smallrye-graphql/deployment/pom.xml index e2feaa37733a64..81a7272060baed 100644 --- a/extensions/smallrye-graphql/deployment/pom.xml +++ b/extensions/smallrye-graphql/deployment/pom.xml @@ -83,7 +83,7 @@ io.quarkus - quarkus-resteasy-deployment + quarkus-resteasy-reactive-deployment test diff --git a/extensions/smallrye-graphql/deployment/src/test/java/io/quarkus/smallrye/graphql/deployment/RequestLeakDetectionTest.java b/extensions/smallrye-graphql/deployment/src/test/java/io/quarkus/smallrye/graphql/deployment/RequestLeakDetectionTest.java new file mode 100644 index 00000000000000..a05c20265ce85f --- /dev/null +++ b/extensions/smallrye-graphql/deployment/src/test/java/io/quarkus/smallrye/graphql/deployment/RequestLeakDetectionTest.java @@ -0,0 +1,185 @@ +package io.quarkus.smallrye.graphql.deployment; + +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.graphql.GraphQLApi; +import org.eclipse.microprofile.graphql.Query; +import org.eclipse.microprofile.graphql.Source; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.restassured.response.ResponseBody; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class RequestLeakDetectionTest extends AbstractGraphQLTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest().withApplicationRoot((jar) -> jar + .addClasses(RequestLeakDetectionTest.MyGraphQLApi.class, MyRequestScopeBean.class, Barrier.class, Task.class) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml")); + + @Inject + Barrier barrier; + + @Test + public void testWithConcurrentCalls() { + List results = new CopyOnWriteArrayList<>(); + List nested = new CopyOnWriteArrayList<>(); + List nestedUni = new CopyOnWriteArrayList<>(); + int count = 100; + barrier.setMaxConcurrency(count); + for (int i = 0; i < count; i++) { + int c = i; + new Thread(() -> { + String query = getPayload("{ foo(val:" + c + ") { value nested{ value } nestedUni { value } } }"); + ResponseBody body = RestAssured.given().body(query).contentType(MEDIATYPE_JSON).post("/graphql/") + .thenReturn().body(); + String value = body.path("data.foo.value"); + String nestedValue = body.path("data.foo.nested.value"); + String nestedUniValue = body.path("data.foo.nestedUni.value"); + results.add(value); + nested.add(nestedValue); + nestedUni.add(nestedUniValue); + }).start(); + } + await().until(() -> results.size() == count); + await().until(() -> nested.size() == count); + await().until(() -> nestedUni.size() == count); + Set asSet = new HashSet<>(results); + Assertions.assertEquals(count, asSet.size()); + asSet = new HashSet<>(nested); + Assertions.assertEquals(count, asSet.size()); + asSet = new HashSet<>(nestedUni); + Assertions.assertEquals(count, asSet.size()); + } + + @ApplicationScoped + @GraphQLApi + public static class MyGraphQLApi { + + @Inject + MyRequestScopeBean bean; + + @Inject + Barrier barrier; + + @Query + public Uni foo(int val) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Vertx.currentContext().putLocal("count", val); + bean.setValue(val); + + return Uni.createFrom(). emitter(e -> { + barrier.enqueue(Vertx.currentContext(), () -> { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + Assertions.assertEquals(r, val); + e.complete(bean.getValue()); + }); + }).map(i -> new Foo(Integer.toString(i))); + } + + public Foo nested(@Source Foo foo) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + String rAsString = Integer.toString(r); + Assertions.assertEquals(rAsString, foo.value); + Assertions.assertEquals(bean.getValue(), r); + return new Foo("source field on foo " + foo.value); + } + + public Uni nestedUni(@Source Foo foo) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + int r = Vertx.currentContext().getLocal("count"); + String rAsString = Integer.toString(r); + Assertions.assertEquals(rAsString, foo.value); + Assertions.assertEquals(bean.getValue(), r); + return Uni.createFrom().item(new Foo("uni source field on foo " + foo.value)); + } + + } + + @ApplicationScoped + public static class Barrier { + + private int level; + + public void setMaxConcurrency(int level) { + this.level = level; + } + + private final AtomicInteger counter = new AtomicInteger(); + private final List tasks = new CopyOnWriteArrayList<>(); + + public void enqueue(Context context, Runnable runnable) { + Task task = new Task(context, runnable); + tasks.add(task); + if (counter.incrementAndGet() >= level) { + for (Task tbr : new ArrayList<>(tasks)) { + tbr.run(); + tasks.remove(tbr); + } + } + } + } + + private static class Task { + private final Context context; + private final Runnable runnable; + + private Task(Context context, Runnable runnable) { + this.context = context; + this.runnable = runnable; + } + + void run() { + context.runOnContext(x -> runnable.run()); + } + } + + @RequestScoped + public static class MyRequestScopeBean { + + private int value = -1; + + public void setValue(int v) { + if (value != -1) { + throw new IllegalStateException("Already initialized"); + } + value = v; + } + + public int getValue() { + return value; + } + + } + + public static class Foo { + + public final String value; + + public Foo(String value) { + this.value = value; + } + } + +} diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java index 15903bb7dec400..e90ce7c644bd01 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java @@ -20,6 +20,7 @@ import io.quarkus.arc.deployment.AutoAddScopeBuildItem; import io.quarkus.arc.deployment.BeanRegistrationPhaseBuildItem; import io.quarkus.arc.deployment.BeanRegistrationPhaseBuildItem.BeanConfiguratorBuildItem; +import io.quarkus.arc.deployment.CurrentContextFactoryBuildItem; import io.quarkus.arc.deployment.UnremovableBeanBuildItem; import io.quarkus.arc.deployment.UnremovableBeanBuildItem.BeanClassAnnotationExclusion; import io.quarkus.arc.processor.AnnotationStore; @@ -98,6 +99,12 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxRecorder recorder, return new VertxBuildItem(recorder.forceStart(vertx.getVertx())); } + @BuildStep + @Record(ExecutionTime.STATIC_INIT) + CurrentContextFactoryBuildItem currentContextFactory(VertxRecorder recorder) { + return new CurrentContextFactoryBuildItem(recorder.currentContextFactory()); + } + @BuildStep public UnremovableBeanBuildItem unremovableBeans() { return new UnremovableBeanBuildItem(new BeanClassAnnotationExclusion(CONSUME_EVENT)); diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java new file mode 100644 index 00000000000000..a2806ebc628994 --- /dev/null +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java @@ -0,0 +1,58 @@ +package io.quarkus.vertx.runtime; + +import java.lang.annotation.Annotation; + +import io.netty.util.concurrent.FastThreadLocal; +import io.quarkus.arc.CurrentContext; +import io.quarkus.arc.CurrentContextFactory; +import io.quarkus.arc.InjectableContext; +import io.quarkus.arc.InjectableContext.ContextState; +import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; +import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class VertxCurrentContextFactory implements CurrentContextFactory { + + @Override + public CurrentContext create(Class scope) { + return new VertxCurrentContext<>(); + } + + private static final class VertxCurrentContext implements CurrentContext { + + private final FastThreadLocal fallback = new FastThreadLocal<>(); + + @Override + public T get() { + Context context = Vertx.currentContext(); + if (context != null && VertxContext.isDuplicatedContext(context)) { + return context.getLocal(this); + } + return fallback.get(); + } + + @Override + public void set(T state) { + Context context = Vertx.currentContext(); + if (context != null && VertxContext.isDuplicatedContext(context)) { + VertxContextSafetyToggle.setContextSafe(context, true); + context.putLocal(this, state); + } else { + fallback.set(state); + } + } + + @Override + public void remove() { + Context context = Vertx.currentContext(); + if (context != null && VertxContext.isDuplicatedContext(context)) { + // NOOP - the DC should not be shared. + // context.removeLocal(this); + } else { + fallback.remove(); + } + } + + } +} diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java index 9fdfd9def6ec9f..12dbb305919cf0 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java @@ -14,6 +14,7 @@ import org.jboss.logging.Logger; +import io.quarkus.arc.CurrentContextFactory; import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.ShutdownContext; @@ -66,6 +67,10 @@ public void run() { } } + public RuntimeValue currentContextFactory() { + return new RuntimeValue<>(new VertxCurrentContextFactory()); + } + public static Vertx getVertx() { return vertx; } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java index d31f5eba7ee000..56bdb0ca7e63f1 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java @@ -5,6 +5,7 @@ import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; +import io.vertx.core.Vertx; import java.io.OutputStream; import java.lang.annotation.Annotation; import java.lang.reflect.Type; @@ -54,10 +55,16 @@ public ClientRequestContextImpl(RestClientRequestContext restClientRequestContex this.configuration = configuration; this.headersMap = new ClientRequestHeadersMap(); //restClientRequestContext.requestHeaders.getHeaders() + // TODO This needs to be challenged: // Always create a duplicated context because each REST Client invocation must have its own context // A separate context allows integrations like OTel to create a separate Span for each invocation (expected) - Context current = client.vertx.getOrCreateContext(); - this.context = VertxContext.createNewDuplicatedContext(current); + Context ctxt = Vertx.currentContext(); + if (ctxt != null && VertxContext.isDuplicatedContext(ctxt)) { + this.context = ctxt; + } else { + Context current = client.vertx.getOrCreateContext(); + this.context = VertxContext.createNewDuplicatedContext(current); + } restClientRequestContext.properties.put(VERTX_CONTEXT_PROPERTY, context); } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java index d3216ecb939a73..24db57060daced 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java @@ -527,4 +527,9 @@ private Boolean getBooleanProperty(String name, Boolean defaultValue) { } return defaultValue; } + + @Override + protected boolean isRequestScopeManagementRequired() { + return false; + } } diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java index 345f4cf991d461..19f98396b6824c 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java @@ -130,7 +130,7 @@ public void run() { boolean processingSuspended = false; //if this is a blocking target we don't activate for the initial non-blocking part //unless there are pre-mapping filters as these may require CDI - boolean disasociateRequestScope = false; + boolean disassociateRequestScope = false; boolean aborted = false; try { while (position < handlers.length) { @@ -146,7 +146,7 @@ public void run() { running = true; if (isRequestScopeManagementRequired()) { if (requestScopeActivated) { - disasociateRequestScope = true; + disassociateRequestScope = true; requestScopeActivated = false; } } else { @@ -190,7 +190,7 @@ public void run() { } close(); } else { - if (disasociateRequestScope) { + if (disassociateRequestScope) { requestScopeDeactivated(); currentRequestScope.deactivate(); } diff --git a/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java index a0f6bc2099bb64..a1018019cdafdb 100644 --- a/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java +++ b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java @@ -30,9 +30,18 @@ public Uni helloGet(@QueryParam("name") String name) { } @GET - @Path("/multiple") - public Uni helloMultiple() { - return Uni.combine().all().unis(client.helloGet("Naruto"), client.helloGet("Goku")) + @Path("/multiple-chain") + public Uni helloMultipleUsingChain() { + return client.helloGet("Naruto") + .chain(s1 -> client.helloGet("Goku").map(s2 -> s1 + " and " + s2)); + } + + @GET + @Path("/multiple-combine") + public Uni helloMultipleUsingCombine() { + return Uni.combine().all().unis( + client.helloGet("Naruto"), + client.helloGet("Goku")) .combinedWith((s, s2) -> s + " and " + s2); } diff --git a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java index 277ae2fcb5101a..61bb0108934fd3 100644 --- a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java +++ b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java @@ -1,20 +1,29 @@ package io.quarkus.it.opentelemetry.reactive; +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.api.trace.SpanKind.SERVER; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_TARGET; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_URL; import static io.restassured.RestAssured.given; import static io.restassured.RestAssured.when; import static java.net.HttpURLConnection.HTTP_OK; +import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import io.opentelemetry.api.trace.SpanKind; import io.quarkus.test.junit.QuarkusTest; import io.restassured.common.mapper.TypeRef; @@ -29,7 +38,6 @@ void reset() { @Test void get() { given() - .contentType("application/json") .when() .queryParam("name", "Naruto") .get("/reactive") @@ -46,7 +54,6 @@ void get() { @Test void post() { given() - .contentType("application/json") .when() .body("Naruto") .post("/reactive") @@ -61,11 +68,10 @@ void post() { } @Test - void multiple() { + void multipleUsingChain() { given() - .contentType("application/json") .when() - .get("/reactive/multiple") + .get("/reactive/multiple-chain") .then() .statusCode(200) .body(equalTo("Hello Naruto and Hello Goku")); @@ -75,10 +81,106 @@ void multiple() { List> spans = getSpans(); assertEquals(7, spans.size()); assertEquals(1, spans.stream().map(map -> map.get("traceId")).collect(toSet()).size()); + + // First span is the call getting into the server. It does not have a parent span. + Map parent = getSpanByKindAndParentId(spans, SERVER, "0000000000000000"); + + // We should get 2 client spans originated by the server + List> clientSpans = getSpansByKindAndParentId(spans, CLIENT, parent.get("spanId")); + assertEquals(2, clientSpans.size()); + + // Each client calls the server and programmatically create a span, so each have a server and an internal span + + // Naruto Span + Optional> narutoSpan = clientSpans.stream() + .filter(map -> ((String) ((Map) map.get("attributes")).get(HTTP_URL.getKey())).contains("Naruto")) + .findFirst(); + assertTrue(narutoSpan.isPresent()); + Map naruto = narutoSpan.get(); + + Map narutoServer = getSpanByKindAndParentId(spans, SERVER, naruto.get("spanId")); + assertEquals("/reactive?name=Naruto", ((Map) narutoServer.get("attributes")).get(HTTP_TARGET.getKey())); + Map narutoInternal = getSpanByKindAndParentId(spans, INTERNAL, narutoServer.get("spanId")); + assertEquals("helloGet", narutoInternal.get("name")); + + // Goku Span + Optional> gokuSpan = clientSpans.stream() + .filter(map -> ((String) ((Map) map.get("attributes")).get(HTTP_URL.getKey())).contains("Goku")) + .findFirst(); + assertTrue(gokuSpan.isPresent()); + Map goku = gokuSpan.get(); + + Map gokuServer = getSpanByKindAndParentId(spans, SERVER, goku.get("spanId")); + assertEquals("/reactive?name=Goku", ((Map) gokuServer.get("attributes")).get(HTTP_TARGET.getKey())); + Map gokuInternal = getSpanByKindAndParentId(spans, INTERNAL, gokuServer.get("spanId")); + assertEquals("helloGet", gokuInternal.get("name")); + } + + @Test + void multipleUsingCombine() { + given() + .when() + .get("/reactive/multiple-combine") + .then() + .statusCode(200) + .body(equalTo("Hello Naruto and Hello Goku")); + + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() == 7); + + List> spans = getSpans(); + assertEquals(7, spans.size()); + assertEquals(1, spans.stream().map(map -> map.get("traceId")).collect(toSet()).size()); + + // First span is the call getting into the server. It does not have a parent span. + Map parent = getSpanByKindAndParentId(spans, SERVER, "0000000000000000"); + + // We should get 2 client spans originated by the server + List> clientSpans = getSpansByKindAndParentId(spans, CLIENT, parent.get("spanId")); + assertEquals(2, clientSpans.size()); + + // Each client calls the server and programmatically create a span, so each have a server and an internal span + + // Naruto Span + Optional> narutoSpan = clientSpans.stream() + .filter(map -> ((String) ((Map) map.get("attributes")).get(HTTP_URL.getKey())).contains("Naruto")) + .findFirst(); + assertTrue(narutoSpan.isPresent()); + Map naruto = narutoSpan.get(); + + Map narutoServer = getSpanByKindAndParentId(spans, SERVER, naruto.get("spanId")); + assertEquals("/reactive?name=Naruto", ((Map) narutoServer.get("attributes")).get(HTTP_TARGET.getKey())); + Map narutoInternal = getSpanByKindAndParentId(spans, INTERNAL, narutoServer.get("spanId")); + assertEquals("helloGet", narutoInternal.get("name")); + + // Goku Span + Optional> gokuSpan = clientSpans.stream() + .filter(map -> ((String) ((Map) map.get("attributes")).get(HTTP_URL.getKey())).contains("Goku")) + .findFirst(); + assertTrue(gokuSpan.isPresent()); + Map goku = gokuSpan.get(); + + Map gokuServer = getSpanByKindAndParentId(spans, SERVER, goku.get("spanId")); + assertEquals("/reactive?name=Goku", ((Map) gokuServer.get("attributes")).get(HTTP_TARGET.getKey())); + Map gokuInternal = getSpanByKindAndParentId(spans, INTERNAL, gokuServer.get("spanId")); + assertEquals("helloGet", gokuInternal.get("name")); } private static List> getSpans() { return when().get("/export").body().as(new TypeRef<>() { }); } + + private static Map getSpanByKindAndParentId(List> spans, SpanKind kind, + Object parentSpanId) { + List> span = getSpansByKindAndParentId(spans, kind, parentSpanId); + assertEquals(1, span.size()); + return span.get(0); + } + + private static List> getSpansByKindAndParentId(List> spans, SpanKind kind, + Object parentSpanId) { + return spans.stream() + .filter(map -> map.get("kind").equals(kind.toString())) + .filter(map -> map.get("parentSpanId").equals(parentSpanId)).collect(toList()); + } }