From 8023609f5b88eacafd5f67bc891cdef481d3fba1 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Tue, 23 Apr 2024 10:58:27 +0200 Subject: [PATCH] WebSockets Next: send ping message from the server automatically - an opt-in config property is used to set the interval - resolves #39862 --- .../deployment/WebSocketServerProcessor.java | 2 +- .../test/pingpong/AutoPingIntervalTest.java | 77 +++++++++++++++++++ .../next/WebSocketsRuntimeConfig.java | 6 ++ .../next/runtime/WebSocketConnectionImpl.java | 12 +++ .../next/runtime/WebSocketServerRecorder.java | 23 +++++- 5 files changed, 116 insertions(+), 4 deletions(-) create mode 100644 extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/pingpong/AutoPingIntervalTest.java diff --git a/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketServerProcessor.java b/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketServerProcessor.java index de0a834ab33a9b..7a7c8f536d1d94 100644 --- a/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketServerProcessor.java +++ b/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketServerProcessor.java @@ -383,7 +383,7 @@ private void validateOnPongMessage(Callback callback) { "@OnPongMessage callback must return void or Uni: " + callbackToString(callback.method)); } Type messageType = callback.argumentType(MessageCallbackArgument::isMessage); - if (!messageType.name().equals(WebSocketDotNames.BUFFER)) { + if (messageType == null || !messageType.name().equals(WebSocketDotNames.BUFFER)) { throw new WebSocketServerException( "@OnPongMessage callback must accept exactly one message parameter of type io.vertx.core.buffer.Buffer: " + callbackToString(callback.method)); diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/pingpong/AutoPingIntervalTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/pingpong/AutoPingIntervalTest.java new file mode 100644 index 00000000000000..dbeaac143fb49a --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/pingpong/AutoPingIntervalTest.java @@ -0,0 +1,77 @@ +package io.quarkus.websockets.next.test.pingpong; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.OnPongMessage; +import io.quarkus.websockets.next.WebSocket; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.WebSocketClient; + +public class AutoPingIntervalTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Endpoint.class); + }).overrideConfigKey("quarkus.websockets-next.auto-ping-interval", "200"); + + @Inject + Vertx vertx; + + @TestHTTPResource("end") + URI endUri; + + @Test + public void testPingPong() throws InterruptedException, ExecutionException { + WebSocketClient client = vertx.createWebSocketClient(); + try { + CountDownLatch connectedLatch = new CountDownLatch(1); + client + .connect(endUri.getPort(), endUri.getHost(), endUri.getPath()) + .onComplete(r -> { + if (r.succeeded()) { + connectedLatch.countDown(); + } else { + throw new IllegalStateException(r.cause()); + } + }); + assertTrue(connectedLatch.await(5, TimeUnit.SECONDS)); + // The pong message should be sent by the client automatically and should be identical to the ping message + assertTrue(Endpoint.PONG.await(5, TimeUnit.SECONDS)); + } finally { + client.close().toCompletionStage().toCompletableFuture().get(); + } + } + + @WebSocket(path = "/end") + public static class Endpoint { + + static final CountDownLatch PONG = new CountDownLatch(3); + + @OnOpen + public String open() { + return "ok"; + } + + @OnPongMessage + void pong(Buffer data) { + PONG.countDown(); + } + + } + +} diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java index e1c76dc33dde36..898aca3b7325cd 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java @@ -3,6 +3,7 @@ import java.util.List; import java.util.Optional; import java.util.OptionalInt; +import java.util.OptionalLong; import io.quarkus.runtime.annotations.ConfigPhase; import io.quarkus.runtime.annotations.ConfigRoot; @@ -39,4 +40,9 @@ public interface WebSocketsRuntimeConfig { */ OptionalInt maxMessageSize(); + /** + * The interval in milliseconds after which the server sends a ping message to a connected client automatically. + */ + OptionalLong autoPingInterval(); + } diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java index 8b7eda81b1461b..2073d18ae561b7 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java @@ -13,6 +13,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import org.jboss.logging.Logger; + import io.quarkus.vertx.core.runtime.VertxBufferImpl; import io.quarkus.websockets.next.WebSocketConnection; import io.smallrye.mutiny.Uni; @@ -26,6 +28,8 @@ class WebSocketConnectionImpl implements WebSocketConnection { + private static final Logger LOG = Logger.getLogger(WebSocketConnectionImpl.class); + private final String generatedEndpointClass; private final String endpointId; @@ -106,6 +110,14 @@ public Uni sendPing(Buffer data) { return UniHelper.toUni(webSocket.writePing(data)); } + void sendAutoPing() { + webSocket.writePing(Buffer.buffer("quark")).onComplete(r -> { + if (r.failed()) { + LOG.errorf(r.cause(), "Unable to send auto-ping: %s", this); + } + }); + } + @Override public Uni sendPong(Buffer data) { return UniHelper.toUni(webSocket.writePong(data)); diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java index c53d15645b01dd..4fb2598b8b1d08 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java @@ -67,12 +67,13 @@ public Handler createEndpointHandler(String generatedEndpointCla public void handle(RoutingContext ctx) { Future future = ctx.request().toWebSocket(); future.onSuccess(ws -> { - Context context = VertxCoreRecorder.getVertx().get().getOrCreateContext(); + Vertx vertx = VertxCoreRecorder.getVertx().get(); + Context context = vertx.getOrCreateContext(); - WebSocketConnection connection = new WebSocketConnectionImpl(generatedEndpointClass, endpointId, ws, + WebSocketConnectionImpl connection = new WebSocketConnectionImpl(generatedEndpointClass, endpointId, ws, connectionManager, codecs, ctx); connectionManager.add(generatedEndpointClass, connection); - LOG.debugf("Connnected: %s", connection); + LOG.debugf("Connection created: %s", connection); // Initialize and capture the session context state that will be activated // during message processing @@ -216,6 +217,18 @@ public void handle(Void event) { }); }); + Long timerId; + if (config.autoPingInterval().isPresent()) { + timerId = vertx.setPeriodic(config.autoPingInterval().getAsLong(), new Handler() { + @Override + public void handle(Long timerId) { + connection.sendAutoPing(); + } + }); + } else { + timerId = null; + } + ws.closeHandler(new Handler() { @Override public void handle(Void event) { @@ -229,6 +242,9 @@ public void handle(Void event) { LOG.errorf(r.cause(), "Unable to complete @OnClose callback: %s", connection); } connectionManager.remove(generatedEndpointClass, connection); + if (timerId != null) { + vertx.cancelTimer(timerId); + } }); } }); @@ -249,6 +265,7 @@ public void handle(Void event) { }); } }); + }); } };