Skip to content

Commit

Permalink
Integrate OTel with WS Next
Browse files Browse the repository at this point in the history
  • Loading branch information
michalvavrik committed Oct 25, 2024
1 parent e01bb78 commit d11c424
Show file tree
Hide file tree
Showing 34 changed files with 1,270 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/opentelemetry-tracing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ See the main xref:opentelemetry.adoc#exporters[OpenTelemetry Guide exporters] se
** Kafka
** Pulsar
* https://quarkus.io/guides/vertx[`quarkus-vertx`] (http requests)
* xref:websockets-next-reference.adoc[`websockets-next`]


=== Disable parts of the automatic tracing
Expand Down
13 changes: 13 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,19 @@ quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG <3>
<2> Set the number of characters of a text message payload which will be logged.
<3> Enable `DEBUG` level is for the logger `io.quarkus.websockets.next.traffic`.

[[telemetry]]
== Telemetry

When the OpenTelemetry extension is present, traces for opened and closed WebSocket connections are collected by default.
If you do not require WebSocket traces, you can disable collecting of traces like in the example below:

[source, properties]
----
quarkus.websockets-next.server.traces.enabled=false
quarkus.websockets-next.client.traces.enabled=false
----

NOTE: Telemetry for the `BasicWebSocketConnector` is currently not supported.

[[websocket-next-configuration-reference]]
== Configuration reference
Expand Down
11 changes: 11 additions & 0 deletions extensions/websockets-next/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@
<artifactId>mutiny-kotlin</artifactId>
<scope>test</scope>
</dependency>
<!-- Needed for InMemorySpanExporter to verify captured traces -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import jakarta.enterprise.context.SessionScoped;
import jakarta.enterprise.invoke.Invoker;
import jakarta.inject.Singleton;

import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTransformation;
Expand Down Expand Up @@ -123,6 +124,9 @@
import io.quarkus.websockets.next.runtime.WebSocketSessionContext;
import io.quarkus.websockets.next.runtime.kotlin.ApplicationCoroutineScope;
import io.quarkus.websockets.next.runtime.kotlin.CoroutineInvoker;
import io.quarkus.websockets.next.runtime.telemetry.TracesBuilderCustomizer;
import io.quarkus.websockets.next.runtime.telemetry.WebSocketTelemetryRecorder;
import io.quarkus.websockets.next.runtime.telemetry.WebsocketTelemetryProvider;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
Expand Down Expand Up @@ -459,7 +463,8 @@ public void registerRoutes(WebSocketServerRecorder recorder, List<WebSocketEndpo
.displayOnNotFoundPage("WebSocket Endpoint")
.handlerType(HandlerType.NORMAL)
.handler(recorder.createEndpointHandler(endpoint.generatedClassName, endpoint.endpointId,
activateRequestContext(config, endpoint.endpointId, endpoints, validationPhase.getBeanResolver())));
activateRequestContext(config, endpoint.endpointId, endpoints, validationPhase.getBeanResolver()),
endpoint.path));
routes.produce(builder.build());
}
}
Expand Down Expand Up @@ -636,6 +641,31 @@ void createSecurityHttpUpgradeCheck(Capabilities capabilities, BuildProducer<Syn
}
}

@BuildStep
void addTracesSupport(Capabilities capabilities,
BuildProducer<WebsocketTelemetryCustomizerBuildItem> builderProducer) {
if (capabilities.isPresent(Capability.OPENTELEMETRY_TRACER)) {
builderProducer.produce(new WebsocketTelemetryCustomizerBuildItem(new TracesBuilderCustomizer()));
}
}

@BuildStep
@Record(RUNTIME_INIT)
void createTelemetryProvider(BuildProducer<SyntheticBeanBuildItem> syntheticBeanProducer,
List<WebsocketTelemetryCustomizerBuildItem> builderCustomizerItems, WebSocketTelemetryRecorder recorder) {
var builderCustomizers = builderCustomizerItems.stream().map(i -> i.builderCustomizer).toList();
if (!builderCustomizers.isEmpty()) {
var syntheticBeanBuildItem = SyntheticBeanBuildItem
.configure(WebsocketTelemetryProvider.class)
.setRuntimeInit() // consumes runtime config: traces / metrics enabled
.unremovable()
.supplier(recorder.createTelemetryProvider(builderCustomizers))
.scope(Singleton.class)
.done();
syntheticBeanProducer.produce(syntheticBeanBuildItem);
}
}

private static Map<String, SecurityCheck> collectEndpointSecurityChecks(List<WebSocketEndpointBuildItem> endpoints,
ClassSecurityCheckStorageBuildItem storage, IndexView index) {
return endpoints
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.websockets.next.deployment;

import java.util.function.Consumer;

import io.quarkus.builder.item.MultiBuildItem;
import io.quarkus.websockets.next.runtime.telemetry.WebsocketTelemetryProviderBuilder;

/**
* Provides a way to set up metrics and/or traces support in the WebSockets extension.
*/
final class WebsocketTelemetryCustomizerBuildItem extends MultiBuildItem {

final Consumer<WebsocketTelemetryProviderBuilder> builderCustomizer;

WebsocketTelemetryCustomizerBuildItem(Consumer<WebsocketTelemetryProviderBuilder> builderCustomizer) {
this.builderCustomizer = builderCustomizer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.websockets.next.test.telemetry;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Singleton;

import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;

@ApplicationScoped
public class InMemorySpanExporterProducer {

@Produces
@Singleton
InMemorySpanExporter inMemorySpanExporter() {
return InMemorySpanExporter.create();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package io.quarkus.websockets.next.test.telemetry;

import static io.opentelemetry.semconv.UrlAttributes.URL_PATH;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_CLIENT_ATTR_KEY;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_ENDPOINT_ATTR_KEY;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_ID_ATTR_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.awaitility.Awaitility;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.quarkus.builder.Version;
import io.quarkus.maven.dependency.Dependency;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.WebSocketConnector;
import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.BounceClient;
import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.BounceEndpoint;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;
import io.vertx.core.http.WebSocketConnectOptions;

public class OpenTelemetryWebSocketsTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(BounceEndpoint.class, WSClient.class, InMemorySpanExporterProducer.class, BounceClient.class)
.addAsResource(new StringAsset("""
quarkus.otel.bsp.export.timeout=1s
quarkus.otel.bsp.schedule.delay=50
"""), "application.properties"))
.setForcedDependencies(
List.of(Dependency.of("io.quarkus", "quarkus-opentelemetry-deployment", Version.getVersion())));

@TestHTTPResource("bounce")
URI bounceUri;

@TestHTTPResource("/")
URI baseUri;

@Inject
Vertx vertx;

@Inject
InMemorySpanExporter spanExporter;

@Inject
WebSocketConnector<BounceClient> connector;

@BeforeEach
public void resetSpans() {
spanExporter.reset();
BounceEndpoint.connectionId = null;
BounceEndpoint.endpointId = null;
BounceEndpoint.MESSAGES.clear();
BounceClient.MESSAGES.clear();
BounceClient.CLOSED_LATCH = new CountDownLatch(1);
BounceEndpoint.CLOSED_LATCH = new CountDownLatch(1);
}

@Test
public void testServerEndpointTracesOnly() {
assertEquals(0, spanExporter.getFinishedSpanItems().size());
try (WSClient client = new WSClient(vertx)) {
client.connect(new WebSocketConnectOptions(), bounceUri);
var response = client.sendAndAwaitReply("How U Livin'").toString();
assertEquals("How U Livin'", response);
}
waitForTracesToArrive(3);
var initialRequestSpan = getSpanByName("GET /bounce", SpanKind.SERVER);

var connectionOpenedSpan = getSpanByName("OPEN " + bounceUri.getPath(), SpanKind.SERVER);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionOpenedSpan));
assertEquals(initialRequestSpan.getSpanId(), connectionOpenedSpan.getLinks().get(0).getSpanContext().getSpanId());

var connectionClosedSpan = getSpanByName("CLOSE " + bounceUri.getPath(), SpanKind.SERVER);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionClosedSpan));
assertEquals(BounceEndpoint.connectionId, getConnectionIdAttrVal(connectionClosedSpan));
assertEquals(BounceEndpoint.endpointId, getEndpointIdAttrVal(connectionClosedSpan));
assertEquals(1, connectionClosedSpan.getLinks().size());
assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId());
}

@Test
public void testClientAndServerEndpointTraces() throws InterruptedException {
var clientConn = connector.baseUri(baseUri).connectAndAwait();
clientConn.sendTextAndAwait("Make It Bun Dem");

// assert client and server called
Awaitility.await().untilAsserted(() -> {
assertEquals(1, BounceEndpoint.MESSAGES.size());
assertEquals("Make It Bun Dem", BounceEndpoint.MESSAGES.get(0));
assertEquals(1, BounceClient.MESSAGES.size());
assertEquals("Make It Bun Dem", BounceClient.MESSAGES.get(0));
});

clientConn.closeAndAwait();
// assert connection closed and client/server were notified
assertTrue(BounceClient.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(BounceEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));

waitForTracesToArrive(5);

// server traces
var initialRequestSpan = getSpanByName("GET /bounce", SpanKind.SERVER);
var connectionOpenedSpan = getSpanByName("OPEN " + bounceUri.getPath(), SpanKind.SERVER);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionOpenedSpan));
assertEquals(initialRequestSpan.getSpanId(), connectionOpenedSpan.getLinks().get(0).getSpanContext().getSpanId());
var connectionClosedSpan = getSpanByName("CLOSE " + bounceUri.getPath(), SpanKind.SERVER);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionClosedSpan));
assertEquals(BounceEndpoint.connectionId, getConnectionIdAttrVal(connectionClosedSpan));
assertEquals(BounceEndpoint.endpointId, getEndpointIdAttrVal(connectionClosedSpan));
assertEquals(1, connectionClosedSpan.getLinks().size());
assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId());

// client traces
connectionOpenedSpan = getSpanByName("OPEN " + bounceUri.getPath(), SpanKind.CLIENT);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionOpenedSpan));
assertTrue(connectionOpenedSpan.getLinks().isEmpty());
connectionClosedSpan = getSpanByName("CLOSE " + bounceUri.getPath(), SpanKind.CLIENT);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionClosedSpan));
assertNotNull(getConnectionIdAttrVal(connectionClosedSpan));
assertNotNull(getClientIdAttrVal(connectionClosedSpan));
assertEquals(1, connectionClosedSpan.getLinks().size());
assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId());
}

@Test
public void testServerTracesWhenErrorOnMessage() {
assertEquals(0, spanExporter.getFinishedSpanItems().size());
try (WSClient client = new WSClient(vertx)) {
client.connect(new WebSocketConnectOptions(), bounceUri);
var response = client.sendAndAwaitReply("It's Alright, Ma").toString();
assertEquals("It's Alright, Ma", response);
response = client.sendAndAwaitReply("I'm Only Bleeding").toString();
assertEquals("I'm Only Bleeding", response);

client.sendAndAwait("throw-exception");
Awaitility.await().atMost(Duration.ofSeconds(5)).until(client::isClosed);
assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), client.closeStatusCode());
}
waitForTracesToArrive(3);

// server traces
var initialRequestSpan = getSpanByName("GET /bounce", SpanKind.SERVER);
var connectionOpenedSpan = getSpanByName("OPEN " + bounceUri.getPath(), SpanKind.SERVER);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionOpenedSpan));
assertEquals(initialRequestSpan.getSpanId(), connectionOpenedSpan.getLinks().get(0).getSpanContext().getSpanId());
var connectionClosedSpan = getSpanByName("CLOSE " + bounceUri.getPath(), SpanKind.SERVER);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionClosedSpan));
assertEquals(BounceEndpoint.connectionId, getConnectionIdAttrVal(connectionClosedSpan));
assertEquals(BounceEndpoint.endpointId, getEndpointIdAttrVal(connectionClosedSpan));
assertEquals(1, connectionClosedSpan.getLinks().size());
assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId());
}

private String getConnectionIdAttrVal(SpanData connectionOpenedSpan) {
return connectionOpenedSpan
.getAttributes()
.get(AttributeKey.stringKey(CONNECTION_ID_ATTR_KEY));
}

private String getClientIdAttrVal(SpanData connectionOpenedSpan) {
return connectionOpenedSpan
.getAttributes()
.get(AttributeKey.stringKey(CONNECTION_CLIENT_ATTR_KEY));
}

private String getUriAttrVal(SpanData connectionOpenedSpan) {
return connectionOpenedSpan.getAttributes().get(URL_PATH);
}

private String getEndpointIdAttrVal(SpanData connectionOpenedSpan) {
return connectionOpenedSpan
.getAttributes()
.get(AttributeKey.stringKey(CONNECTION_ENDPOINT_ATTR_KEY));
}

private void waitForTracesToArrive(int expectedTracesCount) {
Awaitility.await()
.atMost(Duration.ofSeconds(5))
.untilAsserted(() -> assertEquals(expectedTracesCount, spanExporter.getFinishedSpanItems().size()));
}

private SpanData getSpanByName(String name, SpanKind kind) {
return spanExporter.getFinishedSpanItems()
.stream()
.filter(sd -> name.equals(sd.getName()))
.filter(sd -> sd.getKind() == kind)
.findFirst()
.orElseThrow(() -> new AssertionError(
"Expected span name '" + name + "' and kind '" + kind + "' not found: "
+ spanExporter.getFinishedSpanItems()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;

public record Dto(String property) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;

import java.lang.reflect.Type;

import jakarta.annotation.Priority;
import jakarta.inject.Singleton;

import io.quarkus.websockets.next.BinaryMessageCodec;
import io.vertx.core.buffer.Buffer;

@Priority(15)
@Singleton
public class DtoBinaryCodec implements BinaryMessageCodec<Dto> {
@Override
public boolean supports(Type type) {
return type.equals(Dto.class);
}

@Override
public Buffer encode(Dto dto) {
return Buffer.buffer(dto.property());
}

@Override
public Dto decode(Type type, Buffer value) {
return new Dto(value.toString());
}

}
Loading

0 comments on commit d11c424

Please sign in to comment.