diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java index ba7ad80b76113..fb8b1d0ad1adb 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java @@ -112,7 +112,7 @@ public static Context getContext(io.vertx.core.Context vertxContext) { * * @return a duplicated Vert.x Context or null. */ - private static io.vertx.core.Context getVertxContext() { + public static io.vertx.core.Context getVertxContext() { io.vertx.core.Context context = Vertx.currentContext(); if (context != null) { io.vertx.core.Context dc = getOrCreateDuplicatedContext(context); diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java index 108b3d5f2873f..e6e07d13d028d 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java @@ -12,7 +12,6 @@ import javax.ws.rs.client.ClientRequestFilter; import javax.ws.rs.client.ClientResponseContext; import javax.ws.rs.client.ClientResponseFilter; -import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.ext.Provider; @@ -28,17 +27,10 @@ import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor; import io.quarkus.arc.Unremovable; +import io.quarkus.opentelemetry.runtime.QuarkusContextStorage; /** * A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data. - * - * For the Resteasy Reactive Client, we skip the OpenTelemetry registration, since this can be handled by the - * {@link io.quarkus.opentelemetry.runtime.tracing.vertx.OpenTelemetryVertxTracer}. In theory, this wouldn't be an - * issue, because the OpenTelemetry Instrumenter detects two Client Span and merge both together, but they need to be - * executed with the same OpenTelemetry Context. Right now, the Reactive REST Client filters are executed outside the - * Vert.x Context, so we are unable to propagate the OpenTelemetry Context. This is also not a big issue, because the - * correct OpenTelemetry data will be populated in Vert.x. The only missing piece is the route name available in - * io.quarkus.resteasy.reactive.server.runtime.observability.ObservabilityHandler, which is not propagated to Vert.x. */ @Unremovable @Provider @@ -47,7 +39,15 @@ public class OpenTelemetryClientFilter implements ClientRequestFilter, ClientRes public static final String REST_CLIENT_OTEL_SPAN_CLIENT_PARENT_CONTEXT = "otel.span.client.parentContext"; public static final String REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE = "otel.span.client.scope"; - private Instrumenter instrumenter; + /** + * Property stored in the Client Request context to retrieve the captured Vert.x context. + * This context is captured and stored by the Reactive REST Client. + * + * We use this property to avoid having to depend on the Reactive REST Client explicitly. + */ + private static final String VERTX_CONTEXT_PROPERTY = "__context"; + + private final Instrumenter instrumenter; // RESTEasy requires no-arg constructor for CDI injection: https://issues.redhat.com/browse/RESTEASY-1538 // In Reactive Rest Client this is the constructor called. In the classic is the next one with injection. @@ -72,20 +72,25 @@ public OpenTelemetryClientFilter(final OpenTelemetry openTelemetry) { @Override public void filter(final ClientRequestContext request) { - if (isReactiveClient(request)) { - return; - } - Context parentContext = Context.current(); if (instrumenter.shouldStart(parentContext, request)) { Context spanContext = instrumenter.start(parentContext, request); - Scope scope = spanContext.makeCurrent(); + Scope scope = QuarkusContextStorage.INSTANCE.attach(getVertxContext(request), spanContext); request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_CONTEXT, spanContext); request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_PARENT_CONTEXT, parentContext); request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE, scope); } } + private static io.vertx.core.Context getVertxContext(final ClientRequestContext request) { + io.vertx.core.Context vertxContext = (io.vertx.core.Context) request.getProperty(VERTX_CONTEXT_PROPERTY); + if (vertxContext == null) { + return QuarkusContextStorage.getVertxContext(); + } else { + return vertxContext; + } + } + @Override public void filter(final ClientRequestContext request, final ClientResponseContext response) { Scope scope = (Scope) request.getProperty(REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE); @@ -106,7 +111,7 @@ public void filter(final ClientRequestContext request, final ClientResponseConte } static boolean isReactiveClient(final ClientRequestContext request) { - return "Resteasy Reactive Client".equals(request.getHeaderString(HttpHeaders.USER_AGENT)); + return request.getProperty(VERTX_CONTEXT_PROPERTY) != null; } private static class ClientRequestContextTextMapSetter implements TextMapSetter { diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java index ccc61ff03464c..4496dd8b19f1f 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/QuarkusResteasyReactiveRequestContext.java @@ -14,6 +14,7 @@ import io.quarkus.security.identity.SecurityIdentity; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser; +import io.smallrye.common.vertx.VertxContext; import io.vertx.ext.web.RoutingContext; public class QuarkusResteasyReactiveRequestContext extends VertxResteasyReactiveRequestContext { @@ -27,7 +28,9 @@ public QuarkusResteasyReactiveRequestContext(Deployment deployment, ProvidersImp CurrentIdentityAssociation currentIdentityAssociation) { super(deployment, providers, context, requestContext, handlerChain, abortHandlerChain, devModeTccl); this.association = currentIdentityAssociation; - VertxContextSafetyToggle.setCurrentContextSafe(true); + if (VertxContext.isOnDuplicatedContext()) { + VertxContextSafetyToggle.setCurrentContextSafe(true); + } } protected void handleRequestScopeActivation() { diff --git a/independent-projects/resteasy-reactive/client/runtime/pom.xml b/independent-projects/resteasy-reactive/client/runtime/pom.xml index 645ed6b1b2447..f0d2dfcaea241 100644 --- a/independent-projects/resteasy-reactive/client/runtime/pom.xml +++ b/independent-projects/resteasy-reactive/client/runtime/pom.xml @@ -44,6 +44,10 @@ io.vertx vertx-web-client + + io.smallrye.common + smallrye-common-vertx-context + org.junit.jupiter diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java index f32b8bd54ef41..cd26798627e55 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java @@ -5,6 +5,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.stork.Stork; import io.smallrye.stork.api.ServiceInstance; +import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; @@ -25,6 +26,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Function; import javax.ws.rs.InternalServerErrorException; @@ -40,6 +42,7 @@ import org.jboss.resteasy.reactive.client.api.LoggingScope; import org.jboss.resteasy.reactive.client.api.QuarkusRestClientProperties; import org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl; +import org.jboss.resteasy.reactive.client.impl.ClientRequestContextImpl; import org.jboss.resteasy.reactive.client.impl.RestClientRequestContext; import org.jboss.resteasy.reactive.client.impl.multipart.PausableHttpPostRequestEncoder; import org.jboss.resteasy.reactive.client.impl.multipart.QuarkusMultipartForm; @@ -72,7 +75,30 @@ public void handle(RestClientRequestContext requestContext) { return; } requestContext.suspend(); - Uni future = createRequest(requestContext); + Uni future = createRequest(requestContext) + .runSubscriptionOn(new Executor() { + @Override + public void execute(Runnable command) { + Context current = Vertx.currentContext(); + ClientRequestContextImpl clientRequestContext = requestContext.getClientRequestContext(); + Context captured = null; + if (clientRequestContext != null) { + captured = clientRequestContext.getContext(); + } + if (current == captured || captured == null) { + // No need to switch to another context. + command.run(); + } else { + // Switch back to the captured context + captured.runOnContext(new Handler() { + @Override + public void handle(Void ignored) { + command.run(); + } + }); + } + } + }); // DNS failures happen before we send the request future.subscribe().with(new Consumer<>() { @@ -91,6 +117,7 @@ public void accept(HttpClientRequest httpClientRequest) { Pipe pipe = actualEntity.pipe(); // Shouldn't this be called in an earlier phase ? requestPromise.future().onComplete(ar -> { + if (ar.succeeded()) { HttpClientRequest req = ar.result(); if (httpClientRequest.headers() == null @@ -109,7 +136,6 @@ public void accept(HttpClientRequest httpClientRequest) { } }); sent = httpClientRequest.response(); - requestPromise.complete(httpClientRequest); } catch (Throwable e) { reportFinish(e, requestContext); diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java index 3fac66b0cf13c..faff2cfa99bf1 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientRequestContextImpl.java @@ -3,6 +3,8 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Context; import java.io.OutputStream; import java.lang.annotation.Annotation; import java.lang.reflect.Type; @@ -43,7 +45,7 @@ public class ClientRequestContextImpl implements ResteasyReactiveClientRequestCo private final ConfigurationImpl configuration; private final RestClientRequestContext restClientRequestContext; private final ClientRequestHeadersMap headersMap; - private OutputStream entityStream; + private final Context context; public ClientRequestContextImpl(RestClientRequestContext restClientRequestContext, ClientImpl client, ConfigurationImpl configuration) { @@ -51,6 +53,16 @@ public ClientRequestContextImpl(RestClientRequestContext restClientRequestContex this.client = client; this.configuration = configuration; this.headersMap = new ClientRequestHeadersMap(); //restClientRequestContext.requestHeaders.getHeaders() + + // Capture or create a duplicated context, and store it. + Context current = client.vertx.getOrCreateContext(); + this.context = VertxContext.getOrCreateDuplicatedContext(current); + restClientRequestContext.properties.put(VERTX_CONTEXT_PROPERTY, context); + } + + @Override + public Context getContext() { + return context; } @Override diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/spi/ResteasyReactiveClientRequestContext.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/spi/ResteasyReactiveClientRequestContext.java index 773fead812861..0050dcaab639d 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/spi/ResteasyReactiveClientRequestContext.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/spi/ResteasyReactiveClientRequestContext.java @@ -1,12 +1,27 @@ package org.jboss.resteasy.reactive.client.spi; +import io.vertx.core.Context; import javax.ws.rs.client.ClientRequestContext; public interface ResteasyReactiveClientRequestContext extends ClientRequestContext { + /** + * The property used to store the (duplicated) vert.x context with the request. + * This context is captured when the ResteasyReactiveClientRequestContext instance is created. + * If, at that moment, there is no context, a new duplicated context is created. + * If, we are executed on a root context, it creates a new duplicated context from it. + * Otherwise, (we are already on a duplicated context), it captures it. + */ + String VERTX_CONTEXT_PROPERTY = "__context"; + void suspend(); void resume(); void resume(Throwable t); + + /** + * @return the captured or created duplicated context. See {@link #VERTX_CONTEXT_PROPERTY} for details. + */ + Context getContext(); } diff --git a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveClientTest.java b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveClientTest.java index 06f6e6a1acafc..6c68ee2098f2f 100644 --- a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveClientTest.java +++ b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveClientTest.java @@ -71,10 +71,8 @@ void get() { assertEquals(HttpMethod.GET.name(), ((Map) server.get("attributes")).get(HTTP_METHOD.getKey())); assertEquals(SpanKind.CLIENT.toString(), client.get("kind")); - // TODO - radcortez - Requires a fix to pass in the UrlPathTemplate in the Vert.x Context. Check: - // io.quarkus.resteasy.reactive.server.runtime.observability.ObservabilityHandler - // org.jboss.resteasy.reactive.client.AsyncResultUni - //assertEquals("reactive", client.get("name")); + assertEquals("/reactive", client.get("name")); + assertEquals(HTTP_OK, ((Map) client.get("attributes")).get(HTTP_STATUS_CODE.getKey())); assertEquals(HttpMethod.GET.name(), ((Map) client.get("attributes")).get(HTTP_METHOD.getKey())); } @@ -107,10 +105,8 @@ void post() { assertEquals(HttpMethod.POST.name(), ((Map) server.get("attributes")).get(HTTP_METHOD.getKey())); assertEquals(SpanKind.CLIENT.toString(), client.get("kind")); - // TODO - radcortez - Requires a fix to pass in the UrlPathTemplate in the Vert.x Context. Check: - // io.quarkus.resteasy.reactive.server.runtime.observability.ObservabilityHandler - // org.jboss.resteasy.reactive.client.AsyncResultUni - //assertEquals("reactive", client.get("name")); + assertEquals("/reactive", client.get("name")); + assertEquals(HTTP_OK, ((Map) client.get("attributes")).get(HTTP_STATUS_CODE.getKey())); assertEquals(HttpMethod.POST.name(), ((Map) client.get("attributes")).get(HTTP_METHOD.getKey())); } diff --git a/integration-tests/rest-client-reactive/src/main/java/io/quarkus/it/rest/client/main/DefaultCtorTestFilter.java b/integration-tests/rest-client-reactive/src/main/java/io/quarkus/it/rest/client/main/DefaultCtorTestFilter.java index 5ca8adb4bab54..9e28969533afd 100644 --- a/integration-tests/rest-client-reactive/src/main/java/io/quarkus/it/rest/client/main/DefaultCtorTestFilter.java +++ b/integration-tests/rest-client-reactive/src/main/java/io/quarkus/it/rest/client/main/DefaultCtorTestFilter.java @@ -7,6 +7,6 @@ public class DefaultCtorTestFilter implements ClientRequestFilter { @Override public void filter(ClientRequestContext requestContext) { - System.out.println(requestContext.getMethod()); + // Do nothing on purpose. } } diff --git a/integration-tests/rest-client-reactive/src/main/java/io/quarkus/it/rest/client/main/NonDefaultCtorTestFilter.java b/integration-tests/rest-client-reactive/src/main/java/io/quarkus/it/rest/client/main/NonDefaultCtorTestFilter.java index f5e3e88d47c52..1d613a901ef02 100644 --- a/integration-tests/rest-client-reactive/src/main/java/io/quarkus/it/rest/client/main/NonDefaultCtorTestFilter.java +++ b/integration-tests/rest-client-reactive/src/main/java/io/quarkus/it/rest/client/main/NonDefaultCtorTestFilter.java @@ -17,6 +17,5 @@ public NonDefaultCtorTestFilter(ObjectMapper mapper) { @Override public void filter(ClientRequestContext requestContext) { mapper.getFactory(); - System.out.println(requestContext.getUri()); } } diff --git a/integration-tests/rest-client-reactive/src/test/java/io/quarkus/it/rest/client/BasicTest.java b/integration-tests/rest-client-reactive/src/test/java/io/quarkus/it/rest/client/BasicTest.java index bad0eab912763..2309a9caf4bc9 100644 --- a/integration-tests/rest-client-reactive/src/test/java/io/quarkus/it/rest/client/BasicTest.java +++ b/integration-tests/rest-client-reactive/src/test/java/io/quarkus/it/rest/client/BasicTest.java @@ -154,15 +154,11 @@ void shouldCreateClientSpans() { Assertions.assertNotNull(spanData.get("attr_http.client_ip")); Assertions.assertNotNull(spanData.get("attr_http.user_agent")); } else if (spanData.get("kind").equals(SpanKind.CLIENT.toString()) - && spanData.get("name").equals("HTTP POST")) { + && spanData.get("name").equals("/hello")) { clientFound = true; // Client span + Assertions.assertEquals("/hello", spanData.get("name")); - // TODO - radcortez - Requires a fix to pass in the UrlPathTemplate in the Vert.x Context. Check: - // io.quarkus.resteasy.reactive.server.runtime.observability.ObservabilityHandler - // org.jboss.resteasy.reactive.client.AsyncResultUni - //assertEquals("reactive", client.get("name")); - Assertions.assertEquals("HTTP POST", spanData.get("name")); Assertions.assertEquals(SpanKind.CLIENT.toString(), spanData.get("kind")); Assertions.assertTrue((Boolean) spanData.get("ended"));