From 3033857f07b4a724357c842690b96e1fd90442c9 Mon Sep 17 00:00:00 2001 From: brunobat Date: Wed, 19 Jul 2023 11:07:24 +0100 Subject: [PATCH] Bump to OTel 1.28 Fix for RM Kafka HR test: Before RM 4.9.0 Blocking method completion wasn't handled properly. Which lead to this test to be able to abuse and use HR SessionFactory and Panache Reactive in the same message context, one after the other. This change keeps Panache Reactive using WithTransaction annotation. The Blocking method is kept to verify that message dispatch continues on duplicated context. --- bom/application/pom.xml | 6 +- .../VertxClientOpenTelemetryTest.java | 35 ++++++ extensions/opentelemetry/runtime/pom.xml | 4 + .../runtime/OpenTelemetryRecorder.java | 10 +- .../runtime/exporter/otlp/OtlpRecorder.java | 2 +- ...otlp_internal_grpc_ManagedChannelUtil.java | 44 ------- .../tracing/cdi/WithSpanInterceptor.java | 2 +- .../intrumentation/grpc/GrpcRequest.java | 75 +++++++++-- .../grpc/GrpcTracingClientInterceptor.java | 2 +- .../grpc/GrpcTracingServerInterceptor.java | 19 +-- .../restclient/OpenTelemetryClientFilter.java | 31 +++-- .../EventBusInstrumenterVertxTracer.java | 11 +- .../vertx/HttpInstrumenterVertxTracer.java | 118 +++++++++++------- .../grpc/OpenTelemetryGrpcTest.java | 13 +- .../opentelemetry/OpenTelemetryTestCase.java | 5 +- .../opentelemetry/vertx/HelloRouterTest.java | 3 - .../io/quarkus/it/kafka/KafkaReceivers.java | 30 ++--- 17 files changed, 235 insertions(+), 175 deletions(-) delete mode 100644 extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/graal/Target_io_opentelemetry_exporter_otlp_internal_grpc_ManagedChannelUtil.java diff --git a/bom/application/pom.xml b/bom/application/pom.xml index a083968049cbe..4d8bfb583c029 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -36,8 +36,8 @@ 0.2.4 0.1.15 0.1.5 - 1.25.0 - 1.25.0-alpha + 1.28.0 + 1.28.0-alpha 1.8.1 5.0.2.Final 1.11.1 @@ -68,7 +68,7 @@ 1.0.13 3.0.0 3.5.0 - 4.8.0 + 4.9.0 2.3.1 2.1.2 2.1.1 diff --git a/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/instrumentation/VertxClientOpenTelemetryTest.java b/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/instrumentation/VertxClientOpenTelemetryTest.java index 97419bbf5426c..c20222d95aca3 100644 --- a/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/instrumentation/VertxClientOpenTelemetryTest.java +++ b/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/instrumentation/VertxClientOpenTelemetryTest.java @@ -131,6 +131,40 @@ void path() throws Exception { assertEquals(client.getTraceId(), server.getTraceId()); } + @Test + void query() throws Exception { + HttpResponse response = WebClient.create(vertx) + .get(uri.getPort(), uri.getHost(), "/hello?name=foo") + .send() + .toCompletionStage().toCompletableFuture() + .get(); + + assertEquals(HTTP_OK, response.statusCode()); + + List spans = spanExporter.getFinishedSpanItems(2); + + SpanData client = getSpanByKindAndParentId(spans, CLIENT, "0000000000000000"); + assertEquals(CLIENT, client.getKind()); + assertEquals("GET", client.getName()); + assertEquals(HTTP_OK, client.getAttributes().get(HTTP_STATUS_CODE)); + assertEquals(HttpMethod.GET, client.getAttributes().get(HTTP_METHOD)); + assertEquals(uri.toString() + "hello?name=foo", client.getAttributes().get(HTTP_URL)); + assertEquals(uri.getHost(), client.getAttributes().get(NET_PEER_NAME)); + assertEquals(uri.getPort(), client.getAttributes().get(NET_PEER_PORT)); + + SpanData server = getSpanByKindAndParentId(spans, SERVER, client.getSpanId()); + assertEquals(SERVER, server.getKind()); + assertEquals("GET /hello", server.getName()); + assertEquals(HTTP_OK, server.getAttributes().get(HTTP_STATUS_CODE)); + assertEquals(HttpMethod.GET, server.getAttributes().get(HTTP_METHOD)); + assertEquals("/hello", server.getAttributes().get(HTTP_ROUTE)); + assertEquals(uri.getHost(), server.getAttributes().get(NET_HOST_NAME)); + assertEquals(uri.getPort(), server.getAttributes().get(NET_HOST_PORT)); + assertEquals(uri.getPath() + "hello?name=foo", server.getAttributes().get(HTTP_TARGET)); + + assertEquals(client.getTraceId(), server.getTraceId()); + } + @Test void multiple() throws Exception { HttpResponse response = WebClient.create(vertx) @@ -165,6 +199,7 @@ public void register(@Observes StartupEvent ev) { Future> two = webClient.get(port, host, "/hello/goku").send(); CompositeFuture.join(one, two).onComplete(event -> rc.response().end()); }); + router.get("/hello?name=foo").handler(rc -> rc.response().end("hello foo")); } } } diff --git a/extensions/opentelemetry/runtime/pom.xml b/extensions/opentelemetry/runtime/pom.xml index 0e0dcf59af816..57677019dddab 100644 --- a/extensions/opentelemetry/runtime/pom.xml +++ b/extensions/opentelemetry/runtime/pom.xml @@ -83,6 +83,10 @@ io.opentelemetry opentelemetry-semconv + + io.opentelemetry + opentelemetry-api-events + io.opentelemetry.instrumentation opentelemetry-instrumentation-api diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/OpenTelemetryRecorder.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/OpenTelemetryRecorder.java index dab2f72463113..bf4e3be04657c 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/OpenTelemetryRecorder.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/OpenTelemetryRecorder.java @@ -13,7 +13,6 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.events.GlobalEventEmitterProvider; -import io.opentelemetry.api.logs.GlobalLoggerProvider; import io.opentelemetry.context.ContextStorage; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import io.quarkus.arc.SyntheticCreationalContext; @@ -32,7 +31,6 @@ public class OpenTelemetryRecorder { /* STATIC INIT */ public void resetGlobalOpenTelemetryForDevMode() { GlobalOpenTelemetry.resetForTest(); - GlobalLoggerProvider.resetForTest(); GlobalEventEmitterProvider.resetForTest(); } @@ -60,16 +58,16 @@ public OpenTelemetry apply(SyntheticCreationalContext context) { if (oTelRuntimeConfig.sdkDisabled()) { return AutoConfiguredOpenTelemetrySdk.builder() - .setResultAsGlobal(true) - .registerShutdownHook(false) + .setResultAsGlobal() + .disableShutdownHook() .addPropertiesSupplier(() -> oTelConfigs) .build() .getOpenTelemetrySdk(); } var builder = AutoConfiguredOpenTelemetrySdk.builder() - .setResultAsGlobal(true) - .registerShutdownHook(false) + .setResultAsGlobal() + .disableShutdownHook() .addPropertiesSupplier(() -> oTelConfigs) .setServiceClassLoader(Thread.currentThread().getContextClassLoader()); for (var customizer : builderCustomizers) { diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java index c33d1c50d1a84..8f89f33608e47 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java @@ -16,7 +16,7 @@ import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.exporter.internal.ExporterBuilderUtil; -import io.opentelemetry.exporter.internal.otlp.OtlpUserAgent; +import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; import io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder; import io.opentelemetry.sdk.trace.export.SpanExporter; diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/graal/Target_io_opentelemetry_exporter_otlp_internal_grpc_ManagedChannelUtil.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/graal/Target_io_opentelemetry_exporter_otlp_internal_grpc_ManagedChannelUtil.java deleted file mode 100644 index d5b554973d301..0000000000000 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/graal/Target_io_opentelemetry_exporter_otlp_internal_grpc_ManagedChannelUtil.java +++ /dev/null @@ -1,44 +0,0 @@ -package io.quarkus.opentelemetry.runtime.exporter.otlp.graal; - -import static java.util.Objects.requireNonNull; - -import javax.net.ssl.SSLException; -import javax.net.ssl.X509KeyManager; -import javax.net.ssl.X509TrustManager; - -import com.oracle.svm.core.annotate.Substitute; -import com.oracle.svm.core.annotate.TargetClass; - -import io.grpc.ManagedChannelBuilder; -import io.grpc.netty.GrpcSslContexts; -import io.grpc.netty.NettyChannelBuilder; - -/** - * Replace the {@code setTrustedCertificatesPem()} method in native because the upstream code supports using - * either the grpc-netty or grpc-netty-shaded dependencies, but Quarkus only supports the former. - */ -@TargetClass(className = "io.opentelemetry.exporter.internal.grpc.ManagedChannelUtil") -final class Target_io_opentelemetry_exporter_otlp_internal_grpc_ManagedChannelUtil { - - @Substitute - public static void setClientKeysAndTrustedCertificatesPem( - ManagedChannelBuilder managedChannelBuilder, - X509TrustManager tmf, - X509KeyManager kmf) - throws SSLException { - requireNonNull(managedChannelBuilder, "managedChannelBuilder"); - requireNonNull(tmf, "X509TrustManager"); - - // gRPC does not abstract TLS configuration so we need to check the implementation and act - // accordingly. - if (managedChannelBuilder.getClass().getName().equals("io.grpc.netty.NettyChannelBuilder")) { - NettyChannelBuilder nettyBuilder = (NettyChannelBuilder) managedChannelBuilder; - nettyBuilder.sslContext( - GrpcSslContexts.forClient().keyManager(kmf).trustManager(tmf).build()); - } else { - throw new SSLException( - "TLS certificate configuration not supported for unrecognized ManagedChannelBuilder " - + managedChannelBuilder.getClass().getName()); - } - } -} diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/cdi/WithSpanInterceptor.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/cdi/WithSpanInterceptor.java index 2948f09f273df..05593746998cc 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/cdi/WithSpanInterceptor.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/cdi/WithSpanInterceptor.java @@ -37,7 +37,7 @@ public WithSpanInterceptor(final OpenTelemetry openTelemetry) { INSTRUMENTATION_NAME, new MethodRequestSpanNameExtractor()); - MethodSpanAttributesExtractor attributesExtractor = MethodSpanAttributesExtractor.newInstance( + MethodSpanAttributesExtractor attributesExtractor = MethodSpanAttributesExtractor.create( MethodRequest::getMethod, new WithSpanParameterAttributeNamesExtractor(), MethodRequest::getArgs); diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/grpc/GrpcRequest.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/grpc/GrpcRequest.java index 13799215978cb..c1fcdc9a55f6c 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/grpc/GrpcRequest.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/grpc/GrpcRequest.java @@ -1,21 +1,48 @@ package io.quarkus.opentelemetry.runtime.tracing.intrumentation.grpc; +import java.net.SocketAddress; + import io.grpc.Attributes; import io.grpc.Metadata; import io.grpc.MethodDescriptor; public class GrpcRequest { + + public static GrpcRequest server( + final MethodDescriptor methodDescriptor, + final Metadata metadata, + final Attributes attributes, + final String authority) { + return new GrpcRequest(methodDescriptor, metadata, attributes, null, authority); + } + + public static GrpcRequest client(final MethodDescriptor methodDescriptor, String authority) { + return new GrpcRequest(methodDescriptor, null, null, null, authority); + } + + public static GrpcRequest client(final MethodDescriptor methodDescriptor, final Metadata metadata) { + return new GrpcRequest(methodDescriptor, metadata, null, null, null); + } + private final MethodDescriptor methodDescriptor; - private final Metadata metadata; + private Metadata metadata; private final Attributes attributes; + private volatile String logicalHost; + private volatile int logicalPort = -1; + private volatile SocketAddress peerSocketAddress; + private GrpcRequest( final MethodDescriptor methodDescriptor, final Metadata metadata, - final Attributes attributes) { + final Attributes attributes, + final SocketAddress peerSocketAddress, + final String authority) { this.methodDescriptor = methodDescriptor; this.metadata = metadata; this.attributes = attributes; + this.peerSocketAddress = peerSocketAddress; + setLogicalAddress(authority); } public MethodDescriptor getMethodDescriptor() { @@ -26,22 +53,44 @@ public Metadata getMetadata() { return metadata; } - public Attributes getAttributes() { - return attributes; + void setMetadata(Metadata metadata) { + this.metadata = metadata; } - public static GrpcRequest server( - final MethodDescriptor methodDescriptor, - final Metadata metadata, - final Attributes attributes) { - return new GrpcRequest(methodDescriptor, metadata, attributes); + public String getLogicalHost() { + return logicalHost; } - public static GrpcRequest client(final MethodDescriptor methodDescriptor) { - return new GrpcRequest(methodDescriptor, null, null); + public int getLogicalPort() { + return logicalPort; } - public static GrpcRequest client(final MethodDescriptor methodDescriptor, final Metadata metadata) { - return new GrpcRequest(methodDescriptor, metadata, null); + public SocketAddress getPeerSocketAddress() { + return peerSocketAddress; + } + + void setPeerSocketAddress(SocketAddress peerSocketAddress) { + this.peerSocketAddress = peerSocketAddress; + } + + public Attributes getAttributes() { + return attributes; + } + + private void setLogicalAddress(String authority) { + if (authority == null) { + return; + } + int index = authority.indexOf(':'); + if (index == -1) { + logicalHost = authority; + } else { + logicalHost = authority.substring(0, index); + try { + logicalPort = Integer.parseInt(authority.substring(index + 1)); + } catch (NumberFormatException e) { + // ignore + } + } } } diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/grpc/GrpcTracingClientInterceptor.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/grpc/GrpcTracingClientInterceptor.java index 90224d298bd71..d66ec95ff92c7 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/grpc/GrpcTracingClientInterceptor.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/grpc/GrpcTracingClientInterceptor.java @@ -47,7 +47,7 @@ public GrpcTracingClientInterceptor(final OpenTelemetry openTelemetry) { public ClientCall interceptCall( final MethodDescriptor method, final CallOptions callOptions, final Channel next) { - GrpcRequest grpcRequest = GrpcRequest.client(method); + GrpcRequest grpcRequest = GrpcRequest.client(method, callOptions.getAuthority()); Context parentContext = Context.current(); boolean shouldStart = instrumenter.shouldStart(parentContext, grpcRequest); if (shouldStart) { diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/grpc/GrpcTracingServerInterceptor.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/grpc/GrpcTracingServerInterceptor.java index ed6fc1801ed8d..bc6f4e7343cba 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/grpc/GrpcTracingServerInterceptor.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/grpc/GrpcTracingServerInterceptor.java @@ -22,8 +22,8 @@ import io.opentelemetry.context.propagation.TextMapGetter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; -import io.opentelemetry.instrumentation.api.instrumenter.net.InetSocketAddressNetServerAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.net.NetServerAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.net.NetServerAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcServerAttributesExtractor; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import io.quarkus.grpc.GlobalInterceptor; @@ -51,7 +51,8 @@ public GrpcTracingServerInterceptor(final OpenTelemetry openTelemetry) { public ServerCall.Listener interceptCall( final ServerCall call, final Metadata headers, final ServerCallHandler next) { - GrpcRequest grpcRequest = GrpcRequest.server(call.getMethodDescriptor(), headers, call.getAttributes()); + GrpcRequest grpcRequest = GrpcRequest.server(call.getMethodDescriptor(), headers, call.getAttributes(), + call.getAuthority()); Context parentContext = Context.current(); boolean shouldStart = instrumenter.shouldStart(parentContext, grpcRequest); if (shouldStart) { @@ -64,24 +65,24 @@ public ServerCall.Listener interceptCall( return next.startCall(call, headers); } - private static class GrpcServerNetServerAttributesGetter extends InetSocketAddressNetServerAttributesGetter { + private static class GrpcServerNetServerAttributesGetter implements NetServerAttributesGetter { @Override public String getTransport(final GrpcRequest grpcRequest) { return SemanticAttributes.NetTransportValues.IP_TCP; } @Override - public String getHostName(GrpcRequest grpcRequest) { - return null; + public String getServerAddress(GrpcRequest grpcRequest) { + return grpcRequest.getLogicalHost(); } @Override - public Integer getHostPort(GrpcRequest grpcRequest) { - return null; + public Integer getServerPort(GrpcRequest grpcRequest) { + return grpcRequest.getLogicalPort(); } @Override - protected InetSocketAddress getPeerSocketAddress(GrpcRequest grpcRequest) { + public InetSocketAddress getClientInetSocketAddress(GrpcRequest grpcRequest, Status status) { SocketAddress socketAddress = grpcRequest.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); if (socketAddress instanceof InetSocketAddress) { return (InetSocketAddress) socketAddress; @@ -90,7 +91,7 @@ protected InetSocketAddress getPeerSocketAddress(GrpcRequest grpcRequest) { } @Override - protected InetSocketAddress getHostSocketAddress(GrpcRequest grpcRequest) { + public InetSocketAddress getServerInetSocketAddress(GrpcRequest grpcRequest, Status status) { SocketAddress socketAddress = grpcRequest.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR); if (socketAddress instanceof InetSocketAddress) { return (InetSocketAddress) socketAddress; diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/restclient/OpenTelemetryClientFilter.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/restclient/OpenTelemetryClientFilter.java index 1a387a99ad47c..15f72ed603900 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/restclient/OpenTelemetryClientFilter.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/restclient/OpenTelemetryClientFilter.java @@ -147,7 +147,7 @@ private static class ClientAttributesExtractor implements HttpClientAttributesGetter { @Override - public String getUrl(final ClientRequestContext request) { + public String getUrlFull(final ClientRequestContext request) { URI uri = request.getUri(); if (uri.getUserInfo() != null) { return UriBuilder.fromUri(uri).userInfo(null).build().toString(); @@ -156,28 +156,23 @@ public String getUrl(final ClientRequestContext request) { } @Override - public String getFlavor(final ClientRequestContext request, final ClientResponseContext response) { - return null; - } - - @Override - public String getMethod(final ClientRequestContext request) { + public String getHttpRequestMethod(final ClientRequestContext request) { return request.getMethod(); } @Override - public List getRequestHeader(final ClientRequestContext request, final String name) { + public List getHttpRequestHeader(final ClientRequestContext request, final String name) { return request.getStringHeaders().getOrDefault(name, emptyList()); } @Override - public Integer getStatusCode(ClientRequestContext clientRequestContext, + public Integer getHttpResponseStatusCode(ClientRequestContext clientRequestContext, ClientResponseContext clientResponseContext, Throwable error) { return clientResponseContext.getStatus(); } @Override - public List getResponseHeader(final ClientRequestContext request, final ClientResponseContext response, + public List getHttpResponseHeader(final ClientRequestContext request, final ClientResponseContext response, final String name) { return response.getHeaders().getOrDefault(name, emptyList()); } @@ -192,13 +187,25 @@ public String getTransport(ClientRequestContext clientRequestContext, ClientResp } @Override - public String getPeerName(ClientRequestContext clientRequestContext) { + public String getServerAddress(ClientRequestContext clientRequestContext) { return clientRequestContext.getUri().getHost(); } @Override - public Integer getPeerPort(ClientRequestContext clientRequestContext) { + public Integer getServerPort(ClientRequestContext clientRequestContext) { return clientRequestContext.getUri().getPort(); } + + @Override + public String getNetworkProtocolName(ClientRequestContext clientRequestContext, + ClientResponseContext clientResponseContext) { + return "http"; + } + + @Override + public String getNetworkProtocolVersion(ClientRequestContext clientRequestContext, + ClientResponseContext clientResponseContext) { + return null; + } } } diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/EventBusInstrumenterVertxTracer.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/EventBusInstrumenterVertxTracer.java index daec7431bd99a..286c38c8aa617 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/EventBusInstrumenterVertxTracer.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/EventBusInstrumenterVertxTracer.java @@ -1,7 +1,7 @@ package io.quarkus.opentelemetry.runtime.tracing.intrumentation.vertx; +import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.PUBLISH; import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.RECEIVE; -import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.SEND; import static io.quarkus.opentelemetry.runtime.config.build.OTelBuildConfig.INSTRUMENTATION_NAME; import io.opentelemetry.api.OpenTelemetry; @@ -75,10 +75,10 @@ public String get(final Message message, final String key) { private static Instrumenter getProducerInstrumenter(final OpenTelemetry openTelemetry) { InstrumenterBuilder serverBuilder = Instrumenter.builder( openTelemetry, - INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(EventBusAttributesGetter.INSTANCE, SEND)); + INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(EventBusAttributesGetter.INSTANCE, PUBLISH)); return serverBuilder - .addAttributesExtractor(MessagingAttributesExtractor.create(EventBusAttributesGetter.INSTANCE, SEND)) + .addAttributesExtractor(MessagingAttributesExtractor.create(EventBusAttributesGetter.INSTANCE, PUBLISH)) .buildProducerInstrumenter((message, key, value) -> { if (message != null) { message.headers().set(key, value); @@ -94,11 +94,6 @@ public String getSystem(final Message message) { return "vert.x"; } - @Override - public String getDestinationKind(final Message message) { - return message.isSend() ? "queue" : "topic"; - } - @Override public String getDestination(final Message message) { return message.address(); diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/HttpInstrumenterVertxTracer.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/HttpInstrumenterVertxTracer.java index 78e33ea8d8abf..7977b6f73de22 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/HttpInstrumenterVertxTracer.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/HttpInstrumenterVertxTracer.java @@ -4,11 +4,13 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_CLIENT_IP; import static io.quarkus.opentelemetry.runtime.config.build.OTelBuildConfig.INSTRUMENTATION_NAME; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Collections; import java.util.List; import java.util.function.BiConsumer; -import jakarta.annotation.Nullable; +import javax.annotation.Nullable; import io.netty.handler.codec.http.HttpResponseStatus; import io.opentelemetry.api.OpenTelemetry; @@ -170,39 +172,32 @@ public String get(final io.opentelemetry.context.Context context, final HttpRequ private static class ServerAttributesExtractor implements HttpServerAttributesGetter { @Override - public String getFlavor(final HttpRequest request) { - if (request instanceof HttpServerRequest) { - HttpVersion version = ((HttpServerRequest) request).version(); - if (version != null) { - switch (version) { - case HTTP_1_0: - return "1.0"; - case HTTP_1_1: - return "1.1"; - case HTTP_2: - return "2.0"; - default: - // Will be executed once Vert.x supports other versions - // At that point version transformation will be needed for OTel semantics - return version.alpnName(); - } - } + public String getUrlPath(final HttpRequest request) { + try { + URI uri = new URI(request.uri()); + return uri.getPath(); + } catch (URISyntaxException e) { + return null; } - return null; } @Override - public String getTarget(final HttpRequest request) { - return request.uri(); + public String getUrlQuery(HttpRequest request) { + try { + URI uri = new URI(request.uri()); + return uri.getQuery(); + } catch (URISyntaxException e) { + return null; + } } @Override - public String getRoute(final HttpRequest request) { + public String getHttpRoute(final HttpRequest request) { return null; } @Override - public String getScheme(final HttpRequest request) { + public String getUrlScheme(final HttpRequest request) { if (request instanceof HttpServerRequest) { return ((HttpServerRequest) request).scheme(); } @@ -210,22 +205,22 @@ public String getScheme(final HttpRequest request) { } @Override - public String getMethod(final HttpRequest request) { + public String getHttpRequestMethod(final HttpRequest request) { return request.method().name(); } @Override - public List getRequestHeader(final HttpRequest request, final String name) { + public List getHttpRequestHeader(final HttpRequest request, final String name) { return request.headers().getAll(name); } @Override - public Integer getStatusCode(HttpRequest httpRequest, HttpResponse httpResponse, Throwable error) { + public Integer getHttpResponseStatusCode(HttpRequest httpRequest, HttpResponse httpResponse, Throwable error) { return httpResponse != null ? httpResponse.statusCode() : null; } @Override - public List getResponseHeader(final HttpRequest request, final HttpResponse response, final String name) { + public List getHttpResponseHeader(final HttpRequest request, final HttpResponse response, final String name) { return response != null ? response.headers().getAll(name) : Collections.emptyList(); } @@ -269,25 +264,22 @@ public void onEnd( } } - private static class HttpServerNetAttributesGetter implements NetServerAttributesGetter { - @Nullable + private static class HttpServerNetAttributesGetter implements NetServerAttributesGetter { @Override public String getTransport(HttpRequest httpRequest) { return null; } - @Nullable @Override - public String getHostName(HttpRequest httpRequest) { + public String getServerAddress(HttpRequest httpRequest) { if (httpRequest instanceof HttpServerRequest) { return VertxUtil.extractRemoteHostname((HttpServerRequest) httpRequest); } return null; } - @Nullable @Override - public Integer getHostPort(HttpRequest httpRequest) { + public Integer getServerPort(HttpRequest httpRequest) { if (httpRequest instanceof HttpServerRequest) { Long remoteHostPort = VertxUtil.extractRemoteHostPort((HttpServerRequest) httpRequest); if (remoteHostPort == null) { @@ -297,6 +289,16 @@ public Integer getHostPort(HttpRequest httpRequest) { } return null; } + + @Override + public String getNetworkProtocolName(HttpRequest request, HttpResponse response) { + return "http"; + } + + @Override + public String getNetworkProtocolVersion(HttpRequest request, HttpResponse response) { + return getHttpVersion(request); + } } private static class HttpClientNetAttributeGetter implements NetClientAttributesGetter { @@ -307,15 +309,25 @@ public String getTransport(HttpRequest httpClientRequest, HttpResponse httpClien @javax.annotation.Nullable @Override - public String getPeerName(HttpRequest httpRequest) { + public String getServerAddress(HttpRequest httpRequest) { return httpRequest.remoteAddress().hostName(); } @javax.annotation.Nullable @Override - public Integer getPeerPort(HttpRequest httpRequest) { + public Integer getServerPort(HttpRequest httpRequest) { return httpRequest.remoteAddress().port(); } + + @Override + public String getNetworkProtocolName(final HttpRequest request, final HttpResponse response) { + return "http"; + } + + @Override + public String getNetworkProtocolVersion(final HttpRequest request, final HttpResponse response) { + return getHttpVersion(request); + } } private static class HttpRequestTextMapGetter implements TextMapGetter { @@ -336,32 +348,27 @@ public String get(final HttpRequest carrier, final String key) { private static class ClientAttributesExtractor implements HttpClientAttributesGetter { @Override - public String getUrl(final HttpRequest request) { + public String getUrlFull(final HttpRequest request) { return request.absoluteURI(); } @Override - public String getFlavor(final HttpRequest request, final HttpResponse response) { - return null; - } - - @Override - public String getMethod(final HttpRequest request) { + public String getHttpRequestMethod(final HttpRequest request) { return request.method().name(); } @Override - public List getRequestHeader(final HttpRequest request, final String name) { + public List getHttpRequestHeader(final HttpRequest request, final String name) { return request.headers().getAll(name); } @Override - public Integer getStatusCode(HttpRequest httpRequest, HttpResponse httpResponse, Throwable error) { + public Integer getHttpResponseStatusCode(HttpRequest httpRequest, HttpResponse httpResponse, Throwable error) { return httpResponse.statusCode(); } @Override - public List getResponseHeader(final HttpRequest request, final HttpResponse response, final String name) { + public List getHttpResponseHeader(final HttpRequest request, final HttpResponse response, final String name) { return response.headers().getAll(name); } } @@ -528,4 +535,25 @@ static WriteHeadersHttpRequest request(HttpRequest httpRequest, BiConsumer spanData, Assertions.assertNotNull(spanData.get("spanId")); verifyResource(spanData, parentSpanData); - Assertions.assertEquals(topic + " send", spanData.get("name")); + Assertions.assertEquals(topic + " " + PUBLISH.name().toLowerCase(), spanData.get("name")); Assertions.assertEquals(SpanKind.PRODUCER.toString(), spanData.get("kind")); Assertions.assertTrue((Boolean) spanData.get("ended")); Assertions.assertFalse((Boolean) spanData.get("parent_remote")); - Assertions.assertEquals("topic", spanData.get("attr_messaging.destination.kind")); Assertions.assertEquals("kafka", spanData.get("attr_messaging.system")); Assertions.assertEquals(topic, spanData.get("attr_messaging.destination.name")); } @@ -157,7 +157,6 @@ private static void verifyConsumer(Map spanData, Assertions.assertTrue((Boolean) spanData.get("ended")); Assertions.assertEquals(parentRemote, spanData.get("parent_remote")); - Assertions.assertEquals("topic", spanData.get("attr_messaging.destination.kind")); Assertions.assertEquals("opentelemetry-integration-test - kafka-consumer-" + channel, spanData.get("attr_messaging.consumer.id")); Assertions.assertEquals("kafka", spanData.get("attr_messaging.system")); diff --git a/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/HelloRouterTest.java b/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/HelloRouterTest.java index d3069e9bed3a8..0bb1b5455a464 100644 --- a/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/HelloRouterTest.java +++ b/integration-tests/opentelemetry-vertx/src/test/java/io/quarkus/it/opentelemetry/vertx/HelloRouterTest.java @@ -8,7 +8,6 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_ROUTE; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_STATUS_CODE; -import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_NAME; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM; @@ -135,14 +134,12 @@ void bus() { Map producer = getSpanByKindAndParentId(spans, PRODUCER, server.get("spanId")); assertEquals(PRODUCER.toString(), producer.get("kind")); assertEquals("vert.x", ((Map) producer.get("attributes")).get(MESSAGING_SYSTEM.toString())); - assertEquals("topic", ((Map) producer.get("attributes")).get(MESSAGING_DESTINATION_KIND.toString())); assertEquals("bus", ((Map) producer.get("attributes")).get(MESSAGING_DESTINATION_NAME.toString())); assertEquals(producer.get("parentSpanId"), server.get("spanId")); Map consumer = getSpanByKindAndParentId(spans, CONSUMER, producer.get("spanId")); assertEquals(CONSUMER.toString(), consumer.get("kind")); assertEquals("vert.x", ((Map) consumer.get("attributes")).get(MESSAGING_SYSTEM.toString())); - assertEquals("topic", ((Map) consumer.get("attributes")).get(MESSAGING_DESTINATION_KIND.toString())); assertEquals("bus", ((Map) consumer.get("attributes")).get(MESSAGING_DESTINATION_NAME.toString())); assertEquals(MessageOperation.RECEIVE.toString().toLowerCase(Locale.ROOT), ((Map) consumer.get("attributes")).get(MESSAGING_OPERATION.toString())); diff --git a/integration-tests/reactive-messaging-hibernate-reactive/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java b/integration-tests/reactive-messaging-hibernate-reactive/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java index c42b69dfd641e..09e8541924de3 100644 --- a/integration-tests/reactive-messaging-hibernate-reactive/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java +++ b/integration-tests/reactive-messaging-hibernate-reactive/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java @@ -6,16 +6,13 @@ import java.util.concurrent.CopyOnWriteArrayList; import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.context.control.ActivateRequestContext; -import jakarta.inject.Inject; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Outgoing; -import org.hibernate.reactive.mutiny.Mutiny; -import io.quarkus.hibernate.reactive.panache.Panache; import io.quarkus.hibernate.reactive.panache.common.WithSession; +import io.quarkus.hibernate.reactive.panache.common.WithTransaction; import io.smallrye.common.vertx.ContextLocals; import io.smallrye.common.vertx.VertxContext; import io.smallrye.mutiny.Uni; @@ -27,34 +24,25 @@ public class KafkaReceivers { private final List people = new CopyOnWriteArrayList<>(); - @Inject - Mutiny.SessionFactory sf; - @Incoming("fruits-in") @Outgoing("fruits-persisted") + @WithTransaction public Uni> persistFruit(Message fruit) { assert VertxContext.isOnDuplicatedContext(); Fruit payload = fruit.getPayload(); - return sf.withTransaction(session -> { - return session.persist(payload).chain(x -> session.fetch(payload).map(p -> { - // ContextLocals is only callable on duplicated context - ContextLocals.put("fruit-id", p.id); - return fruit.withPayload(payload); - })); + payload.name = "fruit-" + payload.name; + return payload.persist().map(x -> { + // ContextLocals is only callable on duplicated context; + ContextLocals.put("fruit-id", payload.id); + return fruit; }); } @Blocking @Incoming("fruits-persisted") - @ActivateRequestContext - public Uni consumeFruit(Message fruit) { + public void consumeFruit(Fruit fruit) { assert VertxContext.isOnDuplicatedContext(); - Fruit payload = fruit.getPayload(); - assert Objects.equals(ContextLocals.get("fruit-id").get(), payload.id); - return Panache.withTransaction(() -> { - payload.name = "fruit-" + payload.name; - return payload.persist().chain(() -> Uni.createFrom().completionStage(fruit.ack())); - }); + assert Objects.equals(ContextLocals.get("fruit-id").get(), fruit.id); } @Incoming("people-in")