diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/broadcast/BroadcastOnOpenTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/broadcast/BroadcastOnOpenTest.java index cebc41a1e7ced..8c4cbf205df7f 100644 --- a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/broadcast/BroadcastOnOpenTest.java +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/broadcast/BroadcastOnOpenTest.java @@ -64,22 +64,23 @@ public void assertBroadcast(URI testUri) throws Exception { if (r.succeeded()) { WebSocket ws = r.result(); ws.textMessageHandler(msg -> { - messages.add(msg); + messages.add(msg + ":client1"); if (msg.equals("c1")) { c1MessageLatch.countDown(); } else if (msg.equals("c2")) { // onOpen callback from the second client c2MessageLatch.countDown(); } - }); + // Trigger emission for LoMultiProduce + ws.writeTextMessage("foo"); } else { throw new IllegalStateException(r.cause()); } }); assertTrue(c1MessageLatch.await(5, TimeUnit.SECONDS)); assertEquals(1, messages.size()); - assertEquals("c1", messages.get(0)); + assertEquals("c1:client1", messages.get(0)); messages.clear(); // Now connect the second client client2 @@ -88,18 +89,20 @@ public void assertBroadcast(URI testUri) throws Exception { if (r.succeeded()) { WebSocket ws = r.result(); ws.textMessageHandler(msg -> { - messages.add(msg); + messages.add(msg + ":client2"); c2MessageLatch.countDown(); }); + // Trigger emission for LoMultiProduce + ws.writeTextMessage("foo"); } else { throw new IllegalStateException(r.cause()); } }); assertTrue(c2MessageLatch.await(5, TimeUnit.SECONDS), "Messages: " + messages); // onOpen should be broadcasted to both clients - assertEquals(2, messages.size()); - assertEquals("c2", messages.get(0)); - assertEquals("c2", messages.get(1)); + assertEquals(2, messages.size(), "Messages: " + messages); + assertEquals("c2", messages.get(0).substring(0, 2)); + assertEquals("c2", messages.get(1).substring(0, 2)); } finally { client1.close().toCompletionStage().toCompletableFuture().get(); client2.close().toCompletionStage().toCompletableFuture().get(); diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/broadcast/LoMultiProduce.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/broadcast/LoMultiProduce.java index 1ee4544554a9f..3a287cb90b2ef 100644 --- a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/broadcast/LoMultiProduce.java +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/broadcast/LoMultiProduce.java @@ -2,12 +2,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue; +import jakarta.enterprise.context.SessionScoped; +import jakarta.enterprise.inject.Produces; import jakarta.inject.Inject; import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.OnTextMessage; import io.quarkus.websockets.next.WebSocket; import io.quarkus.websockets.next.WebSocketConnection; import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; import io.vertx.core.Context; @WebSocket(path = "/lo-multi-produce/{client}") @@ -16,10 +20,24 @@ public class LoMultiProduce { @Inject WebSocketConnection connection; + @Inject + BroadcastProcessor broadcast; + @OnOpen(broadcast = true) Multi open() { assertTrue(Context.isOnEventLoopThread()); - return Multi.createFrom().item(connection.pathParam("client").toLowerCase()); + return broadcast; + } + + @OnTextMessage + void trigger(String message) { + broadcast.onNext(connection.pathParam("client").toLowerCase()); + } + + @Produces + @SessionScoped + BroadcastProcessor multiProducer() { + return BroadcastProcessor.create(); } }