Skip to content

Commit

Permalink
Enhance RestMulti, configurable demand + distinct objects
Browse files Browse the repository at this point in the history
Adds two enhancements:

1. Produce multiple JSON objects, not just an array
2. Make requested demand configurable

== Produce multiple JSON objects

Currently, `PublisherResponseHandler.StreamingMultiSubscriber` produces a JSON array, where each emitted item is encoded as a JSON array element. For some use cases it is easier to consume a bunch of "bare" JSON objects - i.e. just write the individual JSON objects, possibly separated by a newline. As an option, of course.

Proposal to add:
```java
RestMulti.fromMultiData(multi).encodeAsArray(false)...
```

With `encodeAsArray(false)`, the produced JSON would look like this:
```json
{"some": "value"}
{"some": "value"}
{"some": "value"}
```

`encodeAsArray(true)` or omitting it would use the current behavior and produce something like this:
```json
[{"some": "value"},
{"some": "value"},
{"some": "value"}
}
```

== Configure request-demand

All implementations of `PublisherResponseHandler.AbstractMultiSubscriber` work with a hard-coded request-demand of `1`, which means that every emitted item is "produced"/"computed" serially / one-after-the-other. If the computation of individual items takes somewhat longer, possibly waiting for remote resources to reply, it makes sense to use a higher demand to produce multiple items concurrently.

For example, if each item takes maybe 250 ms (requesting data from a remote source) to be produced, and 100 items are produced, it currently takes 25 seconds. With a higher concurrency it would take a fraction of that time. I.e. if the use case is known to be not CPU but (async) I/O bound, it _might_ be legit/feasible to use a high demand.

Proposal to add:
```
RestMulti.fromMultiData(multi).withDemand( (long) 123 )...
```

Which would pass `123` as the demand for all call sites to `Subscription.request` in implementations of `PublisherResponseHandler.AbstractMultiSubscriber`.
  • Loading branch information
snazy committed Feb 15, 2024
1 parent 9d5e5b2 commit 6db9089
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public static class SyncRestMulti<T> extends RestMulti<T> {
private final Multi<T> multi;
private final Integer status;
private final MultivaluedTreeMap<String, String> headers;
private final long demand;
private final boolean encodeAsJsonArray;

@Override
public void subscribe(MultiSubscriber<? super T> subscriber) {
Expand All @@ -79,6 +81,8 @@ private SyncRestMulti(Builder<T> builder) {
this.multi = builder.multi;
this.status = builder.status;
this.headers = builder.headers;
this.demand = builder.demand;
this.encodeAsJsonArray = builder.encodeAsJsonArray;
}

@Override
Expand All @@ -91,17 +95,48 @@ public Map<String, List<String>> getHeaders() {
return headers;
}

public static class Builder<T> {
private final Multi<T> multi;
public long getDemand() {
return demand;
}

private Integer status;
public boolean encodeAsJsonArray() {
return encodeAsJsonArray;
}

public static class Builder<T> {
private final Multi<T> multi;
private final MultivaluedTreeMap<String, String> headers = new CaseInsensitiveMap<>();
private Integer status;
private long demand = 1;
private boolean encodeAsJsonArray = true;

private Builder(Multi<T> multi) {
this.multi = Objects.requireNonNull(multi, "multi cannot be null");
}

/**
* Configure the {@code demand} signaled to the wrapped {@link Multi}, defaults to {@code 1}.
*/
public Builder<T> withDemand(long demand) {
this.demand = demand;
return this;
}

/**
* Configure whether objects produced by the wrapped {@link Multi} are encoded as JSON array elements, which is the
* default.
*
* <p>
* {@code encodeAsJsonArray(false)} produces separate JSON objects.
*
* <p>
* This property is only used for JSON object results and ignored for SSE and chunked streaming.
*/
public Builder<T> encodeAsJsonArray(boolean encodeAsJsonArray) {
this.encodeAsJsonArray = encodeAsJsonArray;
return this;
}

public Builder<T> status(int status) {
this.status = status;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.sse.OutboundSseEvent;
Expand Down Expand Up @@ -51,8 +48,9 @@ public void setStreamingResponseCustomizers(List<StreamingResponseCustomizer> st

private static class SseMultiSubscriber extends AbstractMultiSubscriber {

SseMultiSubscriber(ResteasyReactiveRequestContext requestContext, List<StreamingResponseCustomizer> staticCustomizers) {
super(requestContext, staticCustomizers);
SseMultiSubscriber(ResteasyReactiveRequestContext requestContext, List<StreamingResponseCustomizer> staticCustomizers,
long demand) {
super(requestContext, staticCustomizers, demand);
}

@Override
Expand All @@ -63,67 +61,43 @@ public void onNext(Object item) {
} else {
event = new OutboundSseEventImpl.BuilderImpl().data(item).build();
}
SseUtil.send(requestContext, event, staticCustomizers).whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object v, Throwable t) {
if (t != null) {
// need to cancel because the exception didn't come from the Multi
subscription.cancel();
handleException(requestContext, t);
} else {
// send in the next item
subscription.request(1);
}
SseUtil.send(requestContext, event, staticCustomizers).whenComplete((v, t) -> {
if (t != null) {
// need to cancel because the exception didn't come from the Multi
subscription.cancel();
handleException(requestContext, t);
} else {
// send in the next item
subscription.request(demand);
}
});
}
}

private static class ChunkedStreamingMultiSubscriber extends StreamingMultiSubscriber {
@SuppressWarnings("rawtypes")
private static class StreamingMultiSubscriber extends AbstractMultiSubscriber {

private static final String LINE_SEPARATOR = "\n";

private boolean isFirstItem = true;

ChunkedStreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext,
List<StreamingResponseCustomizer> staticCustomizers, Publisher publisher, boolean json) {
super(requestContext, staticCustomizers, publisher, json);
}

@Override
protected String messagePrefix() {
// When message is chunked, we don't need to add prefixes at first
return null;
}

@Override
protected String messageSuffix() {
return LINE_SEPARATOR;
}

@Override
protected String onCompleteText() {
// When message is chunked, we don't need to add text at the end of the messages
return null;
}
}

private static class StreamingMultiSubscriber extends AbstractMultiSubscriber {
private final Publisher publisher;
private final boolean json;
private final boolean encodeAsJsonArray;

// Huge hack to stream valid json
private boolean json;
private String nextJsonPrefix;
private boolean hadItem;

private final Publisher publisher;
private volatile String nextJsonPrefix;
private volatile boolean hadItem;

StreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext,
List<StreamingResponseCustomizer> staticCustomizers, Publisher publisher,
boolean json) {
super(requestContext, staticCustomizers);
boolean json, long demand, boolean encodeAsJsonArray) {
super(requestContext, staticCustomizers, demand);
this.publisher = publisher;
this.json = json;
this.nextJsonPrefix = "[";
// encodeAsJsonArray == true means JSON array "encoding"
// encodeAsJsonArray == false mean no prefix, no suffix and LF as message separator,
// also used for/same as chunked-streaming
this.encodeAsJsonArray = encodeAsJsonArray;
this.nextJsonPrefix = encodeAsJsonArray ? "[" : null;
this.hadItem = false;
}

Expand All @@ -132,33 +106,29 @@ public void onNext(Object item) {
List<StreamingResponseCustomizer> customizers = determineCustomizers(!hadItem);
hadItem = true;
StreamingUtil.send(requestContext, customizers, item, messagePrefix(), messageSuffix())
.handle(new BiFunction<Object, Throwable, Object>() {
@Override
public Object apply(Object v, Throwable t) {
if (t != null) {
// need to cancel because the exception didn't come from the Multi
try {
subscription.cancel();
} catch (Throwable t2) {
t2.printStackTrace();
}
handleException(requestContext, t);
} else {
// next item will need this prefix if json
nextJsonPrefix = ",";
// send in the next item
subscription.request(1);
.handle((v, t) -> {
if (t != null) {
// need to cancel because the exception didn't come from the Multi
try {
subscription.cancel();
} catch (Throwable t2) {
t2.printStackTrace();
}
return null;
handleException(requestContext, t);
} else {
// next item will need this prefix if json
nextJsonPrefix = encodeAsJsonArray ? "," : "\"";
// send in the next item
subscription.request(demand);
}
return null;
});
}

private List<StreamingResponseCustomizer> determineCustomizers(boolean isFirst) {
// we only need to obtain the customizers from the Publisher if it's the first time we are sending data and the Publisher has customizable data
// at this point no matter the type of RestMulti we can safely obtain the headers and status
if (isFirst && (publisher instanceof RestMulti)) {
RestMulti<?> restMulti = (RestMulti<?>) publisher;
if (isFirst && (publisher instanceof RestMulti<?> restMulti)) {
Map<String, List<String>> headers = restMulti.getHeaders();
Integer status = restMulti.getStatus();
if (headers.isEmpty() && (status == null)) {
Expand Down Expand Up @@ -201,6 +171,10 @@ public void onComplete() {
}

protected String onCompleteText() {
if (!encodeAsJsonArray) {
return null;
}

String postfix;
// check if we never sent the open prefix
if (!hadItem) {
Expand All @@ -218,20 +192,23 @@ protected String messagePrefix() {
}

protected String messageSuffix() {
return null;
return json && !encodeAsJsonArray ? LINE_SEPARATOR : null;
}
}

static abstract class AbstractMultiSubscriber implements Subscriber<Object> {
protected Subscription subscription;
protected ResteasyReactiveRequestContext requestContext;
protected List<StreamingResponseCustomizer> staticCustomizers;
private boolean weClosed = false;
protected final ResteasyReactiveRequestContext requestContext;
protected final List<StreamingResponseCustomizer> staticCustomizers;
protected final long demand;

protected volatile Subscription subscription;
private volatile boolean weClosed = false;

AbstractMultiSubscriber(ResteasyReactiveRequestContext requestContext,
List<StreamingResponseCustomizer> staticCustomizers) {
List<StreamingResponseCustomizer> staticCustomizers, long demand) {
this.requestContext = requestContext;
this.staticCustomizers = staticCustomizers;
this.demand = demand;
// let's make sure we never restart by accident, also make sure we're not marked as completed
requestContext.restart(AWOL, true);
requestContext.serverResponse().addCloseHandler(() -> {
Expand All @@ -245,7 +222,7 @@ static abstract class AbstractMultiSubscriber implements Subscriber<Object> {
public void onSubscribe(Subscription s) {
this.subscription = s;
// initially ask for one item
s.request(1);
s.request(demand);
}

@Override
Expand Down Expand Up @@ -296,8 +273,7 @@ public void handle(ResteasyReactiveRequestContext requestContext) throws Excepti
if (requestContextResult instanceof org.reactivestreams.Publisher) {
requestContextResult = AdaptersToFlow.publisher((org.reactivestreams.Publisher<?>) requestContextResult);
}
if (requestContextResult instanceof Publisher) {
Publisher<?> result = (Publisher<?>) requestContextResult;
if (requestContextResult instanceof Publisher<?> result) {
// FIXME: if we make a pretend Response and go through the normal route, we will get
// media type negotiation and fixed entity writer set up, perhaps it's better than
// cancelling the normal route?
Expand All @@ -310,7 +286,6 @@ public void handle(ResteasyReactiveRequestContext requestContext) throws Excepti
throw new IllegalStateException(
"Negotiation or dynamic media type resolution for Multi is only supported when using 'org.jboss.resteasy.reactive.RestMulti'");
}

}
MediaType[] mediaTypes = produces.getSortedOriginalMediaTypes();
if (mediaTypes.length != 1) {
Expand Down Expand Up @@ -343,24 +318,43 @@ private boolean requiresChunkedStream(MediaType mediaType) {
}

private void handleChunkedStreaming(ResteasyReactiveRequestContext requestContext, Publisher<?> result, boolean json) {
result.subscribe(new ChunkedStreamingMultiSubscriber(requestContext, streamingResponseCustomizers, result, json));
long demand = 1L;
if (result instanceof RestMulti.SyncRestMulti) {
RestMulti.SyncRestMulti rest = (RestMulti.SyncRestMulti) result;
demand = rest.getDemand();
}
result.subscribe(
new StreamingMultiSubscriber(requestContext, streamingResponseCustomizers, result, json, demand, false));
}

private void handleStreaming(ResteasyReactiveRequestContext requestContext, Publisher<?> result, boolean json) {
result.subscribe(new StreamingMultiSubscriber(requestContext, streamingResponseCustomizers, result, json));
long demand = 1L;
boolean encodeAsJsonArray = false;
if (result instanceof RestMulti.SyncRestMulti) {
RestMulti.SyncRestMulti rest = (RestMulti.SyncRestMulti) result;
demand = rest.getDemand();
encodeAsJsonArray = rest.encodeAsJsonArray();
}
result.subscribe(new StreamingMultiSubscriber(requestContext, streamingResponseCustomizers, result, json, demand,
encodeAsJsonArray));
}

private void handleSse(ResteasyReactiveRequestContext requestContext, Publisher<?> result) {
long demand;
if (result instanceof RestMulti.SyncRestMulti) {
RestMulti.SyncRestMulti rest = (RestMulti.SyncRestMulti) result;
demand = rest.getDemand();
} else {
demand = 1L;
}

SseUtil.setHeaders(requestContext, requestContext.serverResponse(), streamingResponseCustomizers);
requestContext.suspend();
requestContext.serverResponse().write(EMPTY_BUFFER, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
if (throwable == null) {
result.subscribe(new SseMultiSubscriber(requestContext, streamingResponseCustomizers));
} else {
requestContext.resume(throwable);
}
requestContext.serverResponse().write(EMPTY_BUFFER, throwable -> {
if (throwable == null) {
result.subscribe(new SseMultiSubscriber(requestContext, streamingResponseCustomizers, demand));
} else {
requestContext.resume(throwable);
}
});
}
Expand Down

0 comments on commit 6db9089

Please sign in to comment.