Skip to content

Commit

Permalink
WebSockets Next: send ping message from the server automatically
Browse files Browse the repository at this point in the history
- an opt-in config property is used to set the interval
- resolves quarkusio#39862
  • Loading branch information
mkouba committed Apr 23, 2024
1 parent 824234e commit 9821b4f
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ private void validateOnPongMessage(Callback callback) {
"@OnPongMessage callback must return void or Uni<Void>: " + callbackToString(callback.method));
}
Type messageType = callback.argumentType(MessageCallbackArgument::isMessage);
if (!messageType.name().equals(WebSocketDotNames.BUFFER)) {
if (messageType == null || !messageType.name().equals(WebSocketDotNames.BUFFER)) {
throw new WebSocketServerException(
"@OnPongMessage callback must accept exactly one message parameter of type io.vertx.core.buffer.Buffer: "
+ callbackToString(callback.method));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.quarkus.websockets.next.test.pingpong;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnPongMessage;
import io.quarkus.websockets.next.WebSocket;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.WebSocketClient;

public class AutoPingIntervalTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Endpoint.class);
}).overrideConfigKey("quarkus.websockets-next.auto-ping-interval", "200");

@Inject
Vertx vertx;

@TestHTTPResource("end")
URI endUri;

@Test
public void testPingPong() throws InterruptedException, ExecutionException {
WebSocketClient client = vertx.createWebSocketClient();
try {
CountDownLatch connectedLatch = new CountDownLatch(1);
client
.connect(endUri.getPort(), endUri.getHost(), endUri.getPath())
.onComplete(r -> {
if (r.succeeded()) {
connectedLatch.countDown();
} else {
throw new IllegalStateException(r.cause());
}
});
assertTrue(connectedLatch.await(5, TimeUnit.SECONDS));
// The pong message should be sent by the client automatically and should be identical to the ping message
assertTrue(Endpoint.PONG.await(5, TimeUnit.SECONDS));
} finally {
client.close().toCompletionStage().toCompletableFuture().get();
}
}

@WebSocket(path = "/end")
public static class Endpoint {

static final CountDownLatch PONG = new CountDownLatch(3);

@OnOpen
public String open() {
return "ok";
}

@OnPongMessage
void pong(Buffer data) {
PONG.countDown();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
Expand Down Expand Up @@ -39,4 +40,9 @@ public interface WebSocketsRuntimeConfig {
*/
OptionalInt maxMessageSize();

/**
* The interval in milliseconds after which the server sends a ping message to a connected client automatically.
*/
OptionalLong autoPingInterval();

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.jboss.logging.Logger;

import io.quarkus.vertx.core.runtime.VertxBufferImpl;
import io.quarkus.websockets.next.WebSocketConnection;
import io.smallrye.mutiny.Uni;
Expand All @@ -26,6 +28,8 @@

class WebSocketConnectionImpl implements WebSocketConnection {

private static final Logger LOG = Logger.getLogger(WebSocketConnectionImpl.class);

private final String generatedEndpointClass;

private final String endpointId;
Expand Down Expand Up @@ -106,6 +110,14 @@ public Uni<Void> sendPing(Buffer data) {
return UniHelper.toUni(webSocket.writePing(data));
}

void sendAutoPing() {
webSocket.writePing(Buffer.buffer("ping")).onComplete(r -> {
if (r.failed()) {
LOG.errorf(r.cause(), "Unable to send auto-ping: %s", this);
}
});
}

@Override
public Uni<Void> sendPong(Buffer data) {
return UniHelper.toUni(webSocket.writePong(data));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ public Handler<RoutingContext> createEndpointHandler(String generatedEndpointCla
public void handle(RoutingContext ctx) {
Future<ServerWebSocket> future = ctx.request().toWebSocket();
future.onSuccess(ws -> {
Context context = VertxCoreRecorder.getVertx().get().getOrCreateContext();
Vertx vertx = VertxCoreRecorder.getVertx().get();
Context context = vertx.getOrCreateContext();

WebSocketConnection connection = new WebSocketConnectionImpl(generatedEndpointClass, endpointId, ws,
WebSocketConnectionImpl connection = new WebSocketConnectionImpl(generatedEndpointClass, endpointId, ws,
connectionManager, codecs, ctx);
connectionManager.add(generatedEndpointClass, connection);
LOG.debugf("Connnected: %s", connection);
LOG.debugf("Connection created: %s", connection);

// Initialize and capture the session context state that will be activated
// during message processing
Expand Down Expand Up @@ -216,6 +217,18 @@ public void handle(Void event) {
});
});

Long timerId;
if (config.autoPingInterval().isPresent()) {
timerId = vertx.setPeriodic(config.autoPingInterval().getAsLong(), new Handler<Long>() {
@Override
public void handle(Long timerId) {
connection.sendAutoPing();
}
});
} else {
timerId = null;
}

ws.closeHandler(new Handler<Void>() {
@Override
public void handle(Void event) {
Expand All @@ -229,6 +242,9 @@ public void handle(Void event) {
LOG.errorf(r.cause(), "Unable to complete @OnClose callback: %s", connection);
}
connectionManager.remove(generatedEndpointClass, connection);
if (timerId != null) {
vertx.cancelTimer(timerId);
}
});
}
});
Expand All @@ -249,6 +265,7 @@ public void handle(Void event) {
});
}
});

});
}
};
Expand Down

0 comments on commit 9821b4f

Please sign in to comment.