Skip to content

Commit

Permalink
Merge pull request #39223 from brunobat/25453-WithSpan-uni-and-multy
Browse files Browse the repository at this point in the history
Fix WithSpan uni and multi
  • Loading branch information
brunobat authored Mar 8, 2024
2 parents 5738ebf + bda2602 commit d7c49ff
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;

import java.time.Duration;
import java.util.List;

import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -28,6 +29,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.opentelemetry.sdk.trace.data.SpanData;
Expand All @@ -37,6 +39,8 @@
import io.quarkus.runtime.StartupEvent;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.config.SmallRyeConfig;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.ext.web.Router;

public class WithSpanInterceptorTest {
Expand Down Expand Up @@ -137,6 +141,64 @@ void spanWithException() {
((ExceptionEventData) spanItems.get(0).getEvents().get(0)).getException().getMessage());
}

@Test
void spanUni() {
assertEquals("hello Uni", spanBean.spanUni().await().atMost(Duration.ofSeconds(1)));
List<SpanData> spans = spanExporter.getFinishedSpanItems(1);

final SpanData parent = getSpanByKindAndParentId(spans, INTERNAL, "0000000000000000");
assertEquals("withSpanAndUni", parent.getName());
assertEquals(StatusCode.UNSET, parent.getStatus().getStatusCode());
}

@Test
void spanUniWithException() {
try {
spanBean.spanUniWithException().await().atMost(Duration.ofSeconds(1));
fail("Exception expected");
} catch (Exception e) {
assertThrows(RuntimeException.class, () -> {
throw e;
});
}
List<SpanData> spanItems = spanExporter.getFinishedSpanItems(1);
assertEquals("withSpanAndUni", spanItems.get(0).getName());
assertEquals(INTERNAL, spanItems.get(0).getKind());
assertEquals(ERROR, spanItems.get(0).getStatus().getStatusCode());
assertEquals(1, spanItems.get(0).getEvents().size());
assertEquals("hello Uni",
((ExceptionEventData) spanItems.get(0).getEvents().get(0)).getException().getMessage());
}

@Test
void spanMulti() {
assertEquals("hello Multi 2", spanBean.spanMulti().collect().last().await().atMost(Duration.ofSeconds(1)));
List<SpanData> spans = spanExporter.getFinishedSpanItems(1);

final SpanData parent = getSpanByKindAndParentId(spans, INTERNAL, "0000000000000000");
assertEquals("withSpanAndMulti", parent.getName());
assertEquals(StatusCode.UNSET, parent.getStatus().getStatusCode());
}

@Test
void spanMultiWithException() {
try {
spanBean.spanMultiWithException().collect().last().await().atMost(Duration.ofSeconds(1));
fail("Exception expected");
} catch (Exception e) {
assertThrows(RuntimeException.class, () -> {
throw e;
});
}
List<SpanData> spanItems = spanExporter.getFinishedSpanItems(1);
assertEquals("withSpanAndMulti", spanItems.get(0).getName());
assertEquals(INTERNAL, spanItems.get(0).getKind());
assertEquals(ERROR, spanItems.get(0).getStatus().getStatusCode());
assertEquals(1, spanItems.get(0).getEvents().size());
assertEquals("hello Multi",
((ExceptionEventData) spanItems.get(0).getEvents().get(0)).getException().getMessage());
}

@ApplicationScoped
public static class SpanBean {
@WithSpan
Expand Down Expand Up @@ -179,6 +241,26 @@ public void spanChild() {
public void spanRestClient() {
spanRestClient.spanRestClient();
}

@WithSpan(value = "withSpanAndUni")
public Uni<String> spanUni() {
return Uni.createFrom().item("hello Uni");
}

@WithSpan(value = "withSpanAndUni")
public Uni<String> spanUniWithException() {
return Uni.createFrom().failure(new RuntimeException("hello Uni"));
}

@WithSpan(value = "withSpanAndMulti")
public Multi<String> spanMulti() {
return Multi.createFrom().items("hello Multi 1", "hello Multi 2");
}

@WithSpan(value = "withSpanAndMulti")
public Multi<String> spanMultiWithException() {
return Multi.createFrom().failure(new RuntimeException("hello Multi"));
}
}

@ApplicationScoped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.eclipse.microprofile.context.spi.ThreadContextProvider;
import org.eclipse.microprofile.context.spi.ThreadContextSnapshot;

import io.opentelemetry.api.trace.Span;
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;

public class OpenTelemetryMpContextPropagationProvider implements ThreadContextProvider {
Expand All @@ -26,7 +27,10 @@ public ThreadContextController begin() {
return new ThreadContextController() {
@Override
public void endContext() throws IllegalStateException {
QuarkusContextStorage.INSTANCE.attach(currentContext);
Span span = Span.fromContext(currentContext);
if (span != null && span.isRecording()) {
QuarkusContextStorage.INSTANCE.attach(currentContext);
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

import jakarta.annotation.Priority;
import jakarta.interceptor.AroundInvoke;
Expand All @@ -21,9 +24,13 @@
import io.opentelemetry.instrumentation.api.annotation.support.ParameterAttributeNamesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.util.SpanNames;
import io.quarkus.arc.ArcInvocationContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Functions;

@SuppressWarnings("CdiInterceptorInspection")
@Interceptor
Expand All @@ -43,7 +50,12 @@ public WithSpanInterceptor(final OpenTelemetry openTelemetry) {
MethodRequest::getArgs);

this.instrumenter = builder.addAttributesExtractor(attributesExtractor)
.buildInstrumenter(methodRequest -> spanKindFromMethod(methodRequest.getAnnotationBindings()));
.buildInstrumenter(new SpanKindExtractor<MethodRequest>() {
@Override
public SpanKind extract(MethodRequest methodRequest) {
return spanKindFromMethod(methodRequest.getAnnotationBindings());
}
});
}

@AroundInvoke
Expand All @@ -53,35 +65,108 @@ public Object span(final ArcInvocationContext invocationContext) throws Exceptio
invocationContext.getParameters(),
invocationContext.getInterceptorBindings());

final Class<?> returnType = invocationContext.getMethod().getReturnType();
Context parentContext = Context.current();
Context spanContext = null;
Scope scope = null;
boolean shouldStart = instrumenter.shouldStart(parentContext, methodRequest);
if (shouldStart) {
spanContext = instrumenter.start(parentContext, methodRequest);
scope = spanContext.makeCurrent();
}

try {
Object result = invocationContext.proceed();

if (shouldStart) {
instrumenter.end(spanContext, methodRequest, null, null);
}
if (!shouldStart) {
return invocationContext.proceed();
}

return result;
} catch (Throwable t) {
if (shouldStart) {
instrumenter.end(spanContext, methodRequest, null, t);
}
throw t;
} finally {
if (scope != null) {
scope.close();
if (isUni(returnType)) {
final Context currentSpanContext = instrumenter.start(parentContext, methodRequest);
final Scope currentScope = currentSpanContext.makeCurrent();
return ((Uni<Object>) invocationContext.proceed()).onTermination()
.invoke(new Functions.TriConsumer<Object, Throwable, Boolean>() {
@Override
public void accept(Object o, Throwable throwable, Boolean isCancelled) {
try {
if (isCancelled) {
instrumenter.end(currentSpanContext, methodRequest, null,
new CancellationException());
} else if (throwable != null) {
instrumenter.end(currentSpanContext, methodRequest, null, throwable);
} else {
instrumenter.end(currentSpanContext, methodRequest, null, null);
}
} finally {
if (currentScope != null) {
currentScope.close();
}
}
}
});
} else if (isMulti(returnType)) {
final Context currentSpanContext = instrumenter.start(parentContext, methodRequest);
final Scope currentScope = currentSpanContext.makeCurrent();

return ((Multi<Object>) invocationContext.proceed()).onTermination().invoke(new BiConsumer<Throwable, Boolean>() {
@Override
public void accept(Throwable throwable, Boolean isCancelled) {
try {
if (isCancelled) {
instrumenter.end(currentSpanContext, methodRequest, null, new CancellationException());
} else if (throwable != null) {
instrumenter.end(currentSpanContext, methodRequest, null, throwable);
} else {
instrumenter.end(currentSpanContext, methodRequest, null, null);
}
} finally {
if (currentScope != null) {
currentScope.close();
}
}
}
});
} else if (isCompletionStage(returnType)) {
final Context currentSpanContext = instrumenter.start(parentContext, methodRequest);
final Scope currentScope = currentSpanContext.makeCurrent();
return ((CompletionStage<?>) invocationContext.proceed()).whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object o, Throwable throwable) {
try {
if (throwable != null) {
instrumenter.end(currentSpanContext, methodRequest, null, throwable);
} else {
instrumenter.end(currentSpanContext, methodRequest, null, null);
}
} finally {
if (currentScope != null) {
currentScope.close();
}
}
}
});
} else {
final Context currentSpanContext = instrumenter.start(parentContext, methodRequest);
final Scope currentScope = currentSpanContext.makeCurrent();
try {
Object result = invocationContext.proceed();
instrumenter.end(currentSpanContext, methodRequest, null, null);
return result;
} catch (Throwable t) {
instrumenter.end(currentSpanContext, methodRequest, null, t);
throw t;
} finally {
if (currentScope != null) {
currentScope.close();
}
}
}
}

private static boolean isUni(Class<?> clazz) {
return Uni.class.isAssignableFrom(clazz);
}

private static boolean isMulti(Class<?> clazz) {
return Multi.class.isAssignableFrom(clazz);
}

private static boolean isCompletionStage(Class<?> clazz) {
return CompletionStage.class.isAssignableFrom(clazz);
}

private static SpanKind spanKindFromMethod(Set<Annotation> annotations) {
SpanKind spanKind = null;
for (Annotation annotation : annotations) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package io.quarkus.it.opentelemetry.reactive;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
Expand All @@ -12,23 +15,47 @@

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.smallrye.mutiny.Uni;

@Path("/reactive")
public class ReactiveResource {
public static final int MILLISECONDS_DELAY = 100;
@Inject
Tracer tracer;
@Inject
@RestClient
ReactiveRestClient client;

private ScheduledExecutorService executor;

@PostConstruct
public void init() {
executor = Executors.newScheduledThreadPool(2);
}

@GET
public Uni<String> helloGet(@QueryParam("name") String name) {
Span span = tracer.spanBuilder("helloGet").startSpan();
return Uni.createFrom().item("Hello " + name).onItem().delayIt().by(Duration.ofSeconds(2))
return Uni.createFrom().item("Hello " + name).onItem().delayIt().by(Duration.ofMillis(MILLISECONDS_DELAY))
.eventually((Runnable) span::end);
}

@GET
@Path("/hello-get-uni-delay")
@WithSpan("helloGetUniDelay")
public Uni<String> helloGetUniDelay() {
return Uni.createFrom().item("helloGetUniDelay").onItem().delayIt().by(Duration.ofMillis(MILLISECONDS_DELAY));
}

@GET
@Path("/hello-get-uni-executor")
@WithSpan("helloGetUniExecutor")
public Uni<String> helloGetUniExecutor() {
return Uni.createFrom().item("helloGetUniExecutor")
.onItem().delayIt().onExecutor(executor).by(Duration.ofMillis(MILLISECONDS_DELAY));
}

@GET
@Path("/multiple-chain")
public Uni<String> helloMultipleUsingChain() {
Expand All @@ -48,7 +75,7 @@ public Uni<String> helloMultipleUsingCombine() {
@POST
public Uni<String> helloPost(String body) {
Span span = tracer.spanBuilder("helloPost").startSpan();
return Uni.createFrom().item("Hello " + body).onItem().delayIt().by(Duration.ofSeconds(2))
return Uni.createFrom().item("Hello " + body).onItem().delayIt().by(Duration.ofMillis(MILLISECONDS_DELAY))
.eventually((Runnable) span::end);
}

Expand Down
Loading

0 comments on commit d7c49ff

Please sign in to comment.