Skip to content

Commit

Permalink
Merge pull request quarkusio#39542 from mkouba/websockets-next-flaky-…
Browse files Browse the repository at this point in the history
…BroadcastOnOpenTest

WebSockets Next: attempt to diagnose BroadcastOnOpenTest
  • Loading branch information
gastaldi authored Mar 19, 2024
2 parents 9c5f191 + accdc03 commit 9d6f633
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,23 @@ public void assertBroadcast(URI testUri) throws Exception {
if (r.succeeded()) {
WebSocket ws = r.result();
ws.textMessageHandler(msg -> {
messages.add(msg);
messages.add(msg + ":client1");
if (msg.equals("c1")) {
c1MessageLatch.countDown();
} else if (msg.equals("c2")) {
// onOpen callback from the second client
c2MessageLatch.countDown();
}

});
// Trigger emission for LoMultiProduce
ws.writeTextMessage("foo");
} else {
throw new IllegalStateException(r.cause());
}
});
assertTrue(c1MessageLatch.await(5, TimeUnit.SECONDS));
assertEquals(1, messages.size());
assertEquals("c1", messages.get(0));
assertEquals("c1:client1", messages.get(0));
messages.clear();
// Now connect the second client
client2
Expand All @@ -88,18 +89,20 @@ public void assertBroadcast(URI testUri) throws Exception {
if (r.succeeded()) {
WebSocket ws = r.result();
ws.textMessageHandler(msg -> {
messages.add(msg);
messages.add(msg + ":client2");
c2MessageLatch.countDown();
});
// Trigger emission for LoMultiProduce
ws.writeTextMessage("foo");
} else {
throw new IllegalStateException(r.cause());
}
});
assertTrue(c2MessageLatch.await(5, TimeUnit.SECONDS), "Messages: " + messages);
// onOpen should be broadcasted to both clients
assertEquals(2, messages.size());
assertEquals("c2", messages.get(0));
assertEquals("c2", messages.get(1));
assertEquals(2, messages.size(), "Messages: " + messages);
assertEquals("c2", messages.get(0).substring(0, 2));
assertEquals("c2", messages.get(1).substring(0, 2));
} finally {
client1.close().toCompletionStage().toCompletableFuture().get();
client2.close().toCompletionStage().toCompletableFuture().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

import static org.junit.jupiter.api.Assertions.assertTrue;

import jakarta.enterprise.context.SessionScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;

import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.vertx.core.Context;

@WebSocket(path = "/lo-multi-produce/{client}")
Expand All @@ -16,10 +20,24 @@ public class LoMultiProduce {
@Inject
WebSocketConnection connection;

@Inject
BroadcastProcessor<String> broadcast;

@OnOpen(broadcast = true)
Multi<String> open() {
assertTrue(Context.isOnEventLoopThread());
return Multi.createFrom().item(connection.pathParam("client").toLowerCase());
return broadcast;
}

@OnTextMessage
void trigger(String message) {
broadcast.onNext(connection.pathParam("client").toLowerCase());
}

@Produces
@SessionScoped
BroadcastProcessor<String> multiProducer() {
return BroadcastProcessor.create();
}

}

0 comments on commit 9d6f633

Please sign in to comment.