Skip to content

Commit

Permalink
Merge pull request #34851 from brunobat/bump-otel-1-28
Browse files Browse the repository at this point in the history
Bump to OTel 1.28 and SR reactive-messaging to 4.9.0
  • Loading branch information
geoand authored Aug 2, 2023
2 parents 1e60d82 + 3033857 commit fa469e3
Show file tree
Hide file tree
Showing 17 changed files with 235 additions and 175 deletions.
6 changes: 3 additions & 3 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
<opentracing-jdbc.version>0.2.4</opentracing-jdbc.version>
<opentracing-kafka.version>0.1.15</opentracing-kafka.version>
<opentracing-mongo.version>0.1.5</opentracing-mongo.version>
<opentelemetry.version>1.25.0</opentelemetry.version>
<opentelemetry-alpha.version>1.25.0-alpha</opentelemetry-alpha.version>
<opentelemetry.version>1.28.0</opentelemetry.version>
<opentelemetry-alpha.version>1.28.0-alpha</opentelemetry-alpha.version>
<jaeger.version>1.8.1</jaeger.version>
<quarkus-http.version>5.0.2.Final</quarkus-http.version>
<micrometer.version>1.11.1</micrometer.version><!-- keep in sync with hdrhistogram -->
Expand Down Expand Up @@ -68,7 +68,7 @@
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>3.0.0</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>3.5.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.8.0</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>4.9.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.3.1</smallrye-stork.version>
<jakarta.activation.version>2.1.2</jakarta.activation.version>
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,40 @@ void path() throws Exception {
assertEquals(client.getTraceId(), server.getTraceId());
}

@Test
void query() throws Exception {
HttpResponse<Buffer> response = WebClient.create(vertx)
.get(uri.getPort(), uri.getHost(), "/hello?name=foo")
.send()
.toCompletionStage().toCompletableFuture()
.get();

assertEquals(HTTP_OK, response.statusCode());

List<SpanData> spans = spanExporter.getFinishedSpanItems(2);

SpanData client = getSpanByKindAndParentId(spans, CLIENT, "0000000000000000");
assertEquals(CLIENT, client.getKind());
assertEquals("GET", client.getName());
assertEquals(HTTP_OK, client.getAttributes().get(HTTP_STATUS_CODE));
assertEquals(HttpMethod.GET, client.getAttributes().get(HTTP_METHOD));
assertEquals(uri.toString() + "hello?name=foo", client.getAttributes().get(HTTP_URL));
assertEquals(uri.getHost(), client.getAttributes().get(NET_PEER_NAME));
assertEquals(uri.getPort(), client.getAttributes().get(NET_PEER_PORT));

SpanData server = getSpanByKindAndParentId(spans, SERVER, client.getSpanId());
assertEquals(SERVER, server.getKind());
assertEquals("GET /hello", server.getName());
assertEquals(HTTP_OK, server.getAttributes().get(HTTP_STATUS_CODE));
assertEquals(HttpMethod.GET, server.getAttributes().get(HTTP_METHOD));
assertEquals("/hello", server.getAttributes().get(HTTP_ROUTE));
assertEquals(uri.getHost(), server.getAttributes().get(NET_HOST_NAME));
assertEquals(uri.getPort(), server.getAttributes().get(NET_HOST_PORT));
assertEquals(uri.getPath() + "hello?name=foo", server.getAttributes().get(HTTP_TARGET));

assertEquals(client.getTraceId(), server.getTraceId());
}

@Test
void multiple() throws Exception {
HttpResponse<Buffer> response = WebClient.create(vertx)
Expand Down Expand Up @@ -165,6 +199,7 @@ public void register(@Observes StartupEvent ev) {
Future<HttpResponse<Buffer>> two = webClient.get(port, host, "/hello/goku").send();
CompositeFuture.join(one, two).onComplete(event -> rc.response().end());
});
router.get("/hello?name=foo").handler(rc -> rc.response().end("hello foo"));
}
}
}
4 changes: 4 additions & 0 deletions extensions/opentelemetry/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api-events</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.events.GlobalEventEmitterProvider;
import io.opentelemetry.api.logs.GlobalLoggerProvider;
import io.opentelemetry.context.ContextStorage;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.quarkus.arc.SyntheticCreationalContext;
Expand All @@ -32,7 +31,6 @@ public class OpenTelemetryRecorder {
/* STATIC INIT */
public void resetGlobalOpenTelemetryForDevMode() {
GlobalOpenTelemetry.resetForTest();
GlobalLoggerProvider.resetForTest();
GlobalEventEmitterProvider.resetForTest();
}

Expand Down Expand Up @@ -60,16 +58,16 @@ public OpenTelemetry apply(SyntheticCreationalContext<OpenTelemetry> context) {

if (oTelRuntimeConfig.sdkDisabled()) {
return AutoConfiguredOpenTelemetrySdk.builder()
.setResultAsGlobal(true)
.registerShutdownHook(false)
.setResultAsGlobal()
.disableShutdownHook()
.addPropertiesSupplier(() -> oTelConfigs)
.build()
.getOpenTelemetrySdk();
}

var builder = AutoConfiguredOpenTelemetrySdk.builder()
.setResultAsGlobal(true)
.registerShutdownHook(false)
.setResultAsGlobal()
.disableShutdownHook()
.addPropertiesSupplier(() -> oTelConfigs)
.setServiceClassLoader(Thread.currentThread().getContextClassLoader());
for (var customizer : builderCustomizers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.otlp.OtlpUserAgent;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder;
import io.opentelemetry.sdk.trace.export.SpanExporter;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public WithSpanInterceptor(final OpenTelemetry openTelemetry) {
INSTRUMENTATION_NAME,
new MethodRequestSpanNameExtractor());

MethodSpanAttributesExtractor<MethodRequest, Void> attributesExtractor = MethodSpanAttributesExtractor.newInstance(
MethodSpanAttributesExtractor<MethodRequest, Void> attributesExtractor = MethodSpanAttributesExtractor.create(
MethodRequest::getMethod,
new WithSpanParameterAttributeNamesExtractor(),
MethodRequest::getArgs);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,48 @@
package io.quarkus.opentelemetry.runtime.tracing.intrumentation.grpc;

import java.net.SocketAddress;

import io.grpc.Attributes;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;

public class GrpcRequest {

public static GrpcRequest server(
final MethodDescriptor<?, ?> methodDescriptor,
final Metadata metadata,
final Attributes attributes,
final String authority) {
return new GrpcRequest(methodDescriptor, metadata, attributes, null, authority);
}

public static GrpcRequest client(final MethodDescriptor<?, ?> methodDescriptor, String authority) {
return new GrpcRequest(methodDescriptor, null, null, null, authority);
}

public static GrpcRequest client(final MethodDescriptor<?, ?> methodDescriptor, final Metadata metadata) {
return new GrpcRequest(methodDescriptor, metadata, null, null, null);
}

private final MethodDescriptor<?, ?> methodDescriptor;
private final Metadata metadata;
private Metadata metadata;
private final Attributes attributes;

private volatile String logicalHost;
private volatile int logicalPort = -1;
private volatile SocketAddress peerSocketAddress;

private GrpcRequest(
final MethodDescriptor<?, ?> methodDescriptor,
final Metadata metadata,
final Attributes attributes) {
final Attributes attributes,
final SocketAddress peerSocketAddress,
final String authority) {
this.methodDescriptor = methodDescriptor;
this.metadata = metadata;
this.attributes = attributes;
this.peerSocketAddress = peerSocketAddress;
setLogicalAddress(authority);
}

public MethodDescriptor<?, ?> getMethodDescriptor() {
Expand All @@ -26,22 +53,44 @@ public Metadata getMetadata() {
return metadata;
}

public Attributes getAttributes() {
return attributes;
void setMetadata(Metadata metadata) {
this.metadata = metadata;
}

public static GrpcRequest server(
final MethodDescriptor<?, ?> methodDescriptor,
final Metadata metadata,
final Attributes attributes) {
return new GrpcRequest(methodDescriptor, metadata, attributes);
public String getLogicalHost() {
return logicalHost;
}

public static GrpcRequest client(final MethodDescriptor<?, ?> methodDescriptor) {
return new GrpcRequest(methodDescriptor, null, null);
public int getLogicalPort() {
return logicalPort;
}

public static GrpcRequest client(final MethodDescriptor<?, ?> methodDescriptor, final Metadata metadata) {
return new GrpcRequest(methodDescriptor, metadata, null);
public SocketAddress getPeerSocketAddress() {
return peerSocketAddress;
}

void setPeerSocketAddress(SocketAddress peerSocketAddress) {
this.peerSocketAddress = peerSocketAddress;
}

public Attributes getAttributes() {
return attributes;
}

private void setLogicalAddress(String authority) {
if (authority == null) {
return;
}
int index = authority.indexOf(':');
if (index == -1) {
logicalHost = authority;
} else {
logicalHost = authority.substring(0, index);
try {
logicalPort = Integer.parseInt(authority.substring(index + 1));
} catch (NumberFormatException e) {
// ignore
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public GrpcTracingClientInterceptor(final OpenTelemetry openTelemetry) {
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions, final Channel next) {

GrpcRequest grpcRequest = GrpcRequest.client(method);
GrpcRequest grpcRequest = GrpcRequest.client(method, callOptions.getAuthority());
Context parentContext = Context.current();
boolean shouldStart = instrumenter.shouldStart(parentContext, grpcRequest);
if (shouldStart) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.net.InetSocketAddressNetServerAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetServerAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetServerAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcServerAttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.quarkus.grpc.GlobalInterceptor;
Expand Down Expand Up @@ -51,7 +51,8 @@ public GrpcTracingServerInterceptor(final OpenTelemetry openTelemetry) {
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
final ServerCall<ReqT, RespT> call, final Metadata headers, final ServerCallHandler<ReqT, RespT> next) {

GrpcRequest grpcRequest = GrpcRequest.server(call.getMethodDescriptor(), headers, call.getAttributes());
GrpcRequest grpcRequest = GrpcRequest.server(call.getMethodDescriptor(), headers, call.getAttributes(),
call.getAuthority());
Context parentContext = Context.current();
boolean shouldStart = instrumenter.shouldStart(parentContext, grpcRequest);
if (shouldStart) {
Expand All @@ -64,24 +65,24 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
return next.startCall(call, headers);
}

private static class GrpcServerNetServerAttributesGetter extends InetSocketAddressNetServerAttributesGetter<GrpcRequest> {
private static class GrpcServerNetServerAttributesGetter implements NetServerAttributesGetter<GrpcRequest, Status> {
@Override
public String getTransport(final GrpcRequest grpcRequest) {
return SemanticAttributes.NetTransportValues.IP_TCP;
}

@Override
public String getHostName(GrpcRequest grpcRequest) {
return null;
public String getServerAddress(GrpcRequest grpcRequest) {
return grpcRequest.getLogicalHost();
}

@Override
public Integer getHostPort(GrpcRequest grpcRequest) {
return null;
public Integer getServerPort(GrpcRequest grpcRequest) {
return grpcRequest.getLogicalPort();
}

@Override
protected InetSocketAddress getPeerSocketAddress(GrpcRequest grpcRequest) {
public InetSocketAddress getClientInetSocketAddress(GrpcRequest grpcRequest, Status status) {
SocketAddress socketAddress = grpcRequest.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
if (socketAddress instanceof InetSocketAddress) {
return (InetSocketAddress) socketAddress;
Expand All @@ -90,7 +91,7 @@ protected InetSocketAddress getPeerSocketAddress(GrpcRequest grpcRequest) {
}

@Override
protected InetSocketAddress getHostSocketAddress(GrpcRequest grpcRequest) {
public InetSocketAddress getServerInetSocketAddress(GrpcRequest grpcRequest, Status status) {
SocketAddress socketAddress = grpcRequest.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
if (socketAddress instanceof InetSocketAddress) {
return (InetSocketAddress) socketAddress;
Expand Down
Loading

0 comments on commit fa469e3

Please sign in to comment.