diff --git a/docs/src/main/asciidoc/websockets-next-reference.adoc b/docs/src/main/asciidoc/websockets-next-reference.adoc
index 3ccf664f55127..8554b4254ed4a 100644
--- a/docs/src/main/asciidoc/websockets-next-reference.adoc
+++ b/docs/src/main/asciidoc/websockets-next-reference.adoc
@@ -1016,6 +1016,24 @@ quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG <3>
<2> Set the number of characters of a text message payload which will be logged.
<3> Enable `DEBUG` level is for the logger `io.quarkus.websockets.next.traffic`.
+[[telemetry]]
+== Telemetry
+
+When the OpenTelemetry extension is present, traces for opened and closed WebSocket connections are collected by default.
+If you do not require WebSocket traces, you can disable tracing like in the example below:
+
+[source, properties]
+----
+quarkus.websockets-next.tracing.enabled=false
+----
+
+When the Micrometer extension is present, metrics for messages, errors and bytes transferred are collected.
+If you do not require WebSocket metrics, you can disable metrics like in the example below:
+
+[source, properties]
+----
+quarkus.websockets-next.metrics.enabled=false
+----
[[websocket-next-configuration-reference]]
== Configuration reference
diff --git a/extensions/websockets-next/deployment/pom.xml b/extensions/websockets-next/deployment/pom.xml
index 3b62c8108947a..bb480eeb8657a 100644
--- a/extensions/websockets-next/deployment/pom.xml
+++ b/extensions/websockets-next/deployment/pom.xml
@@ -80,6 +80,12 @@
mutiny-kotlin
test
+
+
+ io.opentelemetry
+ opentelemetry-sdk-testing
+ test
+
diff --git a/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketProcessor.java b/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketProcessor.java
index a6e5c44f375d7..d6cd12948f5b5 100644
--- a/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketProcessor.java
+++ b/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketProcessor.java
@@ -72,6 +72,7 @@
import io.quarkus.deployment.builditem.RuntimeConfigSetupCompleteBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.execannotations.ExecutionModelAnnotationsAllowedBuildItem;
+import io.quarkus.deployment.metrics.MetricsCapabilityBuildItem;
import io.quarkus.gizmo.BytecodeCreator;
import io.quarkus.gizmo.CatchBlockCreator;
import io.quarkus.gizmo.ClassCreator;
@@ -82,6 +83,7 @@
import io.quarkus.gizmo.MethodDescriptor;
import io.quarkus.gizmo.ResultHandle;
import io.quarkus.gizmo.TryBlock;
+import io.quarkus.runtime.metrics.MetricsFactory;
import io.quarkus.security.spi.ClassSecurityCheckAnnotationBuildItem;
import io.quarkus.security.spi.ClassSecurityCheckStorageBuildItem;
import io.quarkus.security.spi.SecurityTransformerUtils;
@@ -119,6 +121,10 @@
import io.quarkus.websockets.next.runtime.WebSocketSessionContext;
import io.quarkus.websockets.next.runtime.kotlin.ApplicationCoroutineScope;
import io.quarkus.websockets.next.runtime.kotlin.CoroutineInvoker;
+import io.quarkus.websockets.next.runtime.telemetry.ErrorInterceptor;
+import io.quarkus.websockets.next.runtime.telemetry.MicrometerWebSocketEndpointDecorator;
+import io.quarkus.websockets.next.runtime.telemetry.OtelWebSocketEndpointDecorator;
+import io.quarkus.websockets.next.runtime.telemetry.TelemetrySupport;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
@@ -194,7 +200,8 @@ void additionalBeans(CombinedIndexBuildItem combinedIndex, BuildProducer beanProducer, Capabilities capabilities) {
+ if (capabilities.isPresent(Capability.OPENTELEMETRY_TRACER)) {
+ beanProducer.produce(AdditionalBeanBuildItem.unremovableOf(OtelWebSocketEndpointDecorator.class));
+ }
+ }
+
+ @BuildStep
+ void addMicrometerEndpointDecorator(BuildProducer beanProducer,
+ Optional metricsCapability) {
+ boolean micrometerPresent = metricsCapability.map(m -> m.metricsSupported(MetricsFactory.MICROMETER)).orElse(false);
+ if (micrometerPresent) {
+ beanProducer.produce(AdditionalBeanBuildItem.unremovableOf(MicrometerWebSocketEndpointDecorator.class));
+ }
+ }
+
private static Map collectEndpointSecurityChecks(List endpoints,
ClassSecurityCheckStorageBuildItem storage, IndexView index) {
return endpoints
@@ -743,7 +766,6 @@ private void validateOnClose(Callback callback) {
* @return the name of the generated class
*/
static String generateEndpoint(WebSocketEndpointBuildItem endpoint,
- CallbackArgumentsBuildItem argumentProviders,
TransformedAnnotationsBuildItem transformedAnnotations,
IndexView index,
ClassOutput classOutput,
@@ -766,12 +788,12 @@ static String generateEndpoint(WebSocketEndpointBuildItem endpoint,
.build();
MethodCreator constructor = endpointCreator.getConstructorCreator(WebSocketConnectionBase.class,
- Codecs.class, ContextSupport.class, SecuritySupport.class);
+ Codecs.class, ContextSupport.class, SecuritySupport.class, ErrorInterceptor.class);
constructor.invokeSpecialMethod(
MethodDescriptor.ofConstructor(WebSocketEndpointBase.class, WebSocketConnectionBase.class,
- Codecs.class, ContextSupport.class, SecuritySupport.class),
+ Codecs.class, ContextSupport.class, SecuritySupport.class, ErrorInterceptor.class),
constructor.getThis(), constructor.getMethodParam(0), constructor.getMethodParam(1),
- constructor.getMethodParam(2), constructor.getMethodParam(3));
+ constructor.getMethodParam(2), constructor.getMethodParam(3), constructor.getMethodParam(4));
MethodCreator inboundProcessingMode = endpointCreator.getMethodCreator("inboundProcessingMode",
InboundProcessingMode.class);
@@ -791,19 +813,18 @@ static String generateEndpoint(WebSocketEndpointBuildItem endpoint,
ResultHandle[] args = callback.generateArguments(tryBlock.getThis(), tryBlock, transformedAnnotations, index);
ResultHandle ret = callBusinessMethod(endpointCreator, constructor, callback, "Open", tryBlock,
beanInstance, args, invokerFactory);
- encodeAndReturnResult(tryBlock.getThis(), tryBlock, callback, globalErrorHandlers, endpoint, ret);
+ encodeAndReturnResult(tryBlock.getThis(), tryBlock, callback, ret);
MethodCreator onOpenExecutionModel = endpointCreator.getMethodCreator("onOpenExecutionModel",
ExecutionModel.class);
onOpenExecutionModel.returnValue(onOpenExecutionModel.load(callback.executionModel));
}
- generateOnMessage(endpointCreator, constructor, endpoint, endpoint.onBinaryMessage, argumentProviders,
- transformedAnnotations, index, globalErrorHandlers, invokerFactory);
- generateOnMessage(endpointCreator, constructor, endpoint, endpoint.onTextMessage, argumentProviders,
- transformedAnnotations, index, globalErrorHandlers, invokerFactory);
- generateOnMessage(endpointCreator, constructor, endpoint, endpoint.onPongMessage, argumentProviders,
- transformedAnnotations, index, globalErrorHandlers, invokerFactory);
+ generateOnMessage(endpointCreator, constructor, endpoint.onBinaryMessage, transformedAnnotations, index,
+ invokerFactory);
+ generateOnMessage(endpointCreator, constructor, endpoint.onTextMessage, transformedAnnotations, index,
+ invokerFactory);
+ generateOnMessage(endpointCreator, constructor, endpoint.onPongMessage, transformedAnnotations, index, invokerFactory);
if (endpoint.onClose != null) {
Callback callback = endpoint.onClose;
@@ -816,7 +837,7 @@ static String generateEndpoint(WebSocketEndpointBuildItem endpoint,
ResultHandle[] args = callback.generateArguments(tryBlock.getThis(), tryBlock, transformedAnnotations, index);
ResultHandle ret = callBusinessMethod(endpointCreator, constructor, callback, "Close", tryBlock,
beanInstance, args, invokerFactory);
- encodeAndReturnResult(tryBlock.getThis(), tryBlock, callback, globalErrorHandlers, endpoint, ret);
+ encodeAndReturnResult(tryBlock.getThis(), tryBlock, callback, ret);
MethodCreator onCloseExecutionModel = endpointCreator.getMethodCreator("onCloseExecutionModel",
ExecutionModel.class);
@@ -870,6 +891,14 @@ private static void generateOnError(ClassCreator endpointCreator, MethodCreator
throwableInfos.sort(Comparator.comparingInt(ThrowableInfo::level).reversed());
ResultHandle endpointThis = doOnError.getThis();
+ // this.getErrorInterceptor().intercept(throwable);
+ var errorInterceptorHandle = doOnError.invokeVirtualMethod(
+ MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "getErrorInterceptor", ErrorInterceptor.class),
+ endpointThis);
+ doOnError.invokeInterfaceMethod(
+ MethodDescriptor.ofMethod(ErrorInterceptor.class, "intercept", void.class, Throwable.class),
+ errorInterceptorHandle, doOnError.getMethodParam(0));
+
for (ThrowableInfo throwableInfo : throwableInfos) {
BytecodeCreator throwableMatches = doOnError
.ifTrue(doOnError.instanceOf(doOnError.getMethodParam(0), throwableInfo.hierarchy.get(0).toString()))
@@ -887,7 +916,7 @@ private static void generateOnError(ClassCreator endpointCreator, MethodCreator
ResultHandle[] args = callback.generateArguments(endpointThis, tryBlock, transformedAnnotations, index);
ResultHandle ret = callBusinessMethod(endpointCreator, constructor, callback, "Error", tryBlock,
beanInstance, args, invokerFactory);
- encodeAndReturnResult(endpointThis, tryBlock, callback, globalErrorHandlers, endpoint, ret);
+ encodeAndReturnResult(endpointThis, tryBlock, callback, ret);
// return doErrorExecute()
throwableMatches.returnValue(
@@ -936,10 +965,8 @@ record GlobalErrorHandler(BeanInfo bean, Callback callback) {
}
- private static void generateOnMessage(ClassCreator endpointCreator, MethodCreator constructor,
- WebSocketEndpointBuildItem endpoint, Callback callback,
- CallbackArgumentsBuildItem callbackArguments, TransformedAnnotationsBuildItem transformedAnnotations,
- IndexView index, GlobalErrorHandlersBuildItem globalErrorHandlers, InvokerFactoryBuildItem invokerFactory) {
+ private static void generateOnMessage(ClassCreator endpointCreator, MethodCreator constructor, Callback callback,
+ TransformedAnnotationsBuildItem transformedAnnotations, IndexView index, InvokerFactoryBuildItem invokerFactory) {
if (callback == null) {
return;
}
@@ -972,7 +999,7 @@ private static void generateOnMessage(ClassCreator endpointCreator, MethodCreato
// Call the business method
ResultHandle ret = callBusinessMethod(endpointCreator, constructor, callback, messageType, tryBlock, beanInstance, args,
invokerFactory);
- encodeAndReturnResult(tryBlock.getThis(), tryBlock, callback, globalErrorHandlers, endpoint, ret);
+ encodeAndReturnResult(tryBlock.getThis(), tryBlock, callback, ret);
MethodCreator onMessageExecutionModel = endpointCreator.getMethodCreator("on" + messageType + "MessageExecutionModel",
ExecutionModel.class);
@@ -1114,13 +1141,9 @@ static ResultHandle decodeMessage(
}
}
- private static ResultHandle uniOnFailureDoOnError(ResultHandle endpointThis, BytecodeCreator method, Callback callback,
- ResultHandle uni, WebSocketEndpointBuildItem endpoint, GlobalErrorHandlersBuildItem globalErrorHandlers) {
- if (callback.isOnError()
- || (globalErrorHandlers.handlers.isEmpty() && (endpoint == null || endpoint.onErrors.isEmpty()))) {
- // @OnError or no error handlers available
- return uni;
- }
+ private static ResultHandle uniOnFailureDoOnError(ResultHandle endpointThis, BytecodeCreator method, ResultHandle uni) {
+ // this is always called on failure so that we have one (sometimes overloaded) method that is always called then,
+ // which simplifies collection of error metrics
// return uniMessage.onFailure().recoverWithUni(t -> {
// return doOnError(t);
// });
@@ -1137,7 +1160,6 @@ private static ResultHandle uniOnFailureDoOnError(ResultHandle endpointThis, Byt
}
private static ResultHandle encodeMessage(ResultHandle endpointThis, BytecodeCreator method, Callback callback,
- GlobalErrorHandlersBuildItem globalErrorHandlers, WebSocketEndpointBuildItem endpoint,
ResultHandle value) {
if (callback.acceptsBinaryMessage()
|| isOnOpenWithBinaryReturnType(callback)) {
@@ -1155,7 +1177,7 @@ private static ResultHandle encodeMessage(ResultHandle endpointThis, BytecodeCre
}
if (messageType.name().equals(WebSocketDotNames.VOID)) {
// Uni
- return uniOnFailureDoOnError(endpointThis, method, callback, value, endpoint, globalErrorHandlers);
+ return uniOnFailureDoOnError(endpointThis, method, value);
} else {
// return uniMessage.chain(m -> {
// Buffer buffer = encodeBuffer(m);
@@ -1173,7 +1195,7 @@ private static ResultHandle encodeMessage(ResultHandle endpointThis, BytecodeCre
ResultHandle uniChain = method.invokeInterfaceMethod(
MethodDescriptor.ofMethod(Uni.class, "chain", Uni.class, Function.class), value,
fun.getInstance());
- return uniOnFailureDoOnError(endpointThis, method, callback, uniChain, endpoint, globalErrorHandlers);
+ return uniOnFailureDoOnError(endpointThis, method, uniChain);
}
} else if (callback.isReturnTypeMulti()) {
// try {
@@ -1221,7 +1243,7 @@ private static ResultHandle encodeMessage(ResultHandle endpointThis, BytecodeCre
}
if (messageType.name().equals(WebSocketDotNames.VOID)) {
// Uni
- return uniOnFailureDoOnError(endpointThis, method, callback, value, endpoint, globalErrorHandlers);
+ return uniOnFailureDoOnError(endpointThis, method, value);
} else {
// return uniMessage.chain(m -> {
// String text = encodeText(m);
@@ -1239,7 +1261,7 @@ private static ResultHandle encodeMessage(ResultHandle endpointThis, BytecodeCre
ResultHandle uniChain = method.invokeInterfaceMethod(
MethodDescriptor.ofMethod(Uni.class, "chain", Uni.class, Function.class), value,
fun.getInstance());
- return uniOnFailureDoOnError(endpointThis, method, callback, uniChain, endpoint, globalErrorHandlers);
+ return uniOnFailureDoOnError(endpointThis, method, uniChain);
}
} else if (callback.isReturnTypeMulti()) {
// return multiText(multi, m -> {
@@ -1340,7 +1362,6 @@ private static ResultHandle uniVoid(BytecodeCreator method) {
}
private static void encodeAndReturnResult(ResultHandle endpointThis, BytecodeCreator method, Callback callback,
- GlobalErrorHandlersBuildItem globalErrorHandlers, WebSocketEndpointBuildItem endpoint,
ResultHandle result) {
// The result must be always Uni
if (callback.isReturnTypeVoid()) {
@@ -1350,7 +1371,7 @@ private static void encodeAndReturnResult(ResultHandle endpointThis, BytecodeCre
// Skip response
BytecodeCreator isNull = method.ifNull(result).trueBranch();
isNull.returnValue(uniVoid(isNull));
- method.returnValue(encodeMessage(endpointThis, method, callback, globalErrorHandlers, endpoint, result));
+ method.returnValue(encodeMessage(endpointThis, method, callback, result));
}
}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/AbstractWebSocketsOnMessageTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/AbstractWebSocketsOnMessageTest.java
new file mode 100644
index 0000000000000..2506d7582aa9c
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/AbstractWebSocketsOnMessageTest.java
@@ -0,0 +1,349 @@
+package io.quarkus.websockets.next.test.telemetry;
+
+import static io.quarkus.websockets.next.test.telemetry.Connection.sendAndAssertResponses;
+import static io.quarkus.websockets.next.test.telemetry.MetricsAsserter.stringToBytes;
+
+import java.net.URI;
+import java.util.List;
+
+import jakarta.inject.Inject;
+
+import org.jboss.shrinkwrap.api.asset.StringAsset;
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.builder.Version;
+import io.quarkus.maven.dependency.Dependency;
+import io.quarkus.test.QuarkusUnitTest;
+import io.quarkus.test.common.http.TestHTTPResource;
+import io.quarkus.websockets.next.WebSocketClientConnection;
+import io.quarkus.websockets.next.WebSocketConnector;
+import io.quarkus.websockets.next.test.utils.WSClient;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.WebSocketConnectOptions;
+
+public abstract class AbstractWebSocketsOnMessageTest {
+
+ static QuarkusUnitTest createQuarkusUnitTest(String endpointsPackage) {
+ return new QuarkusUnitTest()
+ .withApplicationRoot(root -> root
+ .addPackage(endpointsPackage)
+ .addClasses(WSClient.class, Connection.class, MetricsAsserter.class,
+ AbstractWebSocketsOnMessageTest.class)
+ .addAsResource(new StringAsset("""
+ bounce-endpoint.prefix-responses=true
+ """), "application.properties"))
+ .setForcedDependencies(
+ List.of(Dependency.of("io.quarkus", "quarkus-micrometer-registry-prometheus-deployment",
+ Version.getVersion())));
+ }
+
+ protected final MetricsAsserter asserter = new MetricsAsserter();
+
+ @TestHTTPResource("bounce")
+ URI bounceUri;
+
+ @TestHTTPResource("/")
+ URI baseUri;
+
+ @TestHTTPResource("received-single-text-response-none")
+ URI singleTextReceived_NoResponse_Uri;
+
+ @TestHTTPResource("received-single-text-response-multi-text")
+ URI singleTextReceived_MultiTextResponse_Uri;
+
+ @TestHTTPResource("received-multi-text-response-none")
+ URI multiTextReceived_NoResponse_Uri;
+
+ @TestHTTPResource("received-multi-text-response-single-text")
+ URI multiTextReceived_SingleTextResponse_Uri;
+
+ @TestHTTPResource("received-multi-text-response-multi-text")
+ URI multiTextReceived_MultiTextResponse_Uri;
+
+ @TestHTTPResource("received-single-text-response-uni-text")
+ URI singleTextReceived_UniTextResponse_Uri;
+
+ @TestHTTPResource("received-single-dto-response-single-dto")
+ URI singleDtoReceived_SingleDtoResponse_Uri;
+
+ @TestHTTPResource("received-single-dto-response-none")
+ URI singleDtoReceived_NoResponse_Uri;
+
+ @TestHTTPResource("received-single-dto-response-uni-dto")
+ URI singleDtoReceived_UniDtoResponse_Uri;
+
+ @TestHTTPResource("received-single-dto-response-multi-dto")
+ URI singleDtoReceived_MultiDtoResponse_Uri;
+
+ @TestHTTPResource("received-multi-dto-response-none")
+ URI multiDtoReceived_NoResponse_Uri;
+
+ @TestHTTPResource("received-multi-dto-response-single-dto")
+ URI multiDtoReceived_SingleDtoResponse_Uri;
+
+ @TestHTTPResource("received-multi-dto-response-multi-dto")
+ URI multiDtoReceived_MultiDtoResponse_Uri;
+
+ @TestHTTPResource("broadcast")
+ URI broadcast_Uri;
+
+ @Inject
+ Vertx vertx;
+
+ protected abstract boolean binaryMode();
+
+ protected abstract WebSocketConnector> bounceClientConnector();
+
+ protected abstract WebSocketConnector> multiClientConnector();
+
+ @Test
+ public void testServerEndpoint_SingleTextReceived_NoSent() {
+ // endpoint: void onMessage(String message)
+ var connection = Connection.of(singleTextReceived_NoResponse_Uri, false, 0, binaryMode(), "Ballad of a Prodigal Son");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 1, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_SingleTextReceived_SingleTextSent() {
+ // endpoint: String onMessage(String message)
+ var connection = Connection.of(bounceUri, false, 1, binaryMode(), "Can't Find My Way Home");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 1, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_SingleTextReceived_MultiTextSent() {
+ // endpoint: Multi onMessage(String message)
+ var connection = Connection.of(singleTextReceived_MultiTextResponse_Uri, false, 2, binaryMode(),
+ "Always take a banana to a party");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 1, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_MultiTextReceived_NoSent() {
+ // endpoint: void onMessage(Multi message)
+ var connection = Connection.of(multiTextReceived_NoResponse_Uri, false, 0, binaryMode(), "When I go",
+ "don't cry for me",
+ "In my Father's arms I'll be", "The wounds this world left on my soul");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 4, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_MultiTextReceived_SingleTextSent() {
+ // endpoint: String onMessage(Multi message)
+ var connection = Connection.of(multiTextReceived_SingleTextResponse_Uri, false, 1, "Alpha Shallows", binaryMode(),
+ "Msg1", "Msg2", "Msg3", "Msg4");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 4, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_MultiTextReceived_MultiTextSent() {
+ // endpoint: Multi onMessage(Multi message)
+ var connection = Connection.of(multiTextReceived_MultiTextResponse_Uri, false, 2, binaryMode(), "Msg1", "Msg2", "Msg3");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 3, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_SingleTextReceived_UniTextSent() {
+ // endpoint: Uni onMessage(String message)
+ var connection = Connection.of(singleTextReceived_UniTextResponse_Uri, false, 1, binaryMode(), "Bernie Sanders");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 1, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_SingleDtoReceived_NoSent() {
+ // endpoint: void onMessage(Dto dto)
+ var connection = Connection.of(singleDtoReceived_NoResponse_Uri, false, 0, binaryMode(),
+ "major disappointment speaking");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 1, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_SingleDtoReceived_SingleDtoSent() {
+ // endpoint: Dto onMessage(Dto dto)
+ var connection = Connection.of(singleDtoReceived_SingleDtoResponse_Uri, false, 1, binaryMode(), "abcd123456");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 1, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_SingleDtoReceived_UniDtoSent() {
+ // endpoint: Uni onMessage(Dto dto)
+ var connection = Connection.of(singleDtoReceived_UniDtoResponse_Uri, false, 1, binaryMode(),
+ "Shot heard round the world");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 1, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_SingleDtoReceived_MultiDtoSent() {
+ // endpoint: Multi onMessage(Dto dto)
+ var connection = Connection.of(singleDtoReceived_MultiDtoResponse_Uri, false, 2, binaryMode(),
+ "Bananas are good");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 1, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_MultiDtoReceived_NoSent() {
+ // endpoint: void onMessage(Multi dto)
+ var connection = Connection.of(multiDtoReceived_NoResponse_Uri, false, 0, binaryMode(), "Tell me how ya livin",
+ "Soljie what ya got givin");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 2, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_MultiDtoReceived_SingleDtoSent() {
+ // endpoint: Dto onMessage(Multi message)
+ var connection = Connection.of(multiDtoReceived_SingleDtoResponse_Uri, false, 1, "ut labore et dolore magna aliqua",
+ binaryMode(), "Lorem ipsum dolor sit amet", "consectetur adipiscing elit", "sed do eiusmod tempor incididunt");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 3, connection);
+ }
+
+ @Test
+ public void testServerEndpoint_MultiDtoReceived_MultiDtoSent() {
+ // endpoint: Multi onMessage(Multi dto)
+ var connection = Connection.of(multiDtoReceived_MultiDtoResponse_Uri, false, 2, binaryMode(), "Right", "Left");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 2, connection);
+ }
+
+ @Test
+ public void testClientEndpoint_SingleTextReceived_NoSent() {
+ var clientConn = bounceClientConnector().baseUri(baseUri).connectAndAwait();
+ var msg1 = "Ut enim ad minim veniam";
+ sendClientMessageAndWait(clientConn, msg1);
+ // 'clientConn' sends 'Ut enim ad minim veniam'
+ // 'BounceEndpoint' -> 'String onMessage(String message)' sends 'Response 0: Ut enim ad minim veniam'
+ // 'BounceClient' -> 'void echo(String message)' receives 'Response 0: Ut enim ad minim veniam'
+ // that is received 2 messages and sent 2 messages
+ int clientBytesReceived = stringToBytes("Response 0: " + msg1);
+ int clientBytesSent = stringToBytes(msg1);
+ int serverBytesReceived = clientBytesSent;
+ int serverBytesSent = clientBytesReceived;
+ asserter.assertMetrics(0, 0, 1, serverBytesReceived, serverBytesSent, 1, clientBytesSent, clientBytesReceived);
+
+ msg1 = "quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat";
+ var msg2 = "Duis aute irure dolor in reprehenderit";
+ sendClientMessageAndWait(clientConn, msg1);
+ sendClientMessageAndWait(clientConn, msg2);
+
+ clientBytesReceived = stringToBytes("Response 0: " + msg1, "Response 0: " + msg2);
+ clientBytesSent = stringToBytes(msg1, msg2);
+ serverBytesReceived = clientBytesSent;
+ serverBytesSent = clientBytesReceived;
+ asserter.assertMetrics(0, 0, 2, serverBytesReceived, serverBytesSent, 2, clientBytesSent, clientBytesReceived);
+
+ clientConn.closeAndAwait();
+ }
+
+ @Test
+ public void testClientEndpoint_MultiTextReceived_MultiTextSent() {
+ var clientConn = multiClientConnector().baseUri(baseUri).connectAndAwait();
+ var msg1 = "in voluptate velit esse cillum dolore eu fugiat nulla pariatur";
+ var msg2 = "Excepteur sint occaecat cupidatat non proident";
+ sendClientMessageAndWait(clientConn, msg1);
+ sendClientMessageAndWait(clientConn, msg2);
+
+ // 2 sent: 'clientConn' sends 2 messages
+ // 2 sent, 2 received: 'MultiEndpoint' -> 'Multi echo(Multi messages)' -> accepts and receives message
+ // 2 sent, 2 received: 'MultiClient' -> 'Multi echo(Multi messages)' -> accepts, receives, adds "Response 0: "
+ // 2 received: 'MultiEndpoint' -> accepts and returns empty Multi
+ int clientBytesReceived = stringToBytes(msg1, msg2);
+ int clientBytesSent = stringToBytes(msg1, msg2, msg1 + "Response 0: ", msg2 + "Response 0: ");
+ int serverBytesReceived = clientBytesSent;
+ int serverBytesSent = clientBytesReceived;
+
+ asserter.assertMetrics(0, 0, 4, serverBytesReceived, serverBytesSent, 4, clientBytesSent, clientBytesReceived);
+
+ clientConn.closeAndAwait();
+ }
+
+ @Test
+ public void testServerEndpoint_broadcasting() {
+ // broadcast = true
+ // endpoint: String onMessage(String message)
+
+ var msg1 = "It's alright ma";
+ // expected metrics:
+ // endpoint receives msg1
+ // 2 connections are opened so 2 responses are expected
+ int sentBytes = stringToBytes("Response 0: " + msg1, "Response 0: " + msg1);
+ int receivedBytes = stringToBytes(msg1);
+ var connection1 = Connection.of(broadcast_Uri, true, 1, binaryMode(), msg1);
+
+ var msg2 = "I'm Only Bleeding";
+ // expected metrics:
+ // endpoint receives msg2
+ // 2 connections are opened so 2 responses are expected
+ sentBytes += stringToBytes("Response 0: " + msg2, "Response 0: " + msg2);
+ receivedBytes += stringToBytes(msg2);
+ var connection2 = Connection.of(broadcast_Uri, true, 1, binaryMode(), msg2);
+ sendAndAssertResponses(vertx, connection1, connection2);
+ asserter.assertMetrics(0, 2, sentBytes, receivedBytes);
+ }
+
+ @Test
+ public void testServerEndpoint_SingleTextReceived_SingleTextSent_MultipleConnections() {
+ // endpoint: String onMessage(String message)
+ // testing multiple connections because we need to know that same counter endpoint counter is used by connections
+ var msg = "Can't Find My Way Home";
+
+ try (var client1 = new WSClient(vertx)) {
+ var connection1 = Connection.of(bounceUri, false, 1, binaryMode(), msg);
+ client1.connect(new WebSocketConnectOptions(), bounceUri);
+ sendClientMessageAndWait(client1, msg);
+ asserter.assertMetrics(0, 1, connection1);
+
+ var connection2 = Connection.of(bounceUri, false, 1, binaryMode(), msg);
+ sendAndAssertResponses(vertx, connection2);
+ asserter.assertMetrics(0, 1, connection2);
+
+ var connection3 = Connection.of(bounceUri, false, 1, binaryMode(), msg);
+ sendAndAssertResponses(vertx, connection3);
+ asserter.assertMetrics(0, 1, connection3);
+
+ // --- try different endpoint - start
+ // endpoint: void onMessage(Multi message)
+ var connection = Connection.of(multiTextReceived_NoResponse_Uri, false, 0, binaryMode(), "I get up in the evening",
+ "I ain't nothing but tired", "I could use just a little help");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(0, 3, connection);
+ // --- try different endpoint - end
+
+ var connection4 = Connection.of(bounceUri, false, 1, binaryMode(), msg);
+ sendAndAssertResponses(vertx, connection4);
+ asserter.assertMetrics(0, 1, connection4);
+
+ // send again message via the first connection that is still open
+ sendClientMessageAndWait(client1, msg);
+ asserter.assertMetrics(0, 1, connection1);
+ }
+ }
+
+ private void sendClientMessageAndWait(WSClient client, String msg) {
+ if (binaryMode()) {
+ client.sendAndAwait(Buffer.buffer(msg));
+ } else {
+ client.sendAndAwait(msg);
+ }
+ }
+
+ protected void sendClientMessageAndWait(WebSocketClientConnection clientConn, String msg1) {
+ if (binaryMode()) {
+ clientConn.sendBinaryAndAwait(Buffer.buffer(msg1));
+ } else {
+ clientConn.sendTextAndAwait(msg1);
+ }
+ }
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/Connection.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/Connection.java
new file mode 100644
index 0000000000000..669b56753cbff
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/Connection.java
@@ -0,0 +1,112 @@
+package io.quarkus.websockets.next.test.telemetry;
+
+import static io.quarkus.websockets.next.test.utils.WSClient.ReceiverMode.BINARY;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.awaitility.Awaitility;
+
+import io.quarkus.websockets.next.test.utils.WSClient;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.WebSocketConnectOptions;
+
+record Connection(URI uri, String[] messagesToSend, WSClient client, boolean broadcast, boolean binaryMode,
+ int responseMultiplier, String[] expectedResponses) {
+
+ static Connection of(URI uri, boolean broadcast, int responseMultiplier, boolean binaryMode, String... messages) {
+ return new Connection(uri, messages, null, broadcast, binaryMode, responseMultiplier,
+ expectedResponses(messages, responseMultiplier));
+ }
+
+ static Connection of(URI uri, boolean broadcast, int responseMultiplier, String expectedResponse, boolean binaryMode,
+ String... messages) {
+ return new Connection(uri, messages, null, broadcast, binaryMode, responseMultiplier,
+ new String[] { expectedResponse });
+ }
+
+ private Connection with(WSClient client) {
+ return new Connection(uri, messagesToSend, client, broadcast, binaryMode, responseMultiplier, expectedResponses);
+ }
+
+ private Set getReceivedMessages() {
+ return client.getMessages().stream().map(Buffer::toString).collect(Collectors.toSet());
+ }
+
+ static void sendAndAssertResponses(Vertx vertx, Connection... connections) {
+ openConnectionsThenSend(connections, vertx, 0);
+ }
+
+ private static String[] expectedResponses(String[] messagesToSend, int responseMultiplier) {
+ return IntStream
+ .range(0, messagesToSend.length)
+ .boxed()
+ . mapMulti((idx, consumer) -> {
+ for (int i = 0; i < responseMultiplier; i++) {
+ consumer.accept("Response " + i + ": " + messagesToSend[idx]);
+ }
+ })
+ .toArray(String[]::new);
+ }
+
+ private static void openConnectionsThenSend(Connection[] connections, Vertx vertx, int idx) {
+ var connection = connections[idx];
+ final WSClient client = connection.binaryMode() ? new WSClient(vertx, BINARY) : new WSClient(vertx);
+ try (client) {
+ client.connect(new WebSocketConnectOptions(), connection.uri());
+ connections[idx] = connection.with(client);
+
+ if (idx < connections.length - 1) {
+ openConnectionsThenSend(connections, vertx, idx + 1);
+ } else {
+ sendMessages(connections, connection.binaryMode());
+ }
+ }
+ }
+
+ private static void sendMessages(Connection[] connections, boolean binaryMode) {
+ for (Connection connection : connections) {
+ for (String message : connection.messagesToSend()) {
+ if (binaryMode) {
+ connection.client().sendAndAwait(Buffer.buffer(message));
+ } else {
+ connection.client().sendAndAwait(message);
+ }
+ }
+ var expectedResponses = connection.expectedResponses();
+ if (expectedResponses.length != 0) {
+ if (connection.broadcast()) {
+ for (Connection conn : connections) {
+ assertResponses(conn, expectedResponses);
+ }
+ } else {
+ assertResponses(connection, expectedResponses);
+ }
+ }
+ }
+ }
+
+ private static void assertResponses(Connection connection, String[] expectedResponses) {
+ Awaitility.await().atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertTrue(connection.client.getMessages().size() >= expectedResponses.length,
+ () -> """
+ Expected %d responses but got %d responses. Tested path was '%s' and sent messages were '%s'.
+ """.formatted(expectedResponses.length, connection.client.getMessages().size(),
+ connection.uri().getPath(), Arrays.toString(connection.messagesToSend()))));
+
+ Set actualResponses = connection.getReceivedMessages();
+
+ for (String expectedResponse : expectedResponses) {
+ assertTrue(actualResponses.contains(expectedResponse),
+ () -> "Expected response '%s' not found, was: %s".formatted(expectedResponse, actualResponses));
+ }
+
+ connection.client().getMessages().clear();
+ }
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/InMemorySpanExporterProducer.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/InMemorySpanExporterProducer.java
new file mode 100644
index 0000000000000..dc4f4d3997c3a
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/InMemorySpanExporterProducer.java
@@ -0,0 +1,18 @@
+package io.quarkus.websockets.next.test.telemetry;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Produces;
+import jakarta.inject.Singleton;
+
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+
+@ApplicationScoped
+public class InMemorySpanExporterProducer {
+
+ @Produces
+ @Singleton
+ InMemorySpanExporter inMemorySpanExporter() {
+ return InMemorySpanExporter.create();
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/MetricsAsserter.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/MetricsAsserter.java
new file mode 100644
index 0000000000000..2d015cfe40b08
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/MetricsAsserter.java
@@ -0,0 +1,167 @@
+package io.quarkus.websockets.next.test.telemetry;
+
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_MESSAGES_COUNT_ERRORS;
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_MESSAGES_COUNT_RECEIVED_BYTES;
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_MESSAGES_COUNT_SENT;
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_MESSAGES_COUNT_SENT_BYTES;
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_MESSAGES_COUNT_ERRORS;
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_MESSAGES_COUNT_RECEIVED;
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_MESSAGES_COUNT_RECEIVED_BYTES;
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_MESSAGES_COUNT_SENT_BYTES;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.function.Consumer;
+
+import org.awaitility.Awaitility;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+
+import io.restassured.RestAssured;
+import io.restassured.response.ValidatableResponse;
+
+public final class MetricsAsserter {
+
+ private int serverReceivedCount;
+ private int serverReceivedCountBytes;
+ private int serverSentCountBytes;
+ private int clientSentCount;
+ private int clientSentCountBytes;
+ private int clientReceivedCountBytes;
+ private int clientErrorCount;
+ private int serverErrorCount;
+
+ void assertMetrics(int serverErrorsDelta, int serverReceivedCountDelta, Connection connection) {
+ int serverSentCountBytesDelta = connectionToSentBytes(connection);
+ int serverReceivedCountBytesDelta = connectionToReceivedBytes(connection);
+ assertMetrics(serverErrorsDelta, 0, serverReceivedCountDelta, serverReceivedCountBytesDelta,
+ serverSentCountBytesDelta, 0, 0, 0);
+ }
+
+ void assertMetrics(int serverErrorsDelta, int serverReceivedCountDelta, int serverSentCountBytesDelta,
+ int serverReceivedCountBytesDelta) {
+ assertMetrics(serverErrorsDelta, 0, serverReceivedCountDelta, serverReceivedCountBytesDelta,
+ serverSentCountBytesDelta, 0, 0, 0);
+ }
+
+ private int connectionToReceivedBytes(Connection connection) {
+ return stringToBytes(connection.messagesToSend());
+ }
+
+ private int connectionToSentBytes(Connection connection) {
+ return stringToBytes(connection.expectedResponses());
+ }
+
+ void assertMetrics(int serverErrorsDelta, int clientErrorsDelta, int serverReceivedCountDelta,
+ int serverReceivedCountBytesDelta, int serverSentCountBytesDelta, int clientSentCountDelta,
+ int clientSentCountBytesDelta, int clientReceivedCountBytesDelta) {
+ addDeltasToTotalsMeasuredPreviously(serverErrorsDelta, clientErrorsDelta, serverReceivedCountDelta,
+ serverReceivedCountBytesDelta, serverSentCountBytesDelta, clientSentCountDelta, clientSentCountBytesDelta,
+ clientReceivedCountBytesDelta);
+
+ assertMetrics(metrics -> metrics
+ .body(assertServerErrorTotal(null, serverErrorCount))
+ .body(assertClientErrorTotal(null, clientErrorCount))
+ .body(assertClientMessagesCountBytesSent(null, clientSentCountBytes))
+ .body(assertClientMessagesCountBytesReceived(null, clientReceivedCountBytes))
+ .body(assertClientMessagesCountSent(null, clientSentCount))
+ .body(assertServerMessagesCountBytesReceived(null, serverReceivedCountBytes))
+ .body(assertServerMessagesCountBytesSent(null, serverSentCountBytes))
+ .body(assertServerMessagesCountReceived(null, serverReceivedCount)));
+ }
+
+ private void addDeltasToTotalsMeasuredPreviously(int serverErrorsDelta, int clientErrorsDelta, int serverReceivedCountDelta,
+ int serverReceivedCountBytesDelta, int serverSentCountBytesDelta, int clientSentCountDelta,
+ int clientSentCountBytesDelta, int clientReceivedCountBytesDelta) {
+ serverReceivedCount += serverReceivedCountDelta;
+ serverReceivedCountBytes += serverReceivedCountBytesDelta;
+ serverSentCountBytes += serverSentCountBytesDelta;
+ clientSentCount += clientSentCountDelta;
+ clientSentCountBytes += clientSentCountBytesDelta;
+ clientReceivedCountBytes += clientReceivedCountBytesDelta;
+ clientErrorCount += clientErrorsDelta;
+ serverErrorCount += serverErrorsDelta;
+ }
+
+ static Matcher assertClientMessagesCountBytesSent(String path, int clientSentCountBytes) {
+ return assertTotal(CLIENT_MESSAGES_COUNT_SENT_BYTES, clientSentCountBytes, path);
+ }
+
+ static Matcher assertClientMessagesCountBytesReceived(String path, int clientReceivedCountBytes) {
+ return assertTotal(CLIENT_MESSAGES_COUNT_RECEIVED_BYTES, clientReceivedCountBytes, path);
+ }
+
+ static Matcher assertClientMessagesCountSent(String path, int clientSentCount) {
+ return assertTotal(CLIENT_MESSAGES_COUNT_SENT, clientSentCount, path);
+ }
+
+ static Matcher assertServerMessagesCountReceived(String path, int serverReceivedCount) {
+ return assertTotal(SERVER_MESSAGES_COUNT_RECEIVED, serverReceivedCount, path);
+ }
+
+ static Matcher assertServerMessagesCountBytesSent(String path, int serverSentCountBytes) {
+ return assertTotal(SERVER_MESSAGES_COUNT_SENT_BYTES, serverSentCountBytes, path);
+ }
+
+ static Matcher assertServerMessagesCountBytesReceived(String path, int serverReceivedCountBytes) {
+ return assertTotal(SERVER_MESSAGES_COUNT_RECEIVED_BYTES, serverReceivedCountBytes, path);
+ }
+
+ static Matcher assertServerErrorTotal(String path, int serverErrorCount) {
+ return assertTotal(SERVER_MESSAGES_COUNT_ERRORS, serverErrorCount, path);
+ }
+
+ static Matcher assertClientErrorTotal(String path, int clientErrorCount) {
+ return assertTotal(CLIENT_MESSAGES_COUNT_ERRORS, clientErrorCount, path);
+ }
+
+ private static Matcher assertTotal(String metricKey, int expectedCount, String path) {
+ var prometheusFormatKey = "%s_total".formatted(toPrometheusFormat(metricKey));
+ return new BaseMatcher<>() {
+ @Override
+ public boolean matches(Object o) {
+ if (o instanceof String str) {
+ var sameKeyMultipleTags = str
+ .lines()
+ .filter(l -> l.contains(prometheusFormatKey))
+ .filter(l -> path == null || l.contains(path)) // filter by path
+ .map(String::trim)
+ .toList();
+ // quarkus_websockets_server_messages_count_received_total{<>} 2.0
+ // quarkus_websockets_server_messages_count_received_total{<>} 5.0
+ // = 7
+ var totalSum = sameKeyMultipleTags
+ .stream()
+ .map(l -> l.substring(l.lastIndexOf(" ")).trim())
+ .map(Double::parseDouble)
+ .map(Double::intValue)
+ .reduce(0, Integer::sum);
+ return totalSum == expectedCount;
+ }
+ return false;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("Key '%s' with value '%d'".formatted(prometheusFormatKey, expectedCount));
+ }
+ };
+ }
+
+ private static String toPrometheusFormat(String dottedMicrometerFormat) {
+ return dottedMicrometerFormat.replace(".", "_").replace("-", "_");
+ }
+
+ private static ValidatableResponse getMetrics() {
+ return RestAssured.given().get("/q/metrics").then().statusCode(200);
+ }
+
+ static void assertMetrics(Consumer assertion) {
+ Awaitility.await().atMost(Duration.ofSeconds(12)).untilAsserted(() -> assertion.accept(getMetrics()));
+ }
+
+ static int stringToBytes(String... messages) {
+ return Arrays.stream(messages).map(String::getBytes).map(s -> s.length).reduce(0, Integer::sum);
+ }
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/MicrometerWebSocketsOnBinaryMessageTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/MicrometerWebSocketsOnBinaryMessageTest.java
new file mode 100644
index 0000000000000..bb79677de7f43
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/MicrometerWebSocketsOnBinaryMessageTest.java
@@ -0,0 +1,41 @@
+package io.quarkus.websockets.next.test.telemetry;
+
+import jakarta.inject.Inject;
+
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.test.QuarkusUnitTest;
+import io.quarkus.websockets.next.WebSocketConnector;
+import io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage.BounceClient;
+import io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage.MultiClient;
+
+/**
+ * Tests metrics for {@link io.quarkus.websockets.next.OnBinaryMessage}.
+ */
+public class MicrometerWebSocketsOnBinaryMessageTest extends AbstractWebSocketsOnMessageTest {
+
+ @RegisterExtension
+ public static final QuarkusUnitTest test = createQuarkusUnitTest(
+ "io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage");
+
+ @Inject
+ WebSocketConnector bounceClientConnector;
+
+ @Inject
+ WebSocketConnector multiClientConnector;
+
+ @Override
+ protected boolean binaryMode() {
+ return true;
+ }
+
+ @Override
+ protected WebSocketConnector> bounceClientConnector() {
+ return bounceClientConnector;
+ }
+
+ @Override
+ protected WebSocketConnector> multiClientConnector() {
+ return multiClientConnector;
+ }
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/MicrometerWebSocketsOnErrorTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/MicrometerWebSocketsOnErrorTest.java
new file mode 100644
index 0000000000000..9e72eecf50d18
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/MicrometerWebSocketsOnErrorTest.java
@@ -0,0 +1,204 @@
+package io.quarkus.websockets.next.test.telemetry;
+
+import static io.quarkus.websockets.next.test.telemetry.AbstractWebSocketsOnMessageTest.createQuarkusUnitTest;
+import static io.quarkus.websockets.next.test.telemetry.Connection.sendAndAssertResponses;
+import static io.quarkus.websockets.next.test.telemetry.MetricsAsserter.stringToBytes;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import jakarta.inject.Inject;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.test.QuarkusUnitTest;
+import io.quarkus.test.common.http.TestHTTPResource;
+import io.quarkus.websockets.next.WebSocketConnector;
+import io.quarkus.websockets.next.test.telemetry.endpoints.onerror.ErroneousClient_NoOnError;
+import io.quarkus.websockets.next.test.telemetry.endpoints.onerror.ErroneousClient_OverloadedOnError;
+import io.quarkus.websockets.next.test.telemetry.endpoints.onerror.ErroneousServerEndpoint_OnClose;
+import io.quarkus.websockets.next.test.telemetry.endpoints.onerror.ErroneousServerEndpoint_OverriddenOnError;
+import io.quarkus.websockets.next.test.telemetry.endpoints.onerror.GlobalErrorHandler;
+import io.vertx.core.Vertx;
+
+public class MicrometerWebSocketsOnErrorTest {
+
+ @RegisterExtension
+ public static final QuarkusUnitTest test = createQuarkusUnitTest(
+ "io.quarkus.websockets.next.test.telemetry.endpoints.onerror");
+
+ @Inject
+ WebSocketConnector erroneousClientConnector_NoOnErr;
+
+ @Inject
+ WebSocketConnector erroneousClientConnector_OverloadedOnErr;
+
+ @TestHTTPResource("/")
+ URI baseUri;
+
+ @TestHTTPResource("server-error-no-on-error")
+ URI serverEndpoint_NoExplicitOnError_Uri;
+
+ @TestHTTPResource("server-error-overridden-on-error")
+ URI serverEndpoint_OverriddenOnError_Uri;
+
+ @TestHTTPResource("server-error-on-open")
+ URI serverEndpoint_ErrorOnOpen_Uri;
+
+ @TestHTTPResource("server-error-on-close")
+ URI serverEndpoint_ErrorOnClose_Uri;
+
+ @TestHTTPResource("server-error-global-handler")
+ URI serverEndpoint_GlobalErrorHandler_Uri;
+
+ @Inject
+ Vertx vertx;
+
+ private final MetricsAsserter asserter = new MetricsAsserter();
+
+ @Test
+ public void testClientEndpointError_ExceptionInsideOnTextMessage_noExplicitOnErrorDefined() {
+ // client endpoint doesn't define @OnError
+ // @OnTextMessage results in a failure
+
+ var clientConn = erroneousClientConnector_NoOnErr.baseUri(baseUri).connectAndAwait();
+ var msg = "What'd I Say";
+ // 1 sent: use 'clientConn' to send 'msg'
+ // 1 received, 2 sent: 'ErroneousClientEndpoint_NoOnError' -> 'Multi onMessage(String message)', 2 in Multi
+ // 2 received: 'ErroneousClient_NoOnError' -> 'Uni onMessage(String message)'
+ int clientBytesSent = stringToBytes(msg);
+ int clientBytesReceived = stringToBytes("Response 0: " + msg, "Response 1: " + msg);
+ int serverBytesReceived = clientBytesSent;
+ int serverBytesSent = clientBytesReceived;
+
+ clientConn.sendTextAndAwait(msg);
+ Awaitility.await().untilAsserted(() -> Assertions.assertEquals(2, ErroneousClient_NoOnError.MESSAGES.size()));
+ asserter.assertMetrics(0, 0, 1, serverBytesReceived, serverBytesSent, 1, clientBytesSent, clientBytesReceived);
+
+ // 'ErroneousClient_NoOnError' throws exception inside 'onMessage' after it received 4 messages
+ clientConn.sendTextAndAwait(msg);
+ Awaitility.await().untilAsserted(() -> Assertions.assertEquals(4, ErroneousClient_NoOnError.MESSAGES.size()));
+ asserter.assertMetrics(0, 1, 1, serverBytesReceived, serverBytesSent, 1, clientBytesSent, clientBytesReceived);
+
+ clientConn.closeAndAwait();
+ }
+
+ @Test
+ public void testClientEndpointError_ExceptionInsideOnTextMessage_WithOverloadedOnError() throws InterruptedException {
+ // client endpoint defines multiple @OnError
+ // @OnTextMessage results in a failure
+
+ var clientConn = erroneousClientConnector_OverloadedOnErr.baseUri(baseUri).connectAndAwait();
+ var msg = "What'd I Say";
+ int clientBytesSent = stringToBytes(msg);
+ int clientBytesReceived = stringToBytes("Response 0: " + msg, "Response 1: " + msg);
+ int serverBytesReceived = clientBytesSent;
+ int serverBytesSent = clientBytesReceived;
+
+ // 1 sent: use 'clientConn' to send 'msg'
+ // 1 received, 2 sent: 'ErroneousClientEndpoint_OverloadedOnError' -> 'Multi onMessage(String message)'
+ // 2 received: 'ErroneousClient_OverloadedOnError' -> 'Uni onMessage(String message)'
+ clientConn.sendTextAndAwait(msg);
+
+ // assert messages and metrics
+ Awaitility.await().untilAsserted(() -> Assertions.assertEquals(2, ErroneousClient_OverloadedOnError.MESSAGES.size()));
+ // assert no client exception collected as metric
+ asserter.assertMetrics(0, 0, 1, serverBytesReceived, serverBytesSent, 1, clientBytesSent, clientBytesReceived);
+
+ // 1 sent: use 'clientConn' to send 'msg'
+ // 1 received, 2 sent: 'ErroneousClientEndpoint_OverloadedOnError' -> 'Multi onMessage(String message)'
+ // 2 received: 'ErroneousClient_OverloadedOnError' -> 'Uni onMessage(String message)'
+ // after 4 messages, a RuntimeException is thrown
+ // 1 sent: 'ErroneousClient_OverloadedOnError' recovers with 'recoveryMsg' in
+ // @OnError 'String onError(RuntimeException e)'
+ // 1 received, 2 sent: 'ErroneousClientEndpoint_OverloadedOnError' (that 'recoveryMsg')
+ // 2 received: in 'ErroneousClient_OverloadedOnError'
+ // === total expected: 6 received, 6 sent
+ clientConn.sendTextAndAwait(msg);
+
+ // client @OnError returns this String which is sent to the server @OnMessage, so expect extra bytes
+ var recoveryMsg = "Expected error - 4 items";
+ int extraClientSentBytes = stringToBytes(recoveryMsg);
+ int extraServerReceivedBytes = extraClientSentBytes;
+ int extraServerSentBytes = stringToBytes("Response 0: " + recoveryMsg, "Response 1: " + recoveryMsg);
+ int extraClientReceivedBytes = extraServerSentBytes;
+
+ // assert messages and metrics
+ Awaitility.await().untilAsserted(() -> Assertions.assertEquals(6, ErroneousClient_OverloadedOnError.MESSAGES.size()));
+ assertTrue(ErroneousClient_OverloadedOnError.RUNTIME_EXCEPTION_LATCH.await(2, TimeUnit.SECONDS));
+ asserter.assertMetrics(0, 1, 2, serverBytesReceived + extraServerReceivedBytes, serverBytesSent + extraServerSentBytes,
+ 2, clientBytesSent + extraClientSentBytes, clientBytesReceived + extraClientReceivedBytes);
+
+ // 1 sent: use 'clientConn' to send 'msg'
+ // 1 received, 2 sent: 'ErroneousClientEndpoint_OverloadedOnError' -> 'Multi onMessage(String message)'
+ // 2 received: 'ErroneousClient_OverloadedOnError' -> 'Uni onMessage(String message)'
+ clientConn.sendTextAndAwait(msg);
+
+ // assert messages and metrics
+ Awaitility.await().untilAsserted(() -> Assertions.assertEquals(8, ErroneousClient_OverloadedOnError.MESSAGES.size()));
+ // after 8 messages, an IllegalStateException is thrown
+ // @OnError void onError(IllegalStateException e)
+ assertTrue(ErroneousClient_OverloadedOnError.ILLEGAL_STATE_EXCEPTION_LATCH.await(2, TimeUnit.SECONDS));
+ asserter.assertMetrics(0, 1, 1, serverBytesReceived, serverBytesSent, 1, clientBytesSent, clientBytesReceived);
+
+ clientConn.closeAndAwait();
+ }
+
+ @Test
+ public void testServerEndpointError_ExceptionDuringTextDecoding_noExplicitOnErrorDefined() {
+ // server endpoint @OnTextMessage: Uni onMessage(Multi dto)
+ // text codec throws exception
+ // no explicit @OnError
+ var connection = Connection.of(serverEndpoint_NoExplicitOnError_Uri, false, 0, false, "Billions");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(1, 1, connection);
+ }
+
+ @Test
+ public void testServerEndpointError_ExceptionDuringBinaryDecoding_OnErrorOverloaded() throws InterruptedException {
+ // server endpoint @OnBinaryMessage: Uni onMessage(Multi dto)
+ // @OnError: void onError(RuntimeException e)
+ var msg = "Wendy";
+ var connection = Connection.of(serverEndpoint_OverriddenOnError_Uri, false, 0, true, msg);
+ sendAndAssertResponses(vertx, connection);
+ assertTrue(ErroneousServerEndpoint_OverriddenOnError.RUNTIME_EXCEPTION_LATCH.await(5, TimeUnit.SECONDS));
+ asserter.assertMetrics(1, 1, connection);
+ }
+
+ @Test
+ public void testServerEndpointError_ExceptionInsideOnOpen() {
+ // error happens in @OnOpen, @OnTextMessage is invoked but the connection is already closed
+ var connection = Connection.of(serverEndpoint_ErrorOnOpen_Uri, false, 0, false, "Rhodes");
+ sendAndAssertResponses(vertx, connection);
+ asserter.assertMetrics(1, 1, connection);
+ }
+
+ @Test
+ public void testServerEndpointError_ExceptionInsideOnClose() throws InterruptedException {
+ // @OnBinaryMessage is called: Multi onMessage(String message)
+ // expect 1 received message and one response
+ // @OnClose fails with IllegalStateException
+ // explicitly declared @OnError catches the exception
+ var connection = Connection.of(serverEndpoint_ErrorOnClose_Uri, false, 1, "Bobby", true, "Chuck");
+ sendAndAssertResponses(vertx, connection);
+ assertTrue(ErroneousServerEndpoint_OnClose.ILLEGAL_STATE_EXCEPTION_LATCH.await(7, TimeUnit.SECONDS));
+ asserter.assertMetrics(1, 1, connection);
+ }
+
+ @Test
+ public void testServerEndpointError_GlobalErrorHandler() throws InterruptedException {
+ // test that error handled by a global error handler (defined outside the endpoint) are accounted for
+ // global error handler recovers exception with original message: String onError(IllegalArgumentException e)
+ // we need to check that both error and response sent from the global handler (bytes) are collected as a metric
+ var connection = Connection.of(serverEndpoint_GlobalErrorHandler_Uri, false, 1, false, "Hold the Line");
+ sendAndAssertResponses(vertx, connection);
+ assertTrue(GlobalErrorHandler.ILLEGAL_ARGUMENT_EXCEPTION_LATCH.await(5, TimeUnit.SECONDS));
+ // on each message, an exception is raised, we send 1 message -> expect 1 error
+ asserter.assertMetrics(1, 1, connection);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/MicrometerWebSocketsOnTextMessageTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/MicrometerWebSocketsOnTextMessageTest.java
new file mode 100644
index 0000000000000..241599196ab59
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/MicrometerWebSocketsOnTextMessageTest.java
@@ -0,0 +1,146 @@
+package io.quarkus.websockets.next.test.telemetry;
+
+import static io.quarkus.websockets.next.test.telemetry.Connection.sendAndAssertResponses;
+import static io.quarkus.websockets.next.test.telemetry.MetricsAsserter.assertClientMessagesCountBytesReceived;
+import static io.quarkus.websockets.next.test.telemetry.MetricsAsserter.assertClientMessagesCountBytesSent;
+import static io.quarkus.websockets.next.test.telemetry.MetricsAsserter.assertClientMessagesCountSent;
+import static io.quarkus.websockets.next.test.telemetry.MetricsAsserter.assertMetrics;
+import static io.quarkus.websockets.next.test.telemetry.MetricsAsserter.assertServerMessagesCountBytesReceived;
+import static io.quarkus.websockets.next.test.telemetry.MetricsAsserter.assertServerMessagesCountBytesSent;
+import static io.quarkus.websockets.next.test.telemetry.MetricsAsserter.assertServerMessagesCountReceived;
+import static io.quarkus.websockets.next.test.telemetry.MetricsAsserter.stringToBytes;
+
+import java.net.URI;
+
+import jakarta.inject.Inject;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.test.QuarkusUnitTest;
+import io.quarkus.test.common.http.TestHTTPResource;
+import io.quarkus.websockets.next.WebSocketConnector;
+import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.BounceClient;
+import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.ClientEndpointWithPathParams;
+import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.MultiClient;
+
+/**
+ * Tests metrics for {@link io.quarkus.websockets.next.OnTextMessage}.
+ */
+public class MicrometerWebSocketsOnTextMessageTest extends AbstractWebSocketsOnMessageTest {
+
+ @RegisterExtension
+ public static final QuarkusUnitTest test = createQuarkusUnitTest(
+ "io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage");
+
+ @Inject
+ WebSocketConnector bounceClientConnector;
+
+ @Inject
+ WebSocketConnector multiClientConnector;
+
+ @Inject
+ WebSocketConnector clientWithPathParamsConnector;
+
+ @TestHTTPResource("/ping/ho/and/hey")
+ URI testServerPathParam1;
+
+ @TestHTTPResource("/ping/amy/and/macdonald")
+ URI testServerPathParam2;
+
+ @Override
+ protected boolean binaryMode() {
+ return false;
+ }
+
+ @Override
+ protected WebSocketConnector> bounceClientConnector() {
+ return bounceClientConnector;
+ }
+
+ @Override
+ protected WebSocketConnector> multiClientConnector() {
+ return multiClientConnector;
+ }
+
+ @Test
+ public void testServerEndpoint_PathParams_ResponseFromOnOpenMethod() {
+ // endpoint: @OnOpen String process(@PathParam String one, @PathParam String two)
+ // path: /ping/{one}/and/{two} -> one:two
+ var path = "/ping/:one/and/:two";
+ var expectedResponse = "ho:hey"; // path is /ping/ho/and/hey
+ var connection1 = Connection.of(testServerPathParam1, false, 1, expectedResponse, binaryMode(), "whatever");
+ sendAndAssertResponses(vertx, connection1);
+
+ // assert totals for all the path tags
+ asserter.assertMetrics(0, 1, connection1);
+ // assert metric for our path tag only (this is sent from @OnOpen)
+ int serverBytesSent1 = stringToBytes(expectedResponse);
+ assertMetrics(metrics -> metrics.body(assertServerMessagesCountBytesSent(path, serverBytesSent1)));
+
+ var expectedResponse2 = "amy:macdonald"; // path is /ping/amy/and/macdonald
+ var connection2 = Connection.of(testServerPathParam2, false, 1, expectedResponse2, binaryMode(), "whatever");
+ sendAndAssertResponses(vertx, connection2);
+
+ // assert totals for all the path tags
+ asserter.assertMetrics(0, 1, connection2);
+ // assert metric for our path tag only (this is sent from @OnOpen) (no deltas, so previous bytes + current ones)
+ int serverBytesSent2 = stringToBytes(expectedResponse2);
+ assertMetrics(metrics -> metrics.body(assertServerMessagesCountBytesSent(path, serverBytesSent1 + serverBytesSent2)));
+ }
+
+ @Test
+ public void testClientEndpoint_PathParam() {
+ // server endpoint: Uni onMessage(String message)
+ // client endpoint: void onTextMessage(String message)
+ var msg = "Ut enim ad minim veniam";
+
+ var clientConn = clientWithPathParamsConnector
+ .baseUri(baseUri)
+ .pathParam("name", "Lu=")
+ .connectAndAwait();
+ sendClientMessageAndWait(clientConn, msg);
+ // 'clientConn' sends 'Ut enim ad minim veniam'
+ // server endpoint - 1 received, 1 sent: Uni onMessage(String message)
+ // client endpoint - 1 received: void onTextMessage(String message)
+ // that is received 2 messages and sent 2 messages
+ int clientBytesReceived = stringToBytes("Response 0: " + msg);
+ int clientBytesSent = stringToBytes(msg);
+ int serverBytesReceived = clientBytesSent;
+ int serverBytesSent = clientBytesReceived;
+
+ // assert totals for all the path tags
+ asserter.assertMetrics(0, 0, 1, serverBytesReceived, serverBytesSent, 1, clientBytesSent, clientBytesReceived);
+ // assert metric for our path tag only
+ assertMetrics(metrics -> metrics
+ .body(assertClientMessagesCountBytesSent("/client-endpoint-with-path-param/{name}", clientBytesSent))
+ .body(assertClientMessagesCountSent("/client-endpoint-with-path-param/{name}", 1))
+ .body(assertClientMessagesCountBytesReceived("/client-endpoint-with-path-param/{name}", clientBytesReceived))
+ .body(assertServerMessagesCountReceived("/client-endpoint-with-path-param/:name", 1))
+ .body(assertServerMessagesCountBytesReceived("/client-endpoint-with-path-param/:name", serverBytesReceived))
+ .body(assertServerMessagesCountBytesSent("/client-endpoint-with-path-param/:name", serverBytesSent)));
+
+ clientConn.closeAndAwait();
+
+ clientConn = clientWithPathParamsConnector
+ .baseUri(baseUri)
+ .pathParam("name", "Go=Through")
+ .connectAndAwait();
+ sendClientMessageAndWait(clientConn, msg);
+
+ // assert totals for all the path tags
+ asserter.assertMetrics(0, 0, 1, serverBytesReceived, serverBytesSent, 1, clientBytesSent, clientBytesReceived);
+ // assert metric for our path tag only (prev + current ones, no deltas here)
+ assertMetrics(metrics -> metrics
+ .body(assertClientMessagesCountBytesSent("/client-endpoint-with-path-param/{name}", clientBytesSent * 2))
+ .body(assertClientMessagesCountSent("/client-endpoint-with-path-param/{name}", 2))
+ .body(assertClientMessagesCountBytesReceived("/client-endpoint-with-path-param/{name}",
+ clientBytesReceived * 2))
+ .body(assertServerMessagesCountReceived("/client-endpoint-with-path-param/:name", 2))
+ .body(assertServerMessagesCountBytesReceived("/client-endpoint-with-path-param/:name", serverBytesReceived * 2))
+ .body(assertServerMessagesCountBytesSent("/client-endpoint-with-path-param/:name", serverBytesSent * 2)));
+
+ clientConn.closeAndAwait();
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/OpenTelemetryWebSocketsTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/OpenTelemetryWebSocketsTest.java
new file mode 100644
index 0000000000000..710f6be951336
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/OpenTelemetryWebSocketsTest.java
@@ -0,0 +1,211 @@
+package io.quarkus.websockets.next.test.telemetry;
+
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_CLIENT_ATTR_KEY;
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_CLOSED;
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_ENDPOINT_ATTR_KEY;
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_ID_ATTR_KEY;
+import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_OPENED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import jakarta.inject.Inject;
+
+import org.awaitility.Awaitility;
+import org.jboss.shrinkwrap.api.asset.StringAsset;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.quarkus.builder.Version;
+import io.quarkus.maven.dependency.Dependency;
+import io.quarkus.test.QuarkusUnitTest;
+import io.quarkus.test.common.http.TestHTTPResource;
+import io.quarkus.websockets.next.WebSocketConnector;
+import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.BounceClient;
+import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.BounceEndpoint;
+import io.quarkus.websockets.next.test.utils.WSClient;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.WebSocketConnectOptions;
+
+public class OpenTelemetryWebSocketsTest {
+
+ @RegisterExtension
+ public static final QuarkusUnitTest test = new QuarkusUnitTest()
+ .withApplicationRoot(root -> root
+ .addClasses(BounceEndpoint.class, WSClient.class, InMemorySpanExporterProducer.class, BounceClient.class)
+ .addAsResource(new StringAsset("""
+ quarkus.otel.bsp.export.timeout=1s
+ quarkus.otel.bsp.schedule.delay=50
+ """), "application.properties"))
+ .setForcedDependencies(
+ List.of(Dependency.of("io.quarkus", "quarkus-opentelemetry-deployment", Version.getVersion())));
+
+ @TestHTTPResource("bounce")
+ URI bounceUri;
+
+ @TestHTTPResource("/")
+ URI baseUri;
+
+ @Inject
+ Vertx vertx;
+
+ @Inject
+ InMemorySpanExporter spanExporter;
+
+ @Inject
+ WebSocketConnector connector;
+
+ @BeforeEach
+ public void resetSpans() {
+ spanExporter.reset();
+ BounceEndpoint.connectionId = null;
+ BounceEndpoint.endpointId = null;
+ BounceEndpoint.MESSAGES.clear();
+ BounceClient.MESSAGES.clear();
+ BounceClient.CLOSED_LATCH = new CountDownLatch(1);
+ BounceEndpoint.CLOSED_LATCH = new CountDownLatch(1);
+ }
+
+ @Test
+ public void testServerEndpointTracesOnly() {
+ assertEquals(0, spanExporter.getFinishedSpanItems().size());
+ try (WSClient client = new WSClient(vertx)) {
+ client.connect(new WebSocketConnectOptions(), bounceUri);
+ var response = client.sendAndAwaitReply("How U Livin'").toString();
+ assertEquals("How U Livin'", response);
+ }
+ waitForTracesToArrive(3);
+ assertServerTraces();
+ }
+
+ @Test
+ public void testClientAndServerEndpointTraces() throws InterruptedException {
+ var clientConn = connector.baseUri(baseUri).connectAndAwait();
+ clientConn.sendTextAndAwait("Make It Bun Dem");
+
+ // assert client and server called
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(1, BounceEndpoint.MESSAGES.size());
+ assertEquals("Make It Bun Dem", BounceEndpoint.MESSAGES.get(0));
+ assertEquals(1, BounceClient.MESSAGES.size());
+ assertEquals("Make It Bun Dem", BounceClient.MESSAGES.get(0));
+ });
+
+ clientConn.closeAndAwait();
+ // assert connection closed and client/server were notified
+ assertTrue(BounceClient.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
+ assertTrue(BounceEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
+
+ waitForTracesToArrive(5);
+ assertServerTraces();
+ assertClientTraces();
+ }
+
+ @Test
+ public void testServerTracesWhenErrorOnMessage() {
+ assertEquals(0, spanExporter.getFinishedSpanItems().size());
+ try (WSClient client = new WSClient(vertx)) {
+ client.connect(new WebSocketConnectOptions(), bounceUri);
+ var response = client.sendAndAwaitReply("It's Alright, Ma").toString();
+ assertEquals("It's Alright, Ma", response);
+ response = client.sendAndAwaitReply("I'm Only Bleeding").toString();
+ assertEquals("I'm Only Bleeding", response);
+
+ client.sendAndAwait("throw-exception");
+ Awaitility.await().atMost(Duration.ofSeconds(5)).until(client::isClosed);
+ assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), client.closeStatusCode());
+ }
+ waitForTracesToArrive(3);
+ assertServerTraces();
+ }
+
+ private void assertClientTraces() {
+ var connectionOpenedSpan = getSpanByName(CONNECTION_OPENED, Target.CLIENT);
+ assertNotNull(getConnectionIdAttrVal(connectionOpenedSpan));
+ assertNotNull(getClientIdAttrVal(connectionOpenedSpan));
+ assertTrue(connectionOpenedSpan.getLinks().isEmpty());
+
+ var connectionClosedSpan = getSpanByName(CONNECTION_CLOSED, Target.CLIENT);
+ assertNotNull(getConnectionIdAttrVal(connectionClosedSpan));
+ assertNotNull(getClientIdAttrVal(connectionClosedSpan));
+ assertEquals(1, connectionClosedSpan.getLinks().size());
+ assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId());
+ }
+
+ private void assertServerTraces() {
+ var initialRequestSpan = getSpanByName("GET /bounce", Target.SERVER_INITIAL_REQ);
+
+ var connectionOpenedSpan = getSpanByName(CONNECTION_OPENED, Target.SERVER_WS);
+ assertEquals(BounceEndpoint.connectionId, getConnectionIdAttrVal(connectionOpenedSpan));
+ assertEquals(BounceEndpoint.endpointId, getEndpointIdAttrVal(connectionOpenedSpan));
+ assertEquals(initialRequestSpan.getSpanId(), connectionOpenedSpan.getLinks().get(0).getSpanContext().getSpanId());
+
+ var connectionClosedSpan = getSpanByName(CONNECTION_CLOSED, Target.SERVER_WS);
+ assertEquals(BounceEndpoint.connectionId, getConnectionIdAttrVal(connectionClosedSpan));
+ assertEquals(BounceEndpoint.endpointId, getEndpointIdAttrVal(connectionClosedSpan));
+ assertEquals(1, connectionClosedSpan.getLinks().size());
+ assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId());
+ }
+
+ private String getConnectionIdAttrVal(SpanData connectionOpenedSpan) {
+ return connectionOpenedSpan
+ .getAttributes()
+ .get(AttributeKey.stringKey(CONNECTION_ID_ATTR_KEY));
+ }
+
+ private String getClientIdAttrVal(SpanData connectionOpenedSpan) {
+ return connectionOpenedSpan
+ .getAttributes()
+ .get(AttributeKey.stringKey(CONNECTION_CLIENT_ATTR_KEY));
+ }
+
+ private String getEndpointIdAttrVal(SpanData connectionOpenedSpan) {
+ return connectionOpenedSpan
+ .getAttributes()
+ .get(AttributeKey.stringKey(CONNECTION_ENDPOINT_ATTR_KEY));
+ }
+
+ private void waitForTracesToArrive(int expectedTracesCount) {
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> assertEquals(expectedTracesCount, spanExporter.getFinishedSpanItems().size()));
+ }
+
+ private SpanData getSpanByName(String name, Target target) {
+ return spanExporter.getFinishedSpanItems()
+ .stream()
+ .filter(sd -> name.equals(sd.getName()))
+ .filter(sd -> target.appliesTo(sd.getAttributes()))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Expected span not found: " + name));
+ }
+
+ private enum Target {
+ SERVER_WS,
+ SERVER_INITIAL_REQ,
+ CLIENT;
+
+ private boolean appliesTo(Attributes attributes) {
+ if (this == SERVER_INITIAL_REQ) {
+ // no endpoint / client attributes => there is just one span of that name
+ return true;
+ }
+ if (this == SERVER_WS) {
+ return attributes.get(AttributeKey.stringKey(CONNECTION_ENDPOINT_ATTR_KEY)) != null;
+ }
+ return attributes.get(AttributeKey.stringKey(CONNECTION_CLIENT_ATTR_KEY)) != null;
+ }
+ }
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/BounceClient.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/BounceClient.java
new file mode 100644
index 0000000000000..b4ade79356a61
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/BounceClient.java
@@ -0,0 +1,13 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocketClient;
+
+@WebSocketClient(path = "/bounce")
+public class BounceClient {
+
+ @OnBinaryMessage
+ void echo(String message) {
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/BounceEndpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/BounceEndpoint.java
new file mode 100644
index 0000000000000..7e5dbb1019618
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/BounceEndpoint.java
@@ -0,0 +1,14 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+
+@WebSocket(path = "/bounce")
+public class BounceEndpoint {
+
+ @OnBinaryMessage
+ public String onMessage(String message) {
+ return "Response 0: " + message;
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/BroadcastingEndpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/BroadcastingEndpoint.java
new file mode 100644
index 0000000000000..62b9d93ae7b9e
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/BroadcastingEndpoint.java
@@ -0,0 +1,14 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+
+@WebSocket(path = "/broadcast")
+public class BroadcastingEndpoint {
+
+ @OnBinaryMessage(broadcast = true)
+ public String onMessage(String message) {
+ return "Response 0: " + message;
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/Dto.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/Dto.java
new file mode 100644
index 0000000000000..a23f0010af32b
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/Dto.java
@@ -0,0 +1,5 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+public record Dto(String property) {
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/DtoBinaryCodec.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/DtoBinaryCodec.java
new file mode 100644
index 0000000000000..59606220cc838
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/DtoBinaryCodec.java
@@ -0,0 +1,29 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import java.lang.reflect.Type;
+
+import jakarta.annotation.Priority;
+import jakarta.inject.Singleton;
+
+import io.quarkus.websockets.next.BinaryMessageCodec;
+import io.vertx.core.buffer.Buffer;
+
+@Priority(15)
+@Singleton
+public class DtoBinaryCodec implements BinaryMessageCodec {
+ @Override
+ public boolean supports(Type type) {
+ return type.equals(Dto.class);
+ }
+
+ @Override
+ public Buffer encode(Dto dto) {
+ return Buffer.buffer(dto.property());
+ }
+
+ @Override
+ public Dto decode(Type type, Buffer value) {
+ return new Dto(value.toString());
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiClient.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiClient.java
new file mode 100644
index 0000000000000..d20ebbc6a085e
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiClient.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocketClient;
+import io.smallrye.mutiny.Multi;
+
+@WebSocketClient(path = "/multi")
+public class MultiClient {
+
+ @OnBinaryMessage
+ Multi echo(Multi messages) {
+ return messages.map(msg -> "Response 0: " + msg);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiDtoReceived_MultiDtoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiDtoReceived_MultiDtoResponse_Endpoint.java
new file mode 100644
index 0000000000000..c840e43157eb6
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiDtoReceived_MultiDtoResponse_Endpoint.java
@@ -0,0 +1,18 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-multi-dto-response-multi-dto")
+public class MultiDtoReceived_MultiDtoResponse_Endpoint {
+
+ @OnBinaryMessage
+ public Multi onMessage(Multi messages) {
+ return messages
+ .map(Dto::property)
+ .flatMap(msg -> Multi.createFrom().items("Response 0: " + msg, "Response 1: " + msg))
+ .map(Dto::new);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiDtoReceived_NoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiDtoReceived_NoResponse_Endpoint.java
new file mode 100644
index 0000000000000..40b0efd7bcc3f
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiDtoReceived_NoResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-multi-dto-response-none")
+public class MultiDtoReceived_NoResponse_Endpoint {
+
+ @OnBinaryMessage
+ public void onMessage(Multi dto) {
+
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiDtoReceived_SingleDtoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiDtoReceived_SingleDtoResponse_Endpoint.java
new file mode 100644
index 0000000000000..41a424ccacd14
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiDtoReceived_SingleDtoResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-multi-dto-response-single-dto")
+public class MultiDtoReceived_SingleDtoResponse_Endpoint {
+
+ @OnBinaryMessage
+ public String onMessage(Multi message) {
+ return "ut labore et dolore magna aliqua";
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiEndpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiEndpoint.java
new file mode 100644
index 0000000000000..6a016fa0b59e5
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiEndpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/multi")
+public class MultiEndpoint {
+
+ @OnBinaryMessage
+ Multi echo(Multi messages) {
+ return messages.filter(msg -> !msg.startsWith("Response 0: "));
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiTextReceived_MultiTextResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiTextReceived_MultiTextResponse_Endpoint.java
new file mode 100644
index 0000000000000..c2b0eaf17eef7
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiTextReceived_MultiTextResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-multi-text-response-multi-text")
+public class MultiTextReceived_MultiTextResponse_Endpoint {
+
+ @OnBinaryMessage
+ public Multi onMessage(Multi messages) {
+ return messages.flatMap(msg -> Multi.createFrom().items("Response 0: " + msg, "Response 1: " + msg));
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiTextReceived_NoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiTextReceived_NoResponse_Endpoint.java
new file mode 100644
index 0000000000000..1c696acc26287
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiTextReceived_NoResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-multi-text-response-none")
+public class MultiTextReceived_NoResponse_Endpoint {
+
+ @OnBinaryMessage
+ public void onMessage(Multi message) {
+
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiTextReceived_SingleTextResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiTextReceived_SingleTextResponse_Endpoint.java
new file mode 100644
index 0000000000000..e3c4acb81aba0
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/MultiTextReceived_SingleTextResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-multi-text-response-single-text")
+public class MultiTextReceived_SingleTextResponse_Endpoint {
+
+ @OnBinaryMessage
+ public String onMessage(Multi message) {
+ return "Alpha Shallows";
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleDtoReceived_MultiDtoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleDtoReceived_MultiDtoResponse_Endpoint.java
new file mode 100644
index 0000000000000..b6ba93f62202e
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleDtoReceived_MultiDtoResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-single-dto-response-multi-dto")
+public class SingleDtoReceived_MultiDtoResponse_Endpoint {
+
+ @OnBinaryMessage
+ public Multi onMessage(Dto dto) {
+ return Multi.createFrom().items("Response 0: " + dto.property(), "Response 1: " + dto.property()).map(Dto::new);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleDtoReceived_NoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleDtoReceived_NoResponse_Endpoint.java
new file mode 100644
index 0000000000000..32dfe6c55ea0f
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleDtoReceived_NoResponse_Endpoint.java
@@ -0,0 +1,13 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+
+@WebSocket(path = "/received-single-dto-response-none")
+public class SingleDtoReceived_NoResponse_Endpoint {
+
+ @OnBinaryMessage
+ public void onMessage(Dto dto) {
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleDtoReceived_SingleDtoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleDtoReceived_SingleDtoResponse_Endpoint.java
new file mode 100644
index 0000000000000..40051d856c902
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleDtoReceived_SingleDtoResponse_Endpoint.java
@@ -0,0 +1,14 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+
+@WebSocket(path = "/received-single-dto-response-single-dto")
+public class SingleDtoReceived_SingleDtoResponse_Endpoint {
+
+ @OnBinaryMessage
+ public Dto onMessage(Dto dto) {
+ return new Dto("Response 0: " + dto.property());
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleDtoReceived_UniDtoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleDtoReceived_UniDtoResponse_Endpoint.java
new file mode 100644
index 0000000000000..be8f1459de552
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleDtoReceived_UniDtoResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Uni;
+
+@WebSocket(path = "/received-single-dto-response-uni-dto")
+public class SingleDtoReceived_UniDtoResponse_Endpoint {
+
+ @OnBinaryMessage
+ public Uni onMessage(Dto dto) {
+ return Uni.createFrom().item("Response 0: " + dto.property()).map(Dto::new);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleTextReceived_MultiTextResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleTextReceived_MultiTextResponse_Endpoint.java
new file mode 100644
index 0000000000000..afe2b6143b2a1
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleTextReceived_MultiTextResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-single-text-response-multi-text")
+public class SingleTextReceived_MultiTextResponse_Endpoint {
+
+ @OnBinaryMessage
+ public Multi onMessage(String message) {
+ return Multi.createFrom().items("Response 0: " + message, "Response 1: " + message);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleTextReceived_NoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleTextReceived_NoResponse_Endpoint.java
new file mode 100644
index 0000000000000..661fbf710af12
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleTextReceived_NoResponse_Endpoint.java
@@ -0,0 +1,14 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+
+@WebSocket(path = "/received-single-text-response-none")
+public class SingleTextReceived_NoResponse_Endpoint {
+
+ @OnBinaryMessage
+ public void onMessage(String message) {
+
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleTextReceived_UniTextResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleTextReceived_UniTextResponse_Endpoint.java
new file mode 100644
index 0000000000000..28f1a3354824f
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onbinarymessage/SingleTextReceived_UniTextResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onbinarymessage;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Uni;
+
+@WebSocket(path = "/received-single-text-response-uni-text")
+public class SingleTextReceived_UniTextResponse_Endpoint {
+
+ @OnBinaryMessage
+ public Uni onMessage(String message) {
+ return Uni.createFrom().item("Response 0: " + message);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/Dto.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/Dto.java
new file mode 100644
index 0000000000000..0f1f22f218034
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/Dto.java
@@ -0,0 +1,5 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+public record Dto(String property) {
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/DtoBinaryCodec.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/DtoBinaryCodec.java
new file mode 100644
index 0000000000000..a1aadb33dd685
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/DtoBinaryCodec.java
@@ -0,0 +1,30 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+import java.lang.reflect.Type;
+
+import jakarta.annotation.Priority;
+import jakarta.inject.Singleton;
+
+import io.quarkus.websockets.next.BinaryMessageCodec;
+import io.vertx.core.buffer.Buffer;
+
+@Priority(15)
+@Singleton
+public class DtoBinaryCodec
+ implements BinaryMessageCodec {
+ @Override
+ public boolean supports(Type type) {
+ return type.equals(Dto.class);
+ }
+
+ @Override
+ public Buffer encode(Dto dto) {
+ return Buffer.buffer(dto.property());
+ }
+
+ @Override
+ public Dto decode(Type type, Buffer value) {
+ throw new RuntimeException("Expected exception during decoding");
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/DtoTextCodec.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/DtoTextCodec.java
new file mode 100644
index 0000000000000..0cf147961269a
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/DtoTextCodec.java
@@ -0,0 +1,27 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+import java.lang.reflect.Type;
+
+import jakarta.annotation.Priority;
+import jakarta.inject.Singleton;
+
+import io.quarkus.websockets.next.TextMessageCodec;
+
+@Priority(15) // this must have higher priority than JsonCodec or tests will be flaky
+@Singleton
+public class DtoTextCodec implements TextMessageCodec {
+ @Override
+ public boolean supports(Type type) {
+ return type.equals(Dto.class);
+ }
+
+ @Override
+ public String encode(Dto dto) {
+ return dto.property();
+ }
+
+ @Override
+ public Dto decode(Type type, String value) {
+ throw new RuntimeException("Expected exception during decoding");
+ }
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousClientEndpoint_NoOnError.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousClientEndpoint_NoOnError.java
new file mode 100644
index 0000000000000..b05774316e930
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousClientEndpoint_NoOnError.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/client-error-no-on-error")
+public class ErroneousClientEndpoint_NoOnError {
+
+ @OnTextMessage
+ public Multi onMessage(String message) {
+ return Multi.createFrom().items("Response 0: " + message, "Response 1: " + message);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousClientEndpoint_OverloadedOnError.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousClientEndpoint_OverloadedOnError.java
new file mode 100644
index 0000000000000..bd4fbfa5c9208
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousClientEndpoint_OverloadedOnError.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/client-error-overloaded-on-error")
+public class ErroneousClientEndpoint_OverloadedOnError {
+
+ @OnTextMessage
+ public Multi onMessage(String message) {
+ return Multi.createFrom().items("Response 0: " + message, "Response 1: " + message);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousClient_NoOnError.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousClient_NoOnError.java
new file mode 100644
index 0000000000000..9ed6f0d365df1
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousClient_NoOnError.java
@@ -0,0 +1,26 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocketClient;
+import io.smallrye.mutiny.Uni;
+
+@WebSocketClient(path = "/client-error-no-on-error")
+public class ErroneousClient_NoOnError {
+
+ public static List MESSAGES = new ArrayList<>();
+
+ @OnTextMessage
+ Uni onMessage(String message) {
+ synchronized (this) {
+ MESSAGES.add(message);
+ if (MESSAGES.size() == 4) {
+ return Uni.createFrom().failure(new RuntimeException("You asked for an error, you got the error!"));
+ }
+ return Uni.createFrom().voidItem();
+ }
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousClient_OverloadedOnError.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousClient_OverloadedOnError.java
new file mode 100644
index 0000000000000..2a7c4f99b6a6b
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousClient_OverloadedOnError.java
@@ -0,0 +1,43 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import io.quarkus.websockets.next.OnError;
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocketClient;
+import io.smallrye.mutiny.Uni;
+
+@WebSocketClient(path = "/client-error-overloaded-on-error")
+public class ErroneousClient_OverloadedOnError {
+
+ public static CountDownLatch RUNTIME_EXCEPTION_LATCH = new CountDownLatch(1);
+ public static CountDownLatch ILLEGAL_STATE_EXCEPTION_LATCH = new CountDownLatch(1);
+ public static List MESSAGES = new ArrayList<>();
+
+ @OnTextMessage
+ Uni onMessage(String message) {
+ synchronized (this) {
+ MESSAGES.add(message);
+ if (MESSAGES.size() == 4) {
+ return Uni.createFrom().failure(new RuntimeException("Expected error - 4 items"));
+ }
+ if (MESSAGES.size() == 8) {
+ return Uni.createFrom().failure(new IllegalStateException("Expected error - 8 items"));
+ }
+ return Uni.createFrom().voidItem();
+ }
+ }
+
+ @OnError
+ public String onError(RuntimeException e) {
+ RUNTIME_EXCEPTION_LATCH.countDown();
+ return e.getMessage();
+ }
+
+ @OnError
+ public void onError(IllegalStateException e) {
+ ILLEGAL_STATE_EXCEPTION_LATCH.countDown();
+ }
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousEndpoint_GlobalErrorHandler.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousEndpoint_GlobalErrorHandler.java
new file mode 100644
index 0000000000000..fd53a20cf54fa
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousEndpoint_GlobalErrorHandler.java
@@ -0,0 +1,19 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Uni;
+
+@WebSocket(path = "/server-error-global-handler")
+public class ErroneousEndpoint_GlobalErrorHandler {
+
+ private final AtomicInteger counter = new AtomicInteger();
+
+ @OnTextMessage
+ public Uni onMessage(String txt) {
+ return Uni.createFrom().failure(new IllegalArgumentException("Response " + counter.getAndIncrement() + ": " + txt));
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousServerEndpoint_NoOnError.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousServerEndpoint_NoOnError.java
new file mode 100644
index 0000000000000..700732956c6f5
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousServerEndpoint_NoOnError.java
@@ -0,0 +1,16 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+@WebSocket(path = "/server-error-no-on-error")
+public class ErroneousServerEndpoint_NoOnError {
+
+ @OnTextMessage
+ public Uni onMessage(Multi dto) {
+ return dto.toUni();
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousServerEndpoint_OnClose.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousServerEndpoint_OnClose.java
new file mode 100644
index 0000000000000..193051558a100
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousServerEndpoint_OnClose.java
@@ -0,0 +1,30 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+import java.util.concurrent.CountDownLatch;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.OnClose;
+import io.quarkus.websockets.next.OnError;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/server-error-on-close")
+public class ErroneousServerEndpoint_OnClose {
+
+ public static CountDownLatch ILLEGAL_STATE_EXCEPTION_LATCH = new CountDownLatch(1);
+
+ @OnBinaryMessage
+ public Multi onMessage(String message) {
+ return Multi.createFrom().items("Bobby");
+ }
+
+ @OnClose
+ public void onClose() {
+ throw new IllegalStateException("Expected exception");
+ }
+
+ @OnError
+ public void onError(IllegalStateException e) {
+ ILLEGAL_STATE_EXCEPTION_LATCH.countDown();
+ }
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousServerEndpoint_OnOpen.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousServerEndpoint_OnOpen.java
new file mode 100644
index 0000000000000..685db5069f47d
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousServerEndpoint_OnOpen.java
@@ -0,0 +1,20 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+import io.quarkus.websockets.next.OnOpen;
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Uni;
+
+@WebSocket(path = "/server-error-on-open")
+public class ErroneousServerEndpoint_OnOpen {
+
+ @OnOpen
+ public Uni onOpen() {
+ return Uni.createFrom().failure(new IllegalStateException("Expected failure"));
+ }
+
+ @OnTextMessage
+ public void onMessage(String message) {
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousServerEndpoint_OverriddenOnError.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousServerEndpoint_OverriddenOnError.java
new file mode 100644
index 0000000000000..2dbc194f31639
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/ErroneousServerEndpoint_OverriddenOnError.java
@@ -0,0 +1,25 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+import java.util.concurrent.CountDownLatch;
+
+import io.quarkus.websockets.next.OnBinaryMessage;
+import io.quarkus.websockets.next.OnError;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+@WebSocket(path = "/server-error-overridden-on-error")
+public class ErroneousServerEndpoint_OverriddenOnError {
+
+ public static CountDownLatch RUNTIME_EXCEPTION_LATCH = new CountDownLatch(1);
+
+ @OnBinaryMessage
+ public Uni onMessage(Multi dto) {
+ return dto.toUni();
+ }
+
+ @OnError
+ public void onError(RuntimeException e) {
+ RUNTIME_EXCEPTION_LATCH.countDown();
+ }
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/GlobalErrorHandler.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/GlobalErrorHandler.java
new file mode 100644
index 0000000000000..424b1b9d34b04
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/onerror/GlobalErrorHandler.java
@@ -0,0 +1,22 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.onerror;
+
+import java.util.concurrent.CountDownLatch;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+import io.quarkus.arc.Unremovable;
+import io.quarkus.websockets.next.OnError;
+
+@Unremovable
+@ApplicationScoped
+public class GlobalErrorHandler {
+
+ public static final CountDownLatch ILLEGAL_ARGUMENT_EXCEPTION_LATCH = new CountDownLatch(1);
+
+ @OnError
+ public String onError(IllegalArgumentException e) {
+ ILLEGAL_ARGUMENT_EXCEPTION_LATCH.countDown();
+ return e.getMessage();
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BounceClient.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BounceClient.java
new file mode 100644
index 0000000000000..aba3c46ab0d79
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BounceClient.java
@@ -0,0 +1,27 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import io.quarkus.websockets.next.OnClose;
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocketClient;
+
+@WebSocketClient(path = "/bounce", clientId = "bounce-client-id")
+public class BounceClient {
+
+ public static List MESSAGES = new CopyOnWriteArrayList<>();
+ public static CountDownLatch CLOSED_LATCH = new CountDownLatch(1);
+
+ @OnTextMessage
+ void echo(String message) {
+ MESSAGES.add(message);
+ }
+
+ @OnClose
+ void onClose() {
+ CLOSED_LATCH.countDown();
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BounceEndpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BounceEndpoint.java
new file mode 100644
index 0000000000000..dd4b0b0b836ce
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BounceEndpoint.java
@@ -0,0 +1,49 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+import io.quarkus.websockets.next.OnClose;
+import io.quarkus.websockets.next.OnOpen;
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.quarkus.websockets.next.WebSocketConnection;
+
+@WebSocket(path = "/bounce", endpointId = "bounce-server-endpoint-id")
+public class BounceEndpoint {
+
+ public static final List MESSAGES = new CopyOnWriteArrayList<>();
+ public static CountDownLatch CLOSED_LATCH = new CountDownLatch(1);
+ public static volatile String connectionId = null;
+ public static volatile String endpointId = null;
+
+ @ConfigProperty(name = "bounce-endpoint.prefix-responses", defaultValue = "false")
+ boolean prefixResponses;
+
+ @OnTextMessage
+ public String onMessage(String message) {
+ if (prefixResponses) {
+ message = "Response 0: " + message;
+ }
+ MESSAGES.add(message);
+ if (message.equals("throw-exception")) {
+ throw new RuntimeException("Failing 'onMessage' to test behavior when an exception was thrown");
+ }
+ return message;
+ }
+
+ @OnOpen
+ void open(WebSocketConnection connection) {
+ connectionId = connection.id();
+ endpointId = connection.endpointId();
+ }
+
+ @OnClose
+ void onClose() {
+ CLOSED_LATCH.countDown();
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BroadcastingEndpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BroadcastingEndpoint.java
new file mode 100644
index 0000000000000..a389cbf6ee989
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BroadcastingEndpoint.java
@@ -0,0 +1,14 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+
+@WebSocket(path = "/broadcast")
+public class BroadcastingEndpoint {
+
+ @OnTextMessage(broadcast = true)
+ public String onMessage(String message) {
+ return "Response 0: " + message;
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/ClientEndpointWithPathParams.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/ClientEndpointWithPathParams.java
new file mode 100644
index 0000000000000..8957a62ee9149
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/ClientEndpointWithPathParams.java
@@ -0,0 +1,13 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocketClient;
+
+@WebSocketClient(path = "/client-endpoint-with-path-param/{name}")
+public class ClientEndpointWithPathParams {
+
+ @OnTextMessage
+ public void onTextMessage(String message) {
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/Dto.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/Dto.java
new file mode 100644
index 0000000000000..585bcb0c950b1
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/Dto.java
@@ -0,0 +1,5 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+public record Dto(String property) {
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/DtoTextCodec.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/DtoTextCodec.java
new file mode 100644
index 0000000000000..de981e77efccb
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/DtoTextCodec.java
@@ -0,0 +1,27 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import java.lang.reflect.Type;
+
+import jakarta.annotation.Priority;
+import jakarta.inject.Singleton;
+
+import io.quarkus.websockets.next.TextMessageCodec;
+
+@Priority(15) // this must have higher priority than JsonCodec or tests will be flaky
+@Singleton
+public class DtoTextCodec implements TextMessageCodec {
+ @Override
+ public boolean supports(Type type) {
+ return type.equals(Dto.class);
+ }
+
+ @Override
+ public String encode(Dto dto) {
+ return dto.property();
+ }
+
+ @Override
+ public Dto decode(Type type, String value) {
+ return new Dto(value);
+ }
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiClient.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiClient.java
new file mode 100644
index 0000000000000..4e203bcb78e54
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiClient.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocketClient;
+import io.smallrye.mutiny.Multi;
+
+@WebSocketClient(path = "/multi")
+public class MultiClient {
+
+ @OnTextMessage
+ Multi echo(Multi messages) {
+ return messages.map(msg -> "Response 0: " + msg);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiDtoReceived_MultiDtoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiDtoReceived_MultiDtoResponse_Endpoint.java
new file mode 100644
index 0000000000000..825f305b4e2b4
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiDtoReceived_MultiDtoResponse_Endpoint.java
@@ -0,0 +1,18 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-multi-dto-response-multi-dto")
+public class MultiDtoReceived_MultiDtoResponse_Endpoint {
+
+ @OnTextMessage
+ public Multi onMessage(Multi messages) {
+ return messages
+ .map(Dto::property)
+ .flatMap(msg -> Multi.createFrom().items("Response 0: " + msg, "Response 1: " + msg))
+ .map(Dto::new);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiDtoReceived_NoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiDtoReceived_NoResponse_Endpoint.java
new file mode 100644
index 0000000000000..9072755e03cf4
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiDtoReceived_NoResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-multi-dto-response-none")
+public class MultiDtoReceived_NoResponse_Endpoint {
+
+ @OnTextMessage
+ public void onMessage(Multi dto) {
+
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiDtoReceived_SingleDtoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiDtoReceived_SingleDtoResponse_Endpoint.java
new file mode 100644
index 0000000000000..a035dd916e7c4
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiDtoReceived_SingleDtoResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-multi-dto-response-single-dto")
+public class MultiDtoReceived_SingleDtoResponse_Endpoint {
+
+ @OnTextMessage
+ public String onMessage(Multi message) {
+ return "ut labore et dolore magna aliqua";
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiEndpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiEndpoint.java
new file mode 100644
index 0000000000000..8163cae5ae117
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiEndpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/multi")
+public class MultiEndpoint {
+
+ @OnTextMessage
+ Multi echo(Multi messages) {
+ return messages.filter(msg -> !msg.startsWith("Response 0: "));
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiTextReceived_MultiTextResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiTextReceived_MultiTextResponse_Endpoint.java
new file mode 100644
index 0000000000000..12429f2c11750
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiTextReceived_MultiTextResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-multi-text-response-multi-text")
+public class MultiTextReceived_MultiTextResponse_Endpoint {
+
+ @OnTextMessage
+ public Multi onMessage(Multi messages) {
+ return messages.flatMap(msg -> Multi.createFrom().items("Response 0: " + msg, "Response 1: " + msg));
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiTextReceived_NoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiTextReceived_NoResponse_Endpoint.java
new file mode 100644
index 0000000000000..c7b5e3e8b14b2
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiTextReceived_NoResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-multi-text-response-none")
+public class MultiTextReceived_NoResponse_Endpoint {
+
+ @OnTextMessage
+ public void onMessage(Multi message) {
+
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiTextReceived_SingleTextResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiTextReceived_SingleTextResponse_Endpoint.java
new file mode 100644
index 0000000000000..7a8f93d65a849
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/MultiTextReceived_SingleTextResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-multi-text-response-single-text")
+public class MultiTextReceived_SingleTextResponse_Endpoint {
+
+ @OnTextMessage
+ public String onMessage(Multi message) {
+ return "Alpha Shallows";
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/PingEndpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/PingEndpoint.java
new file mode 100644
index 0000000000000..d09ab082348fc
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/PingEndpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnOpen;
+import io.quarkus.websockets.next.PathParam;
+import io.quarkus.websockets.next.WebSocket;
+
+@WebSocket(path = "/ping/{one}/and/{two}")
+public class PingEndpoint {
+
+ @OnOpen
+ String process(@PathParam String one, @PathParam String two) {
+ return one + ":" + two;
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/ServerEndpointWithPathParams.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/ServerEndpointWithPathParams.java
new file mode 100644
index 0000000000000..a44abd4b8f8b3
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/ServerEndpointWithPathParams.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Uni;
+
+@WebSocket(path = "/client-endpoint-with-path-param/{name}")
+public class ServerEndpointWithPathParams {
+
+ @OnTextMessage
+ public Uni onMessage(String message) {
+ return Uni.createFrom().item("Response 0: " + message);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleDtoReceived_MultiDtoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleDtoReceived_MultiDtoResponse_Endpoint.java
new file mode 100644
index 0000000000000..457f1ec12f63c
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleDtoReceived_MultiDtoResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-single-dto-response-multi-dto")
+public class SingleDtoReceived_MultiDtoResponse_Endpoint {
+
+ @OnTextMessage
+ public Multi onMessage(Dto dto) {
+ return Multi.createFrom().items("Response 0: " + dto.property(), "Response 1: " + dto.property()).map(Dto::new);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleDtoReceived_NoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleDtoReceived_NoResponse_Endpoint.java
new file mode 100644
index 0000000000000..c7d02acd71699
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleDtoReceived_NoResponse_Endpoint.java
@@ -0,0 +1,13 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+
+@WebSocket(path = "/received-single-dto-response-none")
+public class SingleDtoReceived_NoResponse_Endpoint {
+
+ @OnTextMessage
+ public void onMessage(Dto dto) {
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleDtoReceived_SingleDtoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleDtoReceived_SingleDtoResponse_Endpoint.java
new file mode 100644
index 0000000000000..80a5b2281bc4a
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleDtoReceived_SingleDtoResponse_Endpoint.java
@@ -0,0 +1,14 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+
+@WebSocket(path = "/received-single-dto-response-single-dto")
+public class SingleDtoReceived_SingleDtoResponse_Endpoint {
+
+ @OnTextMessage
+ public Dto onMessage(Dto dto) {
+ return new Dto("Response 0: " + dto.property());
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleDtoReceived_UniDtoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleDtoReceived_UniDtoResponse_Endpoint.java
new file mode 100644
index 0000000000000..19bd204ac2cf4
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleDtoReceived_UniDtoResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Uni;
+
+@WebSocket(path = "/received-single-dto-response-uni-dto")
+public class SingleDtoReceived_UniDtoResponse_Endpoint {
+
+ @OnTextMessage
+ public Uni onMessage(Dto dto) {
+ return Uni.createFrom().item("Response 0: " + dto.property()).map(Dto::new);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleTextReceived_MultiTextResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleTextReceived_MultiTextResponse_Endpoint.java
new file mode 100644
index 0000000000000..66fcf512c35f9
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleTextReceived_MultiTextResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Multi;
+
+@WebSocket(path = "/received-single-text-response-multi-text")
+public class SingleTextReceived_MultiTextResponse_Endpoint {
+
+ @OnTextMessage
+ public Multi onMessage(String message) {
+ return Multi.createFrom().items("Response 0: " + message, "Response 1: " + message);
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleTextReceived_NoResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleTextReceived_NoResponse_Endpoint.java
new file mode 100644
index 0000000000000..c064f11eaab6e
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleTextReceived_NoResponse_Endpoint.java
@@ -0,0 +1,14 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+
+@WebSocket(path = "/received-single-text-response-none")
+public class SingleTextReceived_NoResponse_Endpoint {
+
+ @OnTextMessage
+ public void onMessage(String message) {
+
+ }
+
+}
diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleTextReceived_UniTextResponse_Endpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleTextReceived_UniTextResponse_Endpoint.java
new file mode 100644
index 0000000000000..31df2589e1810
--- /dev/null
+++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/SingleTextReceived_UniTextResponse_Endpoint.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage;
+
+import io.quarkus.websockets.next.OnTextMessage;
+import io.quarkus.websockets.next.WebSocket;
+import io.smallrye.mutiny.Uni;
+
+@WebSocket(path = "/received-single-text-response-uni-text")
+public class SingleTextReceived_UniTextResponse_Endpoint {
+
+ @OnTextMessage
+ public Uni onMessage(String message) {
+ return Uni.createFrom().item("Response 0: " + message);
+ }
+
+}
diff --git a/extensions/websockets-next/runtime/pom.xml b/extensions/websockets-next/runtime/pom.xml
index de16a0cfaacaa..7c23fc0cc79d5 100644
--- a/extensions/websockets-next/runtime/pom.xml
+++ b/extensions/websockets-next/runtime/pom.xml
@@ -43,6 +43,18 @@
io.quarkus.security
quarkus-security
+
+
+ io.opentelemetry
+ opentelemetry-api
+ true
+
+
+
+ io.micrometer
+ micrometer-core
+ true
+
org.junit.jupiter
diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java
new file mode 100644
index 0000000000000..642ea24ff43d4
--- /dev/null
+++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java
@@ -0,0 +1,29 @@
+package io.quarkus.websockets.next;
+
+import io.quarkus.runtime.annotations.ConfigPhase;
+import io.quarkus.runtime.annotations.ConfigRoot;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import io.smallrye.config.WithName;
+
+@ConfigMapping(prefix = "quarkus.websockets-next")
+@ConfigRoot(phase = ConfigPhase.RUN_TIME)
+public interface WebSocketsRuntimeConfig {
+
+ /**
+ * If collection of WebSocket traces is enabled.
+ * Only applicable when the OpenTelemetry extension is present.
+ */
+ @WithName("tracing.enabled")
+ @WithDefault("true")
+ boolean tracingEnabled();
+
+ /**
+ * If collection of WebSocket metrics is enabled.
+ * Only applicable when the Micrometer extension is present.
+ */
+ @WithName("metrics.enabled")
+ @WithDefault("true")
+ boolean metricsEnabled();
+
+}
diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/BasicWebSocketConnectorImpl.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/BasicWebSocketConnectorImpl.java
index 6442502058725..3d753534c5b8e 100644
--- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/BasicWebSocketConnectorImpl.java
+++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/BasicWebSocketConnectorImpl.java
@@ -21,6 +21,7 @@
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketClientException;
import io.quarkus.websockets.next.WebSocketsClientRuntimeConfig;
+import io.quarkus.websockets.next.runtime.telemetry.TelemetrySupport;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Handler;
@@ -36,6 +37,8 @@ public class BasicWebSocketConnectorImpl extends WebSocketConnectorBase errorHandler;
BasicWebSocketConnectorImpl(Vertx vertx, Codecs codecs, ClientConnectionManager connectionManager,
- WebSocketsClientRuntimeConfig config, TlsConfigurationRegistry tlsConfigurationRegistry) {
+ WebSocketsClientRuntimeConfig config, TlsConfigurationRegistry tlsConfigurationRegistry,
+ TelemetrySupport telemetrySupport) {
super(vertx, codecs, connectionManager, config, tlsConfigurationRegistry);
+ this.telemetrySupport = telemetrySupport;
}
@Override
@@ -144,11 +149,13 @@ public Uni connect() {
.map(ws -> {
String clientId = BasicWebSocketConnector.class.getName();
TrafficLogger trafficLogger = TrafficLogger.forClient(config);
+ var sendingInterceptor = telemetrySupport.createClientEndpointTelemetrySupport(path)
+ .getSendingInterceptor();
WebSocketClientConnectionImpl connection = new WebSocketClientConnectionImpl(clientId, ws,
codecs,
pathParams,
serverEndpointUri,
- headers, trafficLogger);
+ headers, trafficLogger, sendingInterceptor);
if (trafficLogger != null) {
trafficLogger.connectionOpened(connection);
}
diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java
index 12a2b327fa6b1..06f4fe0743bc2 100644
--- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java
+++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java
@@ -17,6 +17,8 @@
import io.quarkus.websockets.next.UnhandledFailureStrategy;
import io.quarkus.websockets.next.WebSocketException;
import io.quarkus.websockets.next.runtime.WebSocketSessionContext.SessionContextState;
+import io.quarkus.websockets.next.runtime.telemetry.EndpointTelemetrySupport;
+import io.quarkus.websockets.next.runtime.telemetry.ErrorInterceptor;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.vertx.core.Context;
@@ -32,7 +34,7 @@ class Endpoints {
static void initialize(Vertx vertx, ArcContainer container, Codecs codecs, WebSocketConnectionBase connection,
WebSocketBase ws, String generatedEndpointClass, Optional autoPingInterval,
SecuritySupport securitySupport, UnhandledFailureStrategy unhandledFailureStrategy, TrafficLogger trafficLogger,
- Runnable onClose) {
+ Runnable onClose, EndpointTelemetrySupport endpointTelemetrySupport) {
Context context = vertx.getOrCreateContext();
@@ -45,8 +47,8 @@ static void initialize(Vertx vertx, ArcContainer container, Codecs codecs, WebSo
container.requestContext());
// Create an endpoint that delegates callbacks to the endpoint bean
- WebSocketEndpoint endpoint = createEndpoint(generatedEndpointClass, context, connection, codecs, contextSupport,
- securitySupport);
+ WebSocketEndpoint endpoint = createEndpoint(generatedEndpointClass, connection, codecs, contextSupport,
+ securitySupport, endpointTelemetrySupport);
// A broadcast processor is only needed if Multi is consumed by the callback
BroadcastProcessor