Skip to content

Commit

Permalink
Merge pull request #40655 from mkouba/issue-40648
Browse files Browse the repository at this point in the history
WebSockets Next: provide strategies to process unhandled failures
  • Loading branch information
mkouba authored May 16, 2024
2 parents 670b1ad + faf0d36 commit 8402d10
Show file tree
Hide file tree
Showing 25 changed files with 609 additions and 16 deletions.
6 changes: 6 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ include::_attributes.adoc[]

include::{includes}/extension-status.adoc[]

The `quarkus-websockets-next` extension provides a modern declarative API to define WebSocket server and client endpoints.

== The WebSocket protocol

The _WebSocket_ protocol, documented in the https://datatracker.ietf.org/doc/html/rfc6455[RFC6455], establishes a standardized method for creating a bidirectional communication channel between a client and a server through a single TCP connection.
Expand Down Expand Up @@ -457,6 +459,10 @@ The method that declares a most-specific supertype of the actual exception is se

NOTE: The `@io.quarkus.websockets.next.OnError` annotation can be also used to declare a global error handler, i.e. a method that is not declared on a WebSocket endpoint. Such a method may not accept `@PathParam` paremeters. Error handlers declared on an endpoint take precedence over the global error handlers.

When an error occurs but no error handler can handle the failure, Quarkus uses the strategy specified by `quarkus.websockets-next.server.unhandled-failure-strategy` and `quarkus.websockets-next.client.unhandled-failure-strategy`, respectively.
By default, the connection is closed.
Alternatively, an error message can be logged or no operation performed.

== Access to the WebSocketConnection

The `io.quarkus.websockets.next.WebSocketConnection` object represents the WebSocket connection.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkus.websockets.next.test.client;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocketClient;

@WebSocketClient(path = "/endpoint")
public class ClientMessageErrorEndpoint {

static final CountDownLatch MESSAGE_LATCH = new CountDownLatch(1);

static final List<String> MESSAGES = new CopyOnWriteArrayList<>();

static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

@OnTextMessage
void message(String message) {
if ("foo".equals(message)) {
throw new IllegalStateException("I cannot do it!");
} else {
MESSAGES.add(message);
}
MESSAGE_LATCH.countDown();
}

@OnClose
void close() {
CLOSED_LATCH.countDown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.websockets.next.test.client;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocketClient;

@WebSocketClient(path = "/endpoint")
public class ClientOpenErrorEndpoint {

static final CountDownLatch MESSAGE_LATCH = new CountDownLatch(1);

static final List<String> MESSAGES = new CopyOnWriteArrayList<>();

static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

@OnOpen
void open() {
throw new IllegalStateException("I cannot do it!");
}

@OnTextMessage
void message(String message) {
MESSAGES.add(message);
MESSAGE_LATCH.countDown();
}

@OnClose
void close() {
CLOSED_LATCH.countDown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.quarkus.websockets.next.test.client;

import java.util.concurrent.CountDownLatch;

import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@WebSocket(path = "/endpoint")
public class ServerEndpoint {

static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

@OnTextMessage
String echo(String message) {
return message;
}

@OnClose
void close() {
CLOSED_LATCH.countDown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.quarkus.websockets.next.test.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketConnector;

public class UnhandledMessageFailureDefaultStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(ServerEndpoint.class, ClientMessageErrorEndpoint.class);
});

@Inject
WebSocketConnector<ClientMessageErrorEndpoint> connector;

@TestHTTPResource("/")
URI testUri;

@Test
void testError() throws InterruptedException {
WebSocketClientConnection connection = connector
.baseUri(testUri)
.connectAndAwait();
connection.sendTextAndAwait("foo");
assertTrue(ServerEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(ClientMessageErrorEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(connection.isClosed());
assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), connection.closeReason().getCode());
assertTrue(ClientMessageErrorEndpoint.MESSAGES.isEmpty());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.quarkus.websockets.next.test.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketConnector;

public class UnhandledMessageFailureLogStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(ServerEndpoint.class, ClientMessageErrorEndpoint.class);
}).overrideConfigKey("quarkus.websockets-next.client.unhandled-failure-strategy", "log");

@Inject
WebSocketConnector<ClientMessageErrorEndpoint> connector;

@TestHTTPResource("/")
URI testUri;

@Test
void testError() throws InterruptedException {
WebSocketClientConnection connection = connector
.baseUri(testUri)
.connectAndAwait();
connection.sendTextAndAwait("foo");
assertFalse(connection.isClosed());
connection.sendText("bar");
assertTrue(ClientMessageErrorEndpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS));
assertEquals("bar", ClientMessageErrorEndpoint.MESSAGES.get(0));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.quarkus.websockets.next.test.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketConnector;

public class UnhandledOpenFailureDefaultStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(ServerEndpoint.class, ClientOpenErrorEndpoint.class);
});

@Inject
WebSocketConnector<ClientOpenErrorEndpoint> connector;

@TestHTTPResource("/")
URI testUri;

@Test
void testError() throws InterruptedException {
WebSocketClientConnection connection = connector
.baseUri(testUri)
.connectAndAwait();
assertTrue(ServerEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(ClientOpenErrorEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(connection.isClosed());
assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), connection.closeReason().getCode());
assertTrue(ClientOpenErrorEndpoint.MESSAGES.isEmpty());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.quarkus.websockets.next.test.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketConnector;

public class UnhandledOpenFailureLogStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(ServerEndpoint.class, ClientOpenErrorEndpoint.class);
}).overrideConfigKey("quarkus.websockets-next.client.unhandled-failure-strategy", "log");

@Inject
WebSocketConnector<ClientOpenErrorEndpoint> connector;

@TestHTTPResource("/")
URI testUri;

@Test
void testError() throws InterruptedException {
WebSocketClientConnection connection = connector
.baseUri(testUri)
.connectAndAwait();
connection.sendTextAndAwait("foo");
assertFalse(connection.isClosed());
assertNull(connection.closeReason());
assertTrue(ClientOpenErrorEndpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS));
assertEquals("foo", ClientOpenErrorEndpoint.MESSAGES.get(0));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.quarkus.websockets.next.test.errors;

import java.util.concurrent.CountDownLatch;

import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@WebSocket(path = "/echo")
public class EchoMessageError {

static final CountDownLatch MESSAGE_FAILURE_CALLED = new CountDownLatch(1);

@OnTextMessage
String echo(String message) {
if ("foo".equals(message)) {
MESSAGE_FAILURE_CALLED.countDown();
throw new IllegalStateException("I cannot do it!");
} else {
return message;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.quarkus.websockets.next.test.errors;

import java.util.concurrent.CountDownLatch;

import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@WebSocket(path = "/echo")
public class EchoOpenError {

static final CountDownLatch OPEN_CALLED = new CountDownLatch(1);

@OnOpen
void open() {
OPEN_CALLED.countDown();
throw new IllegalStateException("I cannot do it!");
}

@OnTextMessage
String echo(String message) {
return message;
}

}
Loading

0 comments on commit 8402d10

Please sign in to comment.