Skip to content

Commit

Permalink
WebSockets Next: error handlers part 4
Browse files Browse the repository at this point in the history
- use error handlers to process Mutiny Multi failures
  • Loading branch information
mkouba committed Apr 2, 2024
1 parent e00af30 commit c279876
Show file tree
Hide file tree
Showing 10 changed files with 513 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ private String generateEndpoint(WebSocketEndpointBuildItem endpoint,
MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "beanInstance", Object.class, String.class),
doOnOpen.getThis(), doOnOpen.load(endpoint.bean.getIdentifier()));
// Call the business method
TryBlock tryBlock = onErrorTryBlock(doOnOpen);
TryBlock tryBlock = onErrorTryBlock(doOnOpen, doOnOpen.getThis());
ResultHandle[] args = callback.generateArguments(tryBlock.getThis(), tryBlock, transformedAnnotations, index);
ResultHandle ret = tryBlock.invokeVirtualMethod(MethodDescriptor.of(callback.method), beanInstance, args);
encodeAndReturnResult(tryBlock.getThis(), tryBlock, callback, globalErrorHandlers, endpoint, ret);
Expand All @@ -488,7 +488,7 @@ private String generateEndpoint(WebSocketEndpointBuildItem endpoint,
MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "beanInstance", Object.class, String.class),
doOnClose.getThis(), doOnClose.load(endpoint.bean.getIdentifier()));
// Call the business method
TryBlock tryBlock = onErrorTryBlock(doOnClose);
TryBlock tryBlock = onErrorTryBlock(doOnClose, doOnClose.getThis());
ResultHandle[] args = callback.generateArguments(tryBlock.getThis(), tryBlock, transformedAnnotations, index);
ResultHandle ret = tryBlock.invokeVirtualMethod(MethodDescriptor.of(callback.method), beanInstance, args);
encodeAndReturnResult(tryBlock.getThis(), tryBlock, callback, globalErrorHandlers, endpoint, ret);
Expand Down Expand Up @@ -632,7 +632,7 @@ private void generateOnMessage(ClassCreator endpointCreator, WebSocketEndpointBu
MethodCreator doOnMessage = endpointCreator.getMethodCreator("doOn" + messageType + "Message", Uni.class,
methodParameterType);

TryBlock tryBlock = onErrorTryBlock(doOnMessage);
TryBlock tryBlock = onErrorTryBlock(doOnMessage, doOnMessage.getThis());
// Foo foo = beanInstance("foo");
ResultHandle beanInstance = tryBlock.invokeVirtualMethod(
MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "beanInstance", Object.class, String.class),
Expand Down Expand Up @@ -673,13 +673,13 @@ private TryBlock uniFailureTryBlock(BytecodeCreator method) {
return tryBlock;
}

private TryBlock onErrorTryBlock(BytecodeCreator method) {
private TryBlock onErrorTryBlock(BytecodeCreator method, ResultHandle endpointThis) {
TryBlock tryBlock = method.tryBlock();
CatchBlockCreator catchBlock = tryBlock.addCatch(Throwable.class);
// return doOnError(t);
catchBlock.returnValue(catchBlock.invokeVirtualMethod(
MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "doOnError", Uni.class, Throwable.class),
catchBlock.getThis(), catchBlock.getCaughtException()));
endpointThis, catchBlock.getCaughtException()));
return tryBlock;
}

Expand Down Expand Up @@ -810,23 +810,28 @@ private ResultHandle encodeMessage(ResultHandle endpointThis, BytecodeCreator me
return uniOnFailureDoOnError(endpointThis, method, callback, uniChain, endpoint, globalErrorHandlers);
}
} else if (callback.isReturnTypeMulti()) {
// return multiBinary(multi, broadcast, m -> {
// Buffer buffer = encodeBuffer(m);
// return sendBinary(buffer,broadcast);
//});
// try {
// Buffer buffer = encodeBuffer(m);
// return sendBinary(buffer,broadcast);
// } catch(Throwable t) {
// return doOnError(t);
// }
FunctionCreator fun = method.createFunction(Function.class);
BytecodeCreator funBytecode = fun.getBytecode();
ResultHandle buffer = encodeBuffer(funBytecode, callback.returnType().asParameterizedType().arguments().get(0),
funBytecode.getMethodParam(0), endpointThis, callback);
funBytecode.returnValue(funBytecode.invokeVirtualMethod(
// This checkcast should not be necessary but we need to use the endpoint in the function bytecode
// otherwise gizmo does not access the endpoint reference correcly
ResultHandle endpointBase = funBytecode.checkCast(endpointThis, WebSocketEndpointBase.class);
TryBlock tryBlock = onErrorTryBlock(fun.getBytecode(), endpointBase);
ResultHandle buffer = encodeBuffer(tryBlock, callback.returnType().asParameterizedType().arguments().get(0),
tryBlock.getMethodParam(0), endpointThis, callback);
tryBlock.returnValue(tryBlock.invokeVirtualMethod(
MethodDescriptor.ofMethod(WebSocketEndpointBase.class,
"sendBinary", Uni.class, Buffer.class, boolean.class),
endpointThis, buffer,
funBytecode.load(callback.broadcast())));
tryBlock.load(callback.broadcast())));
return method.invokeVirtualMethod(MethodDescriptor.ofMethod(WebSocketEndpointBase.class,
"multiBinary", Uni.class, Multi.class, boolean.class, Function.class), endpointThis,
"multiBinary", Uni.class, Multi.class, Function.class), endpointThis,
value,
method.load(callback.broadcast()),
fun.getInstance());
} else {
// return sendBinary(buffer,broadcast);
Expand Down Expand Up @@ -865,22 +870,29 @@ private ResultHandle encodeMessage(ResultHandle endpointThis, BytecodeCreator me
}
} else if (callback.isReturnTypeMulti()) {
// return multiText(multi, broadcast, m -> {
// String text = encodeText(m);
// return sendText(buffer,broadcast);
// try {
// String text = encodeText(m);
// return sendText(buffer,broadcast);
// } catch(Throwable t) {
// return doOnError(t);
// }
//});
FunctionCreator fun = method.createFunction(Function.class);
BytecodeCreator funBytecode = fun.getBytecode();
ResultHandle text = encodeText(funBytecode, callback.returnType().asParameterizedType().arguments().get(0),
funBytecode.getMethodParam(0), endpointThis, callback);
funBytecode.returnValue(funBytecode.invokeVirtualMethod(
// This checkcast should not be necessary but we need to use the endpoint in the function bytecode
// otherwise gizmo does not access the endpoint reference correcly
ResultHandle endpointBase = funBytecode.checkCast(endpointThis, WebSocketEndpointBase.class);
TryBlock tryBlock = onErrorTryBlock(fun.getBytecode(), endpointBase);
ResultHandle text = encodeText(tryBlock, callback.returnType().asParameterizedType().arguments().get(0),
tryBlock.getMethodParam(0), endpointThis, callback);
tryBlock.returnValue(tryBlock.invokeVirtualMethod(
MethodDescriptor.ofMethod(WebSocketEndpointBase.class,
"sendText", Uni.class, String.class, boolean.class),
endpointThis, text,
funBytecode.load(callback.broadcast())));
tryBlock.load(callback.broadcast())));
return method.invokeVirtualMethod(MethodDescriptor.ofMethod(WebSocketEndpointBase.class,
"multiText", Uni.class, Multi.class, boolean.class, Function.class), endpointThis,
"multiText", Uni.class, Multi.class, Function.class), endpointThis,
value,
method.load(callback.broadcast()),
fun.getInstance());
} else {
// return sendText(text,broadcast);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.quarkus.websockets.next.test.errors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.BinaryDecodeException;
import io.quarkus.websockets.next.OnBinaryMessage;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.smallrye.mutiny.Multi;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mutiny.core.Context;

public class MultiBinaryDecodeErrorTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Echo.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("echo")
URI testUri;

@Test
void testError() {
WSClient client = WSClient.create(vertx).connect(testUri);
client.send(Buffer.buffer("1"));
client.waitForMessages(1);
assertEquals("Problem decoding: 1", client.getLastMessage().toString());
}

@WebSocket(path = "/echo")
public static class Echo {

@OnBinaryMessage
Multi<Integer> process(Multi<Integer> messages) {
return messages;
}

@OnError
String decodingError(BinaryDecodeException e) {
assertTrue(Context.isOnWorkerThread());
return "Problem decoding: " + e.getBytes().toString();
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.quarkus.websockets.next.test.errors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.BinaryEncodeException;
import io.quarkus.websockets.next.OnBinaryMessage;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.smallrye.mutiny.Multi;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mutiny.core.Context;

public class MultiBinaryEncodeErrorTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Echo.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("echo")
URI testUri;

@Test
void testError() {
WSClient client = WSClient.create(vertx).connect(testUri);
client.send(Buffer.buffer("1"));
client.waitForMessages(1);
assertEquals("Problem encoding: 1", client.getLastMessage().toString());
}

@WebSocket(path = "/echo")
public static class Echo {

@OnBinaryMessage
Multi<Integer> process(Buffer message) {
return Multi.createFrom().item(Integer.parseInt(message.toString()));
}

@OnError
String encodingError(BinaryEncodeException e) {
assertTrue(Context.isOnWorkerThread());
return "Problem encoding: " + e.getEncodedObject().toString();
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.quarkus.websockets.next.test.errors;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.net.URI;
import java.time.Duration;

import jakarta.inject.Inject;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;

public class MultiFailureCloseConnectionTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Echo.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("echo")
URI testUri;

@Test
void testError() {
WSClient client = WSClient.create(vertx).connect(testUri);
client.sendAndAwait("bar,foo,baz");
// "bar" should be sent back
client.waitForMessages(1);
// "foo" results in a failure -> connection closed
Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> client.isClosed());
// "foo" and "baz" should never be sent back
assertEquals(1, client.getMessages().size());
}

@WebSocket(path = "/echo")
public static class Echo {

@OnTextMessage
Multi<String> process(String message) {
return Multi.createFrom().items(message.split(",")).invoke(s -> {
if (s.equals("foo")) {
throw new IllegalArgumentException();
}
});
}

@OnError
Uni<Void> runtimeProblem(IllegalArgumentException e, WebSocketConnection connection) {
return connection.close();
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.quarkus.websockets.next.test.errors;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.net.URI;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.smallrye.mutiny.Multi;
import io.vertx.core.Vertx;

public class MultiFailureTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Echo.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("echo")
URI testUri;

@Test
void testError() {
WSClient client = WSClient.create(vertx).connect(testUri);
client.sendAndAwait("bar,foo,baz");
client.waitForMessages(2);
assertEquals("bar", client.getMessages().get(0).toString());
assertEquals("foo detected", client.getMessages().get(1).toString());
}

@WebSocket(path = "/echo")
public static class Echo {

@OnTextMessage
Multi<String> process(String message) {
return Multi.createFrom().items(message.split(",")).invoke(s -> {
if (s.equals("foo")) {
throw new IllegalArgumentException();
}
});
}

@OnError
String runtimeProblem(IllegalArgumentException e) {
return "foo detected";
}

}

}
Loading

0 comments on commit c279876

Please sign in to comment.