diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java index def237ac8da81..307d5b77d5d81 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java @@ -8,7 +8,10 @@ import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.OutboundSseEvent; +import javax.ws.rs.sse.Sse; import org.jboss.resteasy.reactive.common.util.MultiCollectors; import org.reactivestreams.Publisher; @@ -152,4 +155,13 @@ public Multi sse() { public Multi sseThrows() { throw new IllegalStateException("STOP"); } + + @Path("sse/raw") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public Multi sseRaw(@Context Sse sse) { + return Multi.createFrom().items(sse.newEventBuilder().id("one").data("uno").name("eins").build(), + sse.newEventBuilder().id("two").data("dos").name("zwei").build(), + sse.newEventBuilder().id("three").data("tres").name("drei").build()); + } } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java index 36df4075e7606..d43b1ddb1249b 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java @@ -237,4 +237,32 @@ public void testSseThrows() throws InterruptedException { Assertions.assertEquals(1, errors.size()); } } + + @Test + public void testSseForMultiWithOutboundSseEvent() throws InterruptedException { + Client client = ClientBuilder.newBuilder().build(); + WebTarget target = client.target(this.uri.toString() + "stream/sse/raw"); + try (SseEventSource sse = SseEventSource.target(target).build()) { + CountDownLatch latch = new CountDownLatch(1); + List errors = new CopyOnWriteArrayList<>(); + List results = new CopyOnWriteArrayList<>(); + List ids = new CopyOnWriteArrayList<>(); + List names = new CopyOnWriteArrayList<>(); + sse.register(event -> { + results.add(event.readData()); + ids.add(event.getId()); + names.add(event.getName()); + }, error -> { + errors.add(error); + }, () -> { + latch.countDown(); + }); + sse.open(); + Assertions.assertTrue(latch.await(20, TimeUnit.SECONDS)); + Assertions.assertEquals(Arrays.asList("uno", "dos", "tres"), results); + Assertions.assertEquals(Arrays.asList("one", "two", "three"), ids); + Assertions.assertEquals(Arrays.asList("eins", "zwei", "drei"), names); + Assertions.assertEquals(0, errors.size()); + } + } } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java index 3ac1f35a6deb3..45389d6c2ca0e 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java @@ -11,6 +11,7 @@ import java.util.function.Consumer; import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.OutboundSseEvent; import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.common.util.RestMediaType; @@ -49,7 +50,12 @@ private static class SseMultiSubscriber extends AbstractMultiSubscriber { @Override public void onNext(Object item) { - OutboundSseEventImpl event = new OutboundSseEventImpl.BuilderImpl().data(item).build(); + OutboundSseEvent event; + if (item instanceof OutboundSseEvent) { + event = (OutboundSseEvent) item; + } else { + event = new OutboundSseEventImpl.BuilderImpl().data(item).build(); + } SseUtil.send(requestContext, event, customizers).whenComplete(new BiConsumer() { @Override public void accept(Object v, Throwable t) {