Skip to content

Commit

Permalink
WebSockets Next: add rule for Transactional annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
mkouba committed Nov 14, 2024
1 parent 20dc22a commit ce12ec3
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 12 deletions.
21 changes: 12 additions & 9 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,23 @@ Method receiving messages from the client are annotated with `@OnTextMessage` or

==== Invocation rules

When invoking these annotated methods, the _session_ scope linked to the WebSocket connection remains active.
When invoking the callback methods, the _session_ scope linked to the WebSocket connection remains active.
In addition, the request scope is active until the completion of the method (or until it produces its result for async and reactive methods).

Quarkus WebSocket Next supports _blocking_ and _non-blocking_ logic, akin to Quarkus REST, determined by the method signature and additional annotations such as `@Blocking` and `@NonBlocking`.
WebSocket Next supports _blocking_ and _non-blocking_ logic, akin to Quarkus REST, determined from the return type of the method and additional annotations such as `@Blocking` and `@NonBlocking`.

Here are the rules governing execution:

* Non-blocking methods must execute on the connection's event loop.
* Methods annotated with `@RunOnVirtualThread` are considered blocking and should execute on a virtual thread.
* Blocking methods must execute on a worker thread if not annotated with `@RunOnVirtualThread`.
* When `@RunOnVirtualThread` is employed, each invocation spawns a new virtual thread.
* Methods returning `CompletionStage`, `Uni` and `Multi` are considered non-blocking.
* Methods returning `void` or plain objects are considered blocking.
* Kotlin `suspend` functions are considered non-blocking.
* Methods annotated with `@RunOnVirtualThread`, `@Blocking` or `@Transactional` are considered blocking.
* Methods annotated with `@NonBlocking` are considered non-blocking.
* Methods declared on a class annotated with `@Transactional` are considered blocking unless annotated with `@NonBlocking`.
* If the method does not declare any of the annotations listed above the execution model is derived from the return type:
** Methods returning `Uni` and `Multi` are considered non-blocking.
** Methods returning `void` or any other type are considered blocking.
* Kotlin `suspend` functions are always considered non-blocking and may not be annotated with `@Blocking`, `@NonBlocking` or `@RunOnVirtualThread`.
* Non-blocking methods must execute on the connection's event loop thread.
* Blocking methods must execute on a worker thread unless annotated with `@RunOnVirtualThread`.
* Methods annotated with `@RunOnVirtualThread` must execute on a virtual thread, each invocation spawns a new virtual thread.

==== Method parameters

Expand Down
5 changes: 5 additions & 0 deletions extensions/websockets-next/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@
<artifactId>opentelemetry-semconv</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jakarta.transaction</groupId>
<artifactId>jakarta.transaction-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ final class WebSocketDotNames {
static final DotName HANDSHAKE_REQUEST = DotName.createSimple(HandshakeRequest.class);
static final DotName THROWABLE = DotName.createSimple(Throwable.class);
static final DotName CLOSE_REASON = DotName.createSimple(CloseReason.class);
static final DotName TRANSACTIONAL = DotName.createSimple("jakarta.transaction.Transactional");

static final List<DotName> CALLBACK_ANNOTATIONS = List.of(ON_OPEN, ON_CLOSE, ON_BINARY_MESSAGE, ON_TEXT_MESSAGE,
ON_PONG_MESSAGE, ON_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1578,13 +1578,16 @@ private static ExecutionModel executionModel(MethodInfo method, TransformedAnnot
throw new WebSocketException("Kotlin `suspend` functions in WebSockets Next endpoints may not be "
+ "annotated @Blocking, @NonBlocking or @RunOnVirtualThread: " + method);
}

if (transformedAnnotations.hasAnnotation(method, WebSocketDotNames.RUN_ON_VIRTUAL_THREAD)) {
return ExecutionModel.VIRTUAL_THREAD;
} else if (transformedAnnotations.hasAnnotation(method, WebSocketDotNames.BLOCKING)) {
return ExecutionModel.WORKER_THREAD;
} else if (transformedAnnotations.hasAnnotation(method, WebSocketDotNames.NON_BLOCKING)) {
return ExecutionModel.EVENT_LOOP;
} else if (transformedAnnotations.hasAnnotation(method, WebSocketDotNames.TRANSACTIONAL)
|| transformedAnnotations.hasAnnotation(method.declaringClass(), WebSocketDotNames.TRANSACTIONAL)) {
// Method annotated with @Transactional or declared on a class annotated @Transactional is also treated as a blocking method
return ExecutionModel.WORKER_THREAD;
} else {
return hasBlockingSignature(method) ? ExecutionModel.WORKER_THREAD : ExecutionModel.EVENT_LOOP;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public class BlockingAnnotationTest {

@Test
void testEndoint() {
try (WSClient client = new WSClient(vertx).connect(endUri)) {
try (WSClient client = new WSClient(vertx)) {
client.connect(endUri);
assertEquals("evenloop:false,worker:true", client.sendAndAwaitReply("foo").toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public class NonBlockingAnnotationTest {

@Test
void testEndoint() {
try (WSClient client = new WSClient(vertx).connect(endUri)) {
try (WSClient client = new WSClient(vertx)) {
client.connect(endUri);
assertEquals("evenloop:true,worker:false", client.sendAndAwaitReply("foo").toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.quarkus.websockets.next.test.executionmodel;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.net.URI;

import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class TransactionalClassTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Endpoint.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("endpoint")
URI endUri;

@Test
void testEndoint() {
try (WSClient client = new WSClient(vertx)) {
client.connect(endUri);
assertEquals("evenloop:false,worker:true", client.sendAndAwaitReply("foo").toString());
}
}

@Transactional
@WebSocket(path = "/endpoint")
public static class Endpoint {

@OnTextMessage
Uni<String> message(String ignored) {
return Uni.createFrom().item("evenloop:" + Context.isOnEventLoopThread() + ",worker:" + Context.isOnWorkerThread());
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.quarkus.websockets.next.test.executionmodel;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.net.URI;

import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class TransactionalMethodTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Endpoint.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("endpoint")
URI endUri;

@Test
void testEndoint() {
try (WSClient client = new WSClient(vertx)) {
client.connect(endUri);
assertEquals("evenloop:false,worker:true", client.sendAndAwaitReply("foo").toString());
}
}

@WebSocket(path = "/endpoint")
public static class Endpoint {

@Transactional
@OnTextMessage
Uni<String> message(String ignored) {
return Uni.createFrom().item("evenloop:" + Context.isOnEventLoopThread() + ",worker:" + Context.isOnWorkerThread());
}

}

}

0 comments on commit ce12ec3

Please sign in to comment.