diff --git a/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/interceptor/WithSpanInterceptorTest.java b/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/interceptor/WithSpanInterceptorTest.java index b12624fd36912c..a09599aee0316f 100644 --- a/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/interceptor/WithSpanInterceptorTest.java +++ b/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/interceptor/WithSpanInterceptorTest.java @@ -11,6 +11,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; +import java.time.Duration; import java.util.List; import jakarta.enterprise.context.ApplicationScoped; @@ -28,6 +29,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.instrumentation.annotations.SpanAttribute; import io.opentelemetry.instrumentation.annotations.WithSpan; import io.opentelemetry.sdk.trace.data.SpanData; @@ -37,6 +39,8 @@ import io.quarkus.runtime.StartupEvent; import io.quarkus.test.QuarkusUnitTest; import io.smallrye.config.SmallRyeConfig; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; import io.vertx.ext.web.Router; public class WithSpanInterceptorTest { @@ -59,6 +63,20 @@ public class WithSpanInterceptorTest { @AfterEach void tearDown() { spanExporter.reset(); + // All tests execute in the same thread. For the ones using Uni/Multi, there is no automatic Duplicated context, + // because that's the default on CDI. So we need to clean the context manually, between tests. + // However, this is not needed because of changes made in OpenTelemetryMpContextPropagationProvider. + // try { + // Field storage = Thread.currentThread() + // .getContextClassLoader() + // .loadClass("io.opentelemetry.context.ThreadLocalContextStorage") + // .getDeclaredField("THREAD_LOCAL_STORAGE"); + // storage.setAccessible(true); + // ThreadLocal tl = (ThreadLocal) storage.get(null); + // tl.set(null); + // } catch (Exception e) { + // throw new RuntimeException(e); + // } } @Test @@ -137,6 +155,64 @@ void spanWithException() { ((ExceptionEventData) spanItems.get(0).getEvents().get(0)).getException().getMessage()); } + @Test + void spanUni() { + assertEquals("hello Uni", spanBean.spanUni().await().atMost(Duration.ofSeconds(1))); + List spans = spanExporter.getFinishedSpanItems(1); + + final SpanData parent = getSpanByKindAndParentId(spans, INTERNAL, "0000000000000000"); + assertEquals("withSpanAndUni", parent.getName()); + assertEquals(StatusCode.UNSET, parent.getStatus().getStatusCode()); + } + + @Test + void spanUniWithException() { + try { + spanBean.spanUniWithException().await().atMost(Duration.ofSeconds(1)); + fail("Exception expected"); + } catch (Exception e) { + assertThrows(RuntimeException.class, () -> { + throw e; + }); + } + List spanItems = spanExporter.getFinishedSpanItems(1); + assertEquals("withSpanAndUni", spanItems.get(0).getName()); + assertEquals(INTERNAL, spanItems.get(0).getKind()); + assertEquals(ERROR, spanItems.get(0).getStatus().getStatusCode()); + assertEquals(1, spanItems.get(0).getEvents().size()); + assertEquals("hello Uni", + ((ExceptionEventData) spanItems.get(0).getEvents().get(0)).getException().getMessage()); + } + + @Test + void spanMulti() { + assertEquals("hello Multi 2", spanBean.spanMulti().collect().last().await().atMost(Duration.ofSeconds(1))); + List spans = spanExporter.getFinishedSpanItems(1); + + final SpanData parent = getSpanByKindAndParentId(spans, INTERNAL, "0000000000000000"); + assertEquals("withSpanAndMulti", parent.getName()); + assertEquals(StatusCode.UNSET, parent.getStatus().getStatusCode()); + } + + @Test + void spanMultiWithException() { + try { + spanBean.spanMultiWithException().collect().last().await().atMost(Duration.ofSeconds(1)); + fail("Exception expected"); + } catch (Exception e) { + assertThrows(RuntimeException.class, () -> { + throw e; + }); + } + List spanItems = spanExporter.getFinishedSpanItems(1); + assertEquals("withSpanAndMulti", spanItems.get(0).getName()); + assertEquals(INTERNAL, spanItems.get(0).getKind()); + assertEquals(ERROR, spanItems.get(0).getStatus().getStatusCode()); + assertEquals(1, spanItems.get(0).getEvents().size()); + assertEquals("hello Multi", + ((ExceptionEventData) spanItems.get(0).getEvents().get(0)).getException().getMessage()); + } + @ApplicationScoped public static class SpanBean { @WithSpan @@ -179,6 +255,26 @@ public void spanChild() { public void spanRestClient() { spanRestClient.spanRestClient(); } + + @WithSpan(value = "withSpanAndUni") + public Uni spanUni() { + return Uni.createFrom().item("hello Uni"); + } + + @WithSpan(value = "withSpanAndUni") + public Uni spanUniWithException() { + return Uni.createFrom().failure(new RuntimeException("hello Uni")); + } + + @WithSpan(value = "withSpanAndMulti") + public Multi spanMulti() { + return Multi.createFrom().items("hello Multi 1", "hello Multi 2"); + } + + @WithSpan(value = "withSpanAndMulti") + public Multi spanMultiWithException() { + return Multi.createFrom().failure(new RuntimeException("hello Multi")); + } } @ApplicationScoped diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/propagation/OpenTelemetryMpContextPropagationProvider.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/propagation/OpenTelemetryMpContextPropagationProvider.java index 9aef2ded6744a4..59dc09a1a22ba7 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/propagation/OpenTelemetryMpContextPropagationProvider.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/propagation/OpenTelemetryMpContextPropagationProvider.java @@ -6,6 +6,7 @@ import org.eclipse.microprofile.context.spi.ThreadContextProvider; import org.eclipse.microprofile.context.spi.ThreadContextSnapshot; +import io.opentelemetry.api.trace.Span; import io.quarkus.opentelemetry.runtime.QuarkusContextStorage; public class OpenTelemetryMpContextPropagationProvider implements ThreadContextProvider { @@ -26,7 +27,10 @@ public ThreadContextController begin() { return new ThreadContextController() { @Override public void endContext() throws IllegalStateException { - QuarkusContextStorage.INSTANCE.attach(currentContext); + Span span = Span.fromContext(currentContext); + if (span != null && span.isRecording()) { + QuarkusContextStorage.INSTANCE.attach(currentContext); + } } }; } diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/cdi/WithSpanInterceptor.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/cdi/WithSpanInterceptor.java index 05593746998cc7..8f69d46c6304d9 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/cdi/WithSpanInterceptor.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/cdi/WithSpanInterceptor.java @@ -6,7 +6,12 @@ import java.lang.reflect.Method; import java.lang.reflect.Parameter; import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionStage; +import java.util.function.BiConsumer; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.smallrye.mutiny.tuples.Functions; import jakarta.annotation.Priority; import jakarta.interceptor.AroundInvoke; import jakarta.interceptor.Interceptor; @@ -24,6 +29,8 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.util.SpanNames; import io.quarkus.arc.ArcInvocationContext; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; @SuppressWarnings("CdiInterceptorInspection") @Interceptor @@ -43,7 +50,12 @@ public WithSpanInterceptor(final OpenTelemetry openTelemetry) { MethodRequest::getArgs); this.instrumenter = builder.addAttributesExtractor(attributesExtractor) - .buildInstrumenter(methodRequest -> spanKindFromMethod(methodRequest.getAnnotationBindings())); + .buildInstrumenter(new SpanKindExtractor() { + @Override + public SpanKind extract(MethodRequest methodRequest) { + return spanKindFromMethod(methodRequest.getAnnotationBindings()); + } + }); } @AroundInvoke @@ -53,35 +65,106 @@ public Object span(final ArcInvocationContext invocationContext) throws Exceptio invocationContext.getParameters(), invocationContext.getInterceptorBindings()); + final Class returnType = invocationContext.getMethod().getReturnType(); Context parentContext = Context.current(); - Context spanContext = null; - Scope scope = null; boolean shouldStart = instrumenter.shouldStart(parentContext, methodRequest); - if (shouldStart) { - spanContext = instrumenter.start(parentContext, methodRequest); - scope = spanContext.makeCurrent(); - } - - try { - Object result = invocationContext.proceed(); - if (shouldStart) { - instrumenter.end(spanContext, methodRequest, null, null); - } + if (!shouldStart) { + return invocationContext.proceed(); + } - return result; - } catch (Throwable t) { - if (shouldStart) { - instrumenter.end(spanContext, methodRequest, null, t); - } - throw t; - } finally { - if (scope != null) { - scope.close(); + if (isUni(returnType)) { + final Context currentSpanContext = instrumenter.start(parentContext, methodRequest); + final Scope currentScope = currentSpanContext.makeCurrent(); + return ((Uni) invocationContext.proceed()).onTermination().invoke(new Functions.TriConsumer() { + @Override + public void accept(Object o, Throwable throwable, Boolean aCancelled) { + try { + if (aCancelled) { + instrumenter.end(currentSpanContext, methodRequest, null, new CancellationException("Cancelled")); + } else if (throwable != null) { + instrumenter.end(currentSpanContext, methodRequest, null, throwable); + } else { + instrumenter.end(currentSpanContext, methodRequest, null, null); + } + } finally { + if (currentScope != null) { + currentScope.close(); + } + } + } + }); + } else if (isMulti(returnType)) { + final Context currentSpanContext = instrumenter.start(parentContext, methodRequest); + final Scope currentScope = currentSpanContext.makeCurrent(); + + return ((Multi) invocationContext.proceed()).onTermination().invoke(new BiConsumer() { + @Override + public void accept(Throwable throwable, Boolean aCanceled) { + try { + if (aCanceled) { + instrumenter.end(currentSpanContext, methodRequest, null, new CancellationException("Cancelled")); + } else if (throwable != null) { + instrumenter.end(currentSpanContext, methodRequest, null, throwable); + } else { + instrumenter.end(currentSpanContext, methodRequest, null, null); + } + } finally { + if (currentScope != null) { + currentScope.close(); + } + } + } + }); + } else if (isCompletionStage(returnType)) { + final Context currentSpanContext = instrumenter.start(parentContext, methodRequest); + final Scope currentScope = currentSpanContext.makeCurrent(); + return ((CompletionStage) invocationContext.proceed()).whenComplete(new BiConsumer() { + @Override + public void accept(Object o, Throwable throwable) { + try { + if (throwable != null) { + instrumenter.end(currentSpanContext, methodRequest, null, throwable); + } else { + instrumenter.end(currentSpanContext, methodRequest, null, null); + } + } finally { + if (currentScope != null) { + currentScope.close(); + } + } + } + }); + } else { + final Context currentSpanContext = instrumenter.start(parentContext, methodRequest); + final Scope currentScope = currentSpanContext.makeCurrent(); + try { + Object result = invocationContext.proceed(); + instrumenter.end(currentSpanContext, methodRequest, null, null); + return result; + } catch (Throwable t) { + instrumenter.end(currentSpanContext, methodRequest, null, t); + throw t; + } finally { + if (currentScope != null) { + currentScope.close(); + } } } } + private static boolean isUni(Class clazz) { + return Uni.class.isAssignableFrom(clazz); + } + + private static boolean isMulti(Class clazz) { + return Multi.class.isAssignableFrom(clazz); + } + + private static boolean isCompletionStage(Class clazz) { + return CompletionStage.class.isAssignableFrom(clazz); + } + private static SpanKind spanKindFromMethod(Set annotations) { SpanKind spanKind = null; for (Annotation annotation : annotations) { diff --git a/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java index 02f94b5493f598..f8e4c576301c1d 100644 --- a/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java +++ b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java @@ -1,7 +1,10 @@ package io.quarkus.it.opentelemetry.reactive; import java.time.Duration; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import jakarta.annotation.PostConstruct; import jakarta.inject.Inject; import jakarta.ws.rs.GET; import jakarta.ws.rs.POST; @@ -12,23 +15,47 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.instrumentation.annotations.WithSpan; import io.smallrye.mutiny.Uni; @Path("/reactive") public class ReactiveResource { + public static final int MILLISECONDS_DELAY = 100; @Inject Tracer tracer; @Inject @RestClient ReactiveRestClient client; + private ScheduledExecutorService executor; + + @PostConstruct + public void init() { + executor = Executors.newScheduledThreadPool(2); + } + @GET public Uni helloGet(@QueryParam("name") String name) { Span span = tracer.spanBuilder("helloGet").startSpan(); - return Uni.createFrom().item("Hello " + name).onItem().delayIt().by(Duration.ofSeconds(2)) + return Uni.createFrom().item("Hello " + name).onItem().delayIt().by(Duration.ofMillis(MILLISECONDS_DELAY)) .eventually((Runnable) span::end); } + @GET + @Path("/hello-get-uni-delay") + @WithSpan("helloGetUniDelay") + public Uni helloGetUniDelay() { + return Uni.createFrom().item("helloGetUniDelay").onItem().delayIt().by(Duration.ofMillis(MILLISECONDS_DELAY)); + } + + @GET + @Path("/hello-get-uni-executor") + @WithSpan("helloGetUniExecutor") + public Uni helloGetUniExecutor() { + return Uni.createFrom().item("helloGetUniExecutor") + .onItem().delayIt().onExecutor(executor).by(Duration.ofMillis(MILLISECONDS_DELAY)); + } + @GET @Path("/multiple-chain") public Uni helloMultipleUsingChain() { @@ -48,7 +75,7 @@ public Uni helloMultipleUsingCombine() { @POST public Uni helloPost(String body) { Span span = tracer.spanBuilder("helloPost").startSpan(); - return Uni.createFrom().item("Hello " + body).onItem().delayIt().by(Duration.ofSeconds(2)) + return Uni.createFrom().item("Hello " + body).onItem().delayIt().by(Duration.ofMillis(MILLISECONDS_DELAY)) .eventually((Runnable) span::end); } diff --git a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java index cdaba582dcb6ec..1d5e796ef14370 100644 --- a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java +++ b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java @@ -56,6 +56,44 @@ void get() { assertEquals(spans.get(0).get("traceId"), spans.get(1).get("traceId")); } + @Test + void helloGetUniDelayTest() { + given() + .when() + .get("/reactive/hello-get-uni-delay") + .then() + .statusCode(200) + .body(equalTo("helloGetUniDelay")); + + await().atMost(5, SECONDS).until(() -> getSpans().size() == 2); + Map parent = getSpanByKindAndParentId(getSpans(), SERVER, "0000000000000000"); + assertEquals("GET /reactive/hello-get-uni-delay", parent.get("name")); + + Map child = getSpanByKindAndParentId(getSpans(), INTERNAL, parent.get("spanId")); + assertEquals("helloGetUniDelay", child.get("name")); + + assertEquals(child.get("traceId"), parent.get("traceId")); + } + + @Test + void helloGetUniExecutorTest() { + given() + .when() + .get("/reactive/hello-get-uni-executor") + .then() + .statusCode(200) + .body(equalTo("helloGetUniExecutor")); + + await().atMost(5, SECONDS).until(() -> getSpans().size() == 2); + Map parent = getSpanByKindAndParentId(getSpans(), SERVER, "0000000000000000"); + assertEquals("GET /reactive/hello-get-uni-executor", parent.get("name")); + + Map child = getSpanByKindAndParentId(getSpans(), INTERNAL, parent.get("spanId")); + assertEquals("helloGetUniExecutor", child.get("name")); + + assertEquals(child.get("traceId"), parent.get("traceId")); + } + @Test void blockingException() { given()