From 0b1bf996d5b7025e97569ee8421bfa85ad106c89 Mon Sep 17 00:00:00 2001 From: Robert Stupp Date: Thu, 15 Feb 2024 15:29:02 +0100 Subject: [PATCH] Enhance RestMulti, configurable demand + distinct objects Adds two enhancements: 1. Produce multiple JSON objects, not just an array 2. Make requested demand configurable == Produce multiple JSON objects Currently, `PublisherResponseHandler.StreamingMultiSubscriber` produces a JSON array, where each emitted item is encoded as a JSON array element. For some use cases it is easier to consume a bunch of "bare" JSON objects - i.e. just write the individual JSON objects, possibly separated by a newline. As an option, of course. Proposal to add: ```java RestMulti.fromMultiData(multi).encodeAsArray(false)... ``` With `encodeAsArray(false)`, the produced JSON would look like this: ```json {"some": "value"} {"some": "value"} {"some": "value"} ``` `encodeAsArray(true)` or omitting it would use the current behavior and produce something like this: ```json [{"some": "value"}, {"some": "value"}, {"some": "value"} } ``` == Configure request-demand All implementations of `PublisherResponseHandler.AbstractMultiSubscriber` work with a hard-coded request-demand of `1`, which means that every emitted item is "produced"/"computed" serially / one-after-the-other. If the computation of individual items takes somewhat longer, possibly waiting for remote resources to reply, it makes sense to use a higher demand to produce multiple items concurrently. For example, if each item takes maybe 250 ms (requesting data from a remote source) to be produced, and 100 items are produced, it currently takes 25 seconds. With a higher concurrency it would take a fraction of that time. I.e. if the use case is known to be not CPU but (async) I/O bound, it _might_ be legit/feasible to use a high demand. Proposal to add: ``` RestMulti.fromMultiData(multi).withDemand( (long) 123 )... ``` Which would pass `123` as the demand for all call sites to `Subscription.request` in implementations of `PublisherResponseHandler.AbstractMultiSubscriber`. --- docs/src/main/asciidoc/resteasy-reactive.adoc | 118 +++++++++++ .../deployment/test/streams/Demands.java | 15 ++ .../test/streams/StreamResource.java | 66 +++++++ .../test/streams/StreamTestCase.java | 36 +++- .../jboss/resteasy/reactive/RestMulti.java | 57 +++++- .../handlers/PublisherResponseHandler.java | 183 ++++++++---------- 6 files changed, 374 insertions(+), 101 deletions(-) create mode 100644 extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/Demands.java diff --git a/docs/src/main/asciidoc/resteasy-reactive.adoc b/docs/src/main/asciidoc/resteasy-reactive.adoc index 52278efc7003c..0670c4e9102e6 100644 --- a/docs/src/main/asciidoc/resteasy-reactive.adoc +++ b/docs/src/main/asciidoc/resteasy-reactive.adoc @@ -1005,6 +1005,124 @@ public class Endpoint { } ---- +=== Concurrent stream element processing + +By default, `RestMulti` ensures serial/sequential order of the items/elements produced by the wrapped +`Multi` by using a value of 1 for the demand signaled to the publishers. To enable concurrent +processing/generation of multiple items, use `withDemand(long demand)`. + +Using a demand higher than 1 is useful when multiple items shall be returned and the production of each +item takes some time, i.e. when parallel/concurrent production improves the service response time. Be +aware the concurrent processing also requires more resources and puts a higher load on services or +resources that are needed to produce the items. Also consider using `Multi.capDemandsTo(long)` and +`Multi.capDemandsUsing(LongFunction)`. + +The example below produces 5 (JSON) strings, but the _order_ of the strings in the returned JSON array +is not guaranteed. The below example also works for JSON objects and not just simple types. + +[source,java] +---- +package org.acme.rest; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import io.smallrye.mutiny.Multi; +import org.jboss.resteasy.reactive.RestMulti; + +@Path("message-stream") +public class Endpoint { + @GET + public Multi streamMessages() { + Multi sourceMulti = Multi + .createBy() + .merging() + .streams( + Multi.createFrom().items( + "message-1", + "message-2", + "message-3", + "message-4", + "message-5" + ) + ); + + return RestMulti + .fromMultiData(sourceMulti) + .withDemand(5) + .build(); + } +} +---- + +Example response, the order is non-deterministic. + +[source,text] +---- +"message-3" +"message-5" +"message-4" +"message-1" +"message-2" +---- + +=== Returning multiple JSON objects + +By default, `RestMulti` returns items/elements produced by the wrapped `Multi` as a JSON array, if the +media-type is `application/json`. To return separate JSON objects that are not wrapped in a JSON array, +use `encodeAsArray(false)` (`encodeAsArray(true)` is the default). Note that streaming multiple +objects this way requires a slightly different parsing on the client side, but objects can be parsed and +consumed as they appear without having to deserialize a possibly huge result at once. + +The example below produces 5 (JSON) strings, that are not wrapped in an array, like this: + +[source,text] +---- +"message-1" +"message-2" +"message-3" +"message-4" +"message-5" +---- + +[source,java] +---- +package org.acme.rest; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import io.smallrye.mutiny.Multi; +import org.jboss.resteasy.reactive.RestMulti; + +@Path("message-stream") +public class Endpoint { + @GET + public Multi streamMessages() { + Multi sourceMulti = Multi + .createBy() + .merging() + .streams( + Multi.createFrom().items( + "message-1", + "message-2", + "message-3", + "message-4", + "message-5" + ) + ); + + return RestMulti + .fromMultiData(sourceMulti) + .encodeAsJsonArray(false) + .build(); + } +} +---- + + === Server-Sent Event (SSE) support If you want to stream JSON objects in your response, you can use diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/Demands.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/Demands.java new file mode 100644 index 0000000000000..9d206fb4dbf4c --- /dev/null +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/Demands.java @@ -0,0 +1,15 @@ +package io.quarkus.resteasy.reactive.jackson.deployment.test.streams; + +import java.util.List; + +public class Demands { + public List demands; + + public Demands(List demands) { + this.demands = demands; + } + + // for Jsonb + public Demands() { + } +} diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java index dfc905597b338..01895dfbe7026 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Flow; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; @@ -22,6 +23,9 @@ import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.StrictMultiSubscriber; +import io.smallrye.mutiny.operators.multi.AbstractMultiOperator; +import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor; @Path("streams") public class StreamResource { @@ -105,6 +109,68 @@ public Multi multiJson() { .header("foo", "bar").build(); } + @Path("json/multi-alt") + @GET + @RestStreamElementType(MediaType.APPLICATION_JSON) + public Multi multiJsonAlt() { + return RestMulti.fromMultiData(Multi.createFrom().items(new Message("hello"), new Message("stef"))) + .header("foo", "bar").encodeAsJsonArray(true).build(); + } + + @Path("json/multi-docs") + @GET + @Produces(MediaType.APPLICATION_JSON) + public Multi multiJsonMultiDocs() { + return RestMulti.fromMultiData(Multi.createFrom().items(new Message("hello"), new Message("stef"))) + .header("foo", "bar").encodeAsJsonArray(false).build(); + } + + @Path("json/multi-docs-huge-demand") + @GET + @Produces(MediaType.APPLICATION_JSON) + public Multi multiJsonMultiDocsHigherDemand() { + List demands = new ArrayList<>(); + + Multi inner = Multi.createBy().merging() + // Add some messages + .streams(Multi.createFrom().items( + new Message("hello"), + new Message("stef"), + new Message("snazy"), + new Message("stef"), + new Message("elani"), + new Message("foo"), + new Message("bar"), + new Message("baz"))); + + Multi items = Multi.createBy().concatenating().streams( + inner, + // Add "collected" demand values as the last JSON object, produce "lazily" to + // make sure that we "see" the demands signaled via Publisher.request(long). + Multi.createFrom().item(() -> new Demands(demands))); + + Multi outer = new AbstractMultiOperator<>(items) { + @Override + public void subscribe(Flow.Subscriber subscriber) { + this.upstream.subscribe() + .withSubscriber(new MultiOperatorProcessor(new StrictMultiSubscriber<>(subscriber)) { + @Override + public void request(long numberOfItems) { + // Collect the "demands" to return to the test case + demands.add(numberOfItems); + super.request(numberOfItems); + } + }); + } + }.log("outer"); + + return RestMulti.fromMultiData( + Multi.createBy().concatenating().streams(outer).log()) + .withDemand(5) + .encodeAsJsonArray(false) + .header("foo", "bar").build(); + } + @Path("json/multi2") @GET @Produces(MediaType.SERVER_SENT_EVENTS) diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamTestCase.java index 2ae2a7f2895df..915ad45721cf7 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamTestCase.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamTestCase.java @@ -2,7 +2,10 @@ import static io.restassured.RestAssured.when; import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import java.net.URI; @@ -44,7 +47,7 @@ public class StreamTestCase { @RegisterExtension static final QuarkusUnitTest config = new QuarkusUnitTest() .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) - .addClasses(StreamResource.class, Message.class)); + .addClasses(StreamResource.class, Message.class, Demands.class)); @Test public void testSseFromSse() throws Exception { @@ -110,6 +113,7 @@ public void testJsonMultiFromSse() { @Test public void testJsonMultiFromMulti() { testJsonMulti("streams/json/multi"); + testJsonMulti("streams/json/multi-alt"); } @Test @@ -117,6 +121,36 @@ public void testJsonMultiFromMultiWithDefaultElementType() { testJsonMulti("streams/json/multi2"); } + @Test + public void testJsonMultiMultiDoc() { + when().get(uri.toString() + "streams/json/multi-docs") + .then().statusCode(HttpStatus.SC_OK) + // @formatter:off + .body(is("{\"name\":\"hello\"}\n" + + "{\"name\":\"stef\"}\n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_JSON)); + } + + @Test + public void testJsonMultiMultiDocHigherDemand() { + when().get(uri.toString() + "streams/json/multi-docs-huge-demand") + .then().statusCode(HttpStatus.SC_OK) + // @formatter:off + .body(allOf( + containsString("{\"name\":\"hello\"}\n"), + containsString("{\"name\":\"stef\"}\n"), + containsString("{\"name\":\"snazy\"}\n"), + containsString("{\"name\":\"elani\"}\n"), + containsString("{\"name\":\"foo\"}\n"), + containsString("{\"name\":\"bar\"}\n"), + containsString("{\"name\":\"baz\"}\n"), + endsWith("{\"demands\":[5,5]}\n"))) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_JSON)) + .header("foo", equalTo("bar")); + } + @Test public void testNdJsonMultiFromMulti() { when().get(uri.toString() + "streams/ndjson/multi") diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java index b9fc7ed4228c0..695b1d3074a2c 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java @@ -11,6 +11,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.LongFunction; import org.jboss.resteasy.reactive.common.util.CaseInsensitiveMap; import org.jboss.resteasy.reactive.common.util.MultivaluedTreeMap; @@ -18,6 +19,7 @@ import io.smallrye.mutiny.Context; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.groups.MultiMerge; import io.smallrye.mutiny.helpers.EmptyUniSubscription; import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.infrastructure.Infrastructure; @@ -69,6 +71,8 @@ public static class SyncRestMulti extends RestMulti { private final Multi multi; private final Integer status; private final MultivaluedTreeMap headers; + private final long demand; + private final boolean encodeAsJsonArray; @Override public void subscribe(MultiSubscriber subscriber) { @@ -79,6 +83,8 @@ private SyncRestMulti(Builder builder) { this.multi = builder.multi; this.status = builder.status; this.headers = builder.headers; + this.demand = builder.demand; + this.encodeAsJsonArray = builder.encodeAsJsonArray; } @Override @@ -91,17 +97,62 @@ public Map> getHeaders() { return headers; } - public static class Builder { - private final Multi multi; + public long getDemand() { + return demand; + } - private Integer status; + public boolean encodeAsJsonArray() { + return encodeAsJsonArray; + } + public static class Builder { + private final Multi multi; private final MultivaluedTreeMap headers = new CaseInsensitiveMap<>(); + private Integer status; + private long demand = 1; + private boolean encodeAsJsonArray = true; private Builder(Multi multi) { this.multi = Objects.requireNonNull(multi, "multi cannot be null"); } + /** + * Configure the {@code demand} signaled to the wrapped {@link Multi}, defaults to {@code 1}. + * + *

+ * A demand of {@code 1} guarantees serial/sequential processing, any higher demand supports + * concurrent processing. A demand greater {@code 1}, with concurrent {@link Multi} processing, + * does not guarantee element order - this means that elements emitted by the + * {@link RestMulti#fromMultiData(Multi) RestMulti.fromMultiData(Multi)} source Multi} + * will be produced in a non-deterministic order. + * + * @see MultiMerge#withConcurrency(int) Multi.createBy().merging().withConcurrency(int) + * @see Multi#capDemandsTo(long) + * @see Multi#capDemandsUsing(LongFunction) + */ + public Builder withDemand(long demand) { + if (demand <= 0) { + throw new IllegalArgumentException("Demand must be greater than zero"); + } + this.demand = demand; + return this; + } + + /** + * Configure whether objects produced by the wrapped {@link Multi} are encoded as JSON array elements, which is the + * default. + * + *

+ * {@code encodeAsJsonArray(false)} produces separate JSON objects. + * + *

+ * This property is only used for JSON object results and ignored for SSE and chunked streaming. + */ + public Builder encodeAsJsonArray(boolean encodeAsJsonArray) { + this.encodeAsJsonArray = encodeAsJsonArray; + return this; + } + public Builder status(int status) { this.status = status; return this; diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java index 4d097feaad874..c1af678260549 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java @@ -10,9 +10,6 @@ import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Consumer; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.sse.OutboundSseEvent; @@ -51,8 +48,9 @@ public void setStreamingResponseCustomizers(List st private static class SseMultiSubscriber extends AbstractMultiSubscriber { - SseMultiSubscriber(ResteasyReactiveRequestContext requestContext, List staticCustomizers) { - super(requestContext, staticCustomizers); + SseMultiSubscriber(ResteasyReactiveRequestContext requestContext, List staticCustomizers, + long demand) { + super(requestContext, staticCustomizers, demand); } @Override @@ -63,67 +61,43 @@ public void onNext(Object item) { } else { event = new OutboundSseEventImpl.BuilderImpl().data(item).build(); } - SseUtil.send(requestContext, event, staticCustomizers).whenComplete(new BiConsumer() { - @Override - public void accept(Object v, Throwable t) { - if (t != null) { - // need to cancel because the exception didn't come from the Multi - subscription.cancel(); - handleException(requestContext, t); - } else { - // send in the next item - subscription.request(1); - } + SseUtil.send(requestContext, event, staticCustomizers).whenComplete((v, t) -> { + if (t != null) { + // need to cancel because the exception didn't come from the Multi + subscription.cancel(); + handleException(requestContext, t); + } else { + // send in the next item + subscription.request(demand); } }); } } - private static class ChunkedStreamingMultiSubscriber extends StreamingMultiSubscriber { + @SuppressWarnings("rawtypes") + private static class StreamingMultiSubscriber extends AbstractMultiSubscriber { private static final String LINE_SEPARATOR = "\n"; - private boolean isFirstItem = true; - - ChunkedStreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext, - List staticCustomizers, Publisher publisher, boolean json) { - super(requestContext, staticCustomizers, publisher, json); - } - - @Override - protected String messagePrefix() { - // When message is chunked, we don't need to add prefixes at first - return null; - } - - @Override - protected String messageSuffix() { - return LINE_SEPARATOR; - } - - @Override - protected String onCompleteText() { - // When message is chunked, we don't need to add text at the end of the messages - return null; - } - } - - private static class StreamingMultiSubscriber extends AbstractMultiSubscriber { + private final Publisher publisher; + private final boolean json; + private final boolean encodeAsJsonArray; // Huge hack to stream valid json - private boolean json; - private String nextJsonPrefix; - private boolean hadItem; - - private final Publisher publisher; + private volatile String nextJsonPrefix; + private volatile boolean hadItem; StreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext, List staticCustomizers, Publisher publisher, - boolean json) { - super(requestContext, staticCustomizers); + boolean json, long demand, boolean encodeAsJsonArray) { + super(requestContext, staticCustomizers, demand); this.publisher = publisher; this.json = json; - this.nextJsonPrefix = "["; + // encodeAsJsonArray == true means JSON array "encoding" + // encodeAsJsonArray == false mean no prefix, no suffix and LF as message separator, + // also used for/same as chunked-streaming + this.encodeAsJsonArray = encodeAsJsonArray; + this.nextJsonPrefix = encodeAsJsonArray ? "[" : null; this.hadItem = false; } @@ -132,33 +106,29 @@ public void onNext(Object item) { List customizers = determineCustomizers(!hadItem); hadItem = true; StreamingUtil.send(requestContext, customizers, item, messagePrefix(), messageSuffix()) - .handle(new BiFunction() { - @Override - public Object apply(Object v, Throwable t) { - if (t != null) { - // need to cancel because the exception didn't come from the Multi - try { - subscription.cancel(); - } catch (Throwable t2) { - t2.printStackTrace(); - } - handleException(requestContext, t); - } else { - // next item will need this prefix if json - nextJsonPrefix = ","; - // send in the next item - subscription.request(1); + .handle((v, t) -> { + if (t != null) { + // need to cancel because the exception didn't come from the Multi + try { + subscription.cancel(); + } catch (Throwable t2) { + t2.printStackTrace(); } - return null; + handleException(requestContext, t); + } else { + // next item will need this prefix if json + nextJsonPrefix = encodeAsJsonArray ? "," : null; + // send in the next item + subscription.request(demand); } + return null; }); } private List determineCustomizers(boolean isFirst) { // we only need to obtain the customizers from the Publisher if it's the first time we are sending data and the Publisher has customizable data // at this point no matter the type of RestMulti we can safely obtain the headers and status - if (isFirst && (publisher instanceof RestMulti)) { - RestMulti restMulti = (RestMulti) publisher; + if (isFirst && (publisher instanceof RestMulti restMulti)) { Map> headers = restMulti.getHeaders(); Integer status = restMulti.getStatus(); if (headers.isEmpty() && (status == null)) { @@ -201,6 +171,10 @@ public void onComplete() { } protected String onCompleteText() { + if (!encodeAsJsonArray) { + return null; + } + String postfix; // check if we never sent the open prefix if (!hadItem) { @@ -218,20 +192,23 @@ protected String messagePrefix() { } protected String messageSuffix() { - return null; + return !encodeAsJsonArray ? LINE_SEPARATOR : null; } } static abstract class AbstractMultiSubscriber implements Subscriber { - protected Subscription subscription; - protected ResteasyReactiveRequestContext requestContext; - protected List staticCustomizers; - private boolean weClosed = false; + protected final ResteasyReactiveRequestContext requestContext; + protected final List staticCustomizers; + protected final long demand; + + protected volatile Subscription subscription; + private volatile boolean weClosed = false; AbstractMultiSubscriber(ResteasyReactiveRequestContext requestContext, - List staticCustomizers) { + List staticCustomizers, long demand) { this.requestContext = requestContext; this.staticCustomizers = staticCustomizers; + this.demand = demand; // let's make sure we never restart by accident, also make sure we're not marked as completed requestContext.restart(AWOL, true); requestContext.serverResponse().addCloseHandler(() -> { @@ -245,7 +222,7 @@ static abstract class AbstractMultiSubscriber implements Subscriber { public void onSubscribe(Subscription s) { this.subscription = s; // initially ask for one item - s.request(1); + s.request(demand); } @Override @@ -279,13 +256,8 @@ protected void handleException(ResteasyReactiveRequestContext requestContext, Th private static final Logger log = Logger.getLogger(PublisherResponseHandler.class); private static final ServerRestHandler[] AWOL = new ServerRestHandler[] { - new ServerRestHandler() { - - @Override - public void handle(ResteasyReactiveRequestContext requestContext) - throws Exception { - throw new IllegalStateException("FAILURE: should never be restarted"); - } + requestContext -> { + throw new IllegalStateException("FAILURE: should never be restarted"); } }; @@ -296,8 +268,7 @@ public void handle(ResteasyReactiveRequestContext requestContext) throws Excepti if (requestContextResult instanceof org.reactivestreams.Publisher) { requestContextResult = AdaptersToFlow.publisher((org.reactivestreams.Publisher) requestContextResult); } - if (requestContextResult instanceof Publisher) { - Publisher result = (Publisher) requestContextResult; + if (requestContextResult instanceof Publisher result) { // FIXME: if we make a pretend Response and go through the normal route, we will get // media type negotiation and fixed entity writer set up, perhaps it's better than // cancelling the normal route? @@ -310,7 +281,6 @@ public void handle(ResteasyReactiveRequestContext requestContext) throws Excepti throw new IllegalStateException( "Negotiation or dynamic media type resolution for Multi is only supported when using 'org.jboss.resteasy.reactive.RestMulti'"); } - } MediaType[] mediaTypes = produces.getSortedOriginalMediaTypes(); if (mediaTypes.length != 1) { @@ -343,24 +313,43 @@ private boolean requiresChunkedStream(MediaType mediaType) { } private void handleChunkedStreaming(ResteasyReactiveRequestContext requestContext, Publisher result, boolean json) { - result.subscribe(new ChunkedStreamingMultiSubscriber(requestContext, streamingResponseCustomizers, result, json)); + long demand = 1L; + if (result instanceof RestMulti.SyncRestMulti) { + RestMulti.SyncRestMulti rest = (RestMulti.SyncRestMulti) result; + demand = rest.getDemand(); + } + result.subscribe( + new StreamingMultiSubscriber(requestContext, streamingResponseCustomizers, result, json, demand, false)); } private void handleStreaming(ResteasyReactiveRequestContext requestContext, Publisher result, boolean json) { - result.subscribe(new StreamingMultiSubscriber(requestContext, streamingResponseCustomizers, result, json)); + long demand = 1L; + boolean encodeAsJsonArray = true; + if (result instanceof RestMulti.SyncRestMulti) { + RestMulti.SyncRestMulti rest = (RestMulti.SyncRestMulti) result; + demand = rest.getDemand(); + encodeAsJsonArray = rest.encodeAsJsonArray(); + } + result.subscribe(new StreamingMultiSubscriber(requestContext, streamingResponseCustomizers, result, json, demand, + encodeAsJsonArray)); } private void handleSse(ResteasyReactiveRequestContext requestContext, Publisher result) { + long demand; + if (result instanceof RestMulti.SyncRestMulti) { + RestMulti.SyncRestMulti rest = (RestMulti.SyncRestMulti) result; + demand = rest.getDemand(); + } else { + demand = 1L; + } + SseUtil.setHeaders(requestContext, requestContext.serverResponse(), streamingResponseCustomizers); requestContext.suspend(); - requestContext.serverResponse().write(EMPTY_BUFFER, new Consumer() { - @Override - public void accept(Throwable throwable) { - if (throwable == null) { - result.subscribe(new SseMultiSubscriber(requestContext, streamingResponseCustomizers)); - } else { - requestContext.resume(throwable); - } + requestContext.serverResponse().write(EMPTY_BUFFER, throwable -> { + if (throwable == null) { + result.subscribe(new SseMultiSubscriber(requestContext, streamingResponseCustomizers, demand)); + } else { + requestContext.resume(throwable); } }); }