Skip to content

Commit

Permalink
Always create a duplicated context in OpenTelemetry when executing cl…
Browse files Browse the repository at this point in the history
…ient requests
  • Loading branch information
radcortez committed Jun 28, 2022
1 parent bddfa06 commit debf3e2
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 21 deletions.
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
<microprofile-rest-client.version>2.0</microprofile-rest-client.version>
<microprofile-jwt.version>1.2</microprofile-jwt.version>
<microprofile-lra.version>1.0</microprofile-lra.version>
<smallrye-common.version>1.12.0</smallrye-common.version>
<smallrye-common.version>1.13.0</smallrye-common.version>
<smallrye-config.version>2.10.1</smallrye-config.version>
<smallrye-health.version>3.2.1</smallrye-health.version>
<smallrye-metrics.version>3.0.5</smallrye-metrics.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_TARGET;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_URL;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.util.stream.Collectors.toSet;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.net.URI;
Expand All @@ -27,6 +28,8 @@
import io.quarkus.runtime.StartupEvent;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.Router;
Expand Down Expand Up @@ -114,14 +117,40 @@ void path() throws Exception {
assertEquals(server.getParentSpanId(), client.getSpanId());
}

@Test
void multiple() throws Exception {
HttpResponse<Buffer> response = WebClient.create(vertx)
.get(uri.getPort(), uri.getHost(), "/multiple")
.putHeader("host", uri.getHost())
.putHeader("port", uri.getPort() + "")
.send()
.toCompletionStage().toCompletableFuture()
.get();

assertEquals(HTTP_OK, response.statusCode());

List<SpanData> spans = spanExporter.getFinishedSpanItems(6);
assertEquals(1, spans.stream().map(SpanData::getTraceId).collect(toSet()).size());
}

@ApplicationScoped
public static class HelloRouter {
@Inject
Router router;
@Inject
Vertx vertx;

public void register(@Observes StartupEvent ev) {
router.get("/hello").handler(rc -> rc.response().end("hello"));
router.get("/hello/:name").handler(rc -> rc.response().end("hello " + rc.pathParam("name")));
router.get("/multiple").handler(rc -> {
String host = rc.request().getHeader("host");
int port = Integer.parseInt(rc.request().getHeader("port"));
WebClient webClient = WebClient.create(vertx);
Future<HttpResponse<Buffer>> one = webClient.get(port, host, "/hello/naruto").send();
Future<HttpResponse<Buffer>> two = webClient.get(port, host, "/hello/goku").send();
CompositeFuture.join(one, two).onComplete(event -> rc.response().end());
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ public OpenTelemetryClientFilter(final OpenTelemetry openTelemetry) {

@Override
public void filter(final ClientRequestContext request) {
Context parentContext = Context.current();
io.vertx.core.Context vertxContext = getVertxContext(request);
io.opentelemetry.context.Context parentContext = QuarkusContextStorage.getContext(vertxContext);
if (parentContext == null) {
parentContext = io.opentelemetry.context.Context.current();
}
if (instrumenter.shouldStart(parentContext, request)) {
Context spanContext = instrumenter.start(parentContext, request);
Scope scope = QuarkusContextStorage.INSTANCE.attach(getVertxContext(request), spanContext);
Scope scope = QuarkusContextStorage.INSTANCE.attach(vertxContext, spanContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_CONTEXT, spanContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_PARENT_CONTEXT, parentContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE, scope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ default <R> SpanOperation sendRequest(
if (instrumenter.shouldStart(parentContext, (REQ) request)) {
io.opentelemetry.context.Context spanContext = instrumenter.start(parentContext,
writableHeaders((REQ) request, headers));
Context duplicatedContext = VertxContext.getOrCreateDuplicatedContext(context);
Context duplicatedContext = VertxContext.createNewDuplicatedContext(context);
setContextSafe(duplicatedContext, true);
Scope scope = QuarkusContextStorage.INSTANCE.attach(duplicatedContext, spanContext);
return spanOperation(duplicatedContext, (REQ) request, toMultiMap(headers), spanContext, scope);
Expand Down
2 changes: 1 addition & 1 deletion independent-projects/bootstrap/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
<slf4j-api.version>1.7.36</slf4j-api.version>
<graal-sdk.version>22.1.0</graal-sdk.version>
<plexus-classworlds.version>2.6.0</plexus-classworlds.version> <!-- not actually used but ClassRealm class is referenced from the API used in BootstrapWagonConfigurator -->
<smallrye-common.version>1.12.0</smallrye-common.version>
<smallrye-common.version>1.13.0</smallrye-common.version>
<gradle-tooling.version>7.4.2</gradle-tooling.version>
<quarkus-fs-util.version>0.0.9</quarkus-fs-util.version>
<org-crac.version>0.1.1</org-crac.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public ClientRequestContextImpl(RestClientRequestContext restClientRequestContex

// Capture or create a duplicated context, and store it.
Context current = client.vertx.getOrCreateContext();
this.context = VertxContext.getOrCreateDuplicatedContext(current);
this.context = VertxContext.createNewDuplicatedContext(current);
restClientRequestContext.properties.put(VERTX_CONTEXT_PROPERTY, context);
}

Expand Down
2 changes: 1 addition & 1 deletion independent-projects/resteasy-reactive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<jboss-jaxrs-api_2.1_spec.version>2.0.1.Final</jboss-jaxrs-api_2.1_spec.version>
<jakarta.json.version>1.1.6</jakarta.json.version>
<mutiny.version>1.6.0</mutiny.version>
<smallrye-common.version>1.12.0</smallrye-common.version>
<smallrye-common.version>1.13.0</smallrye-common.version>
<vertx.version>4.3.1</vertx.version>
<rest-assured.version>4.5.1</rest-assured.version>
<commons-logging-jboss-logging.version>1.0.0.Final</commons-logging-jboss-logging.version>
Expand Down
2 changes: 1 addition & 1 deletion independent-projects/tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<quarkus.version>999-SNAPSHOT</quarkus.version>
<maven-model-helper.version>20</maven-model-helper.version>
<commons-io.version>2.11.0</commons-io.version>
<smallrye-common.version>1.12.0</smallrye-common.version>
<smallrye-common.version>1.13.0</smallrye-common.version>
<jandex-maven-plugin.version>1.2.2</jandex-maven-plugin.version>
</properties>
<modules>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;

import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.smallrye.mutiny.Uni;
Expand All @@ -16,6 +18,9 @@
public class ReactiveResource {
@Inject
Tracer tracer;
@Inject
@RestClient
ReactiveRestClient client;

@GET
public Uni<String> helloGet(@QueryParam("name") String name) {
Expand All @@ -24,6 +29,13 @@ public Uni<String> helloGet(@QueryParam("name") String name) {
.eventually((Runnable) span::end);
}

@GET
@Path("/multiple")
public Uni<String> helloMultiple() {
return Uni.combine().all().unis(client.helloGet("Naruto"), client.helloGet("Goku"))
.combinedWith((s, s2) -> s + " and " + s2);
}

@POST
public Uni<String> helloPost(String body) {
Span span = tracer.spanBuilder("helloPost").startSpan();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.it.opentelemetry.reactive;

import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;

import io.smallrye.mutiny.Uni;

@RegisterRestClient(configKey = "client")
@Path("/reactive")
interface ReactiveRestClient {
@GET
Uni<String> helloGet(@QueryParam("name") String name);

@POST
Uni<String> helloPost(String body);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -111,16 +108,6 @@ void post() {
assertEquals(HttpMethod.POST.name(), ((Map<?, ?>) client.get("attributes")).get(HTTP_METHOD.getKey()));
}

@RegisterRestClient(configKey = "client")
@Path("/reactive")
interface ReactiveRestClient {
@GET
Uni<String> helloGet(@QueryParam("name") String name);

@POST
Uni<String> helloPost(String body);
}

private static List<Map<String, Object>> getSpans() {
return when().get("/export").body().as(new TypeRef<>() {
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.quarkus.it.opentelemetry.reactive;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
public class OpenTelemetryReactiveIT extends OpenTelemetryReactiveTest {
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static io.restassured.RestAssured.given;
import static io.restassured.RestAssured.when;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.util.stream.Collectors.toSet;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -59,6 +60,23 @@ void post() {
assertEquals(spans.get(0).get("traceId"), spans.get(1).get("traceId"));
}

@Test
void multiple() {
given()
.contentType("application/json")
.when()
.get("/reactive/multiple")
.then()
.statusCode(200)
.body(equalTo("Hello Naruto and Hello Goku"));

await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() == 7);

List<Map<String, Object>> spans = getSpans();
assertEquals(7, spans.size());
assertEquals(1, spans.stream().map(map -> map.get("traceId")).collect(toSet()).size());
}

private static List<Map<String, Object>> getSpans() {
return when().get("/export").body().as(new TypeRef<>() {
});
Expand Down

0 comments on commit debf3e2

Please sign in to comment.