Skip to content

Commit

Permalink
Implement support for NDJSON streaming in vertex-web
Browse files Browse the repository at this point in the history
  • Loading branch information
ntrp committed Jun 27, 2021
1 parent 0473326 commit 18fdfe4
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@
import io.quarkus.gizmo.FieldCreator;
import io.quarkus.gizmo.MethodDescriptor;
import io.quarkus.gizmo.ResultHandle;
import io.quarkus.vertx.web.runtime.MultiJsonArraySupport;
import io.quarkus.vertx.web.runtime.MultiSseSupport;
import io.quarkus.vertx.web.runtime.MultiSupport;
import io.quarkus.vertx.web.runtime.RouteHandler;
import io.quarkus.vertx.web.runtime.RouteHandlers;
import io.quarkus.vertx.web.runtime.ValidationSupport;
import io.quarkus.vertx.web.runtime.*;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniSubscribe;
Expand Down Expand Up @@ -114,6 +109,15 @@ class Methods {
"subscribeObject",
Void.TYPE, Multi.class, RoutingContext.class);

static final MethodDescriptor IS_NDJSON = MethodDescriptor.ofMethod(MultiNdjsonSupport.class, "isNdjson", Boolean.TYPE,
Multi.class);
static final MethodDescriptor MULTI_NDJSON_SUBSCRIBE_STRING = MethodDescriptor.ofMethod(MultiNdjsonSupport.class,
"subscribeString",
Void.TYPE, Multi.class, RoutingContext.class);
static final MethodDescriptor MULTI_NDJSON_SUBSCRIBE_OBJECT = MethodDescriptor.ofMethod(MultiNdjsonSupport.class,
"subscribeObject",
Void.TYPE, Multi.class, RoutingContext.class);

static final MethodDescriptor IS_JSON_ARRAY = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, "isJsonArray",
Boolean.TYPE, Multi.class);
static final MethodDescriptor MULTI_JSON_SUBSCRIBE_VOID = MethodDescriptor.ofMethod(MultiJsonArraySupport.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,13 @@ void implementInvoke(HandlerDescriptor descriptor, BeanInfo bean, MethodInfo met
isSSE.close();

BytecodeCreator isNotSSE = isItSSE.falseBranch();
BranchResult isItJson = isNotSSE.ifTrue(isNotSSE.invokeStaticMethod(Methods.IS_JSON_ARRAY, res));
BranchResult isItNdJson = isNotSSE.ifTrue(isNotSSE.invokeStaticMethod(Methods.IS_NDJSON, res));
BytecodeCreator isNdjson = isItNdJson.trueBranch();
handleNdjsonMulti(descriptor, isNdjson, routingContext, res);
isNdjson.close();

BytecodeCreator isNotNdjson = isItNdJson.falseBranch();
BranchResult isItJson = isNotNdjson.ifTrue(isNotNdjson.invokeStaticMethod(Methods.IS_JSON_ARRAY, res));
BytecodeCreator isJson = isItJson.trueBranch();
handleJsonArrayMulti(descriptor, isJson, routingContext, res);
isJson.close();
Expand Down Expand Up @@ -918,6 +924,28 @@ private void handleSSEMulti(HandlerDescriptor descriptor, BytecodeCreator writer
}
}

private void handleNdjsonMulti(HandlerDescriptor descriptor, BytecodeCreator writer, ResultHandle rc,
ResultHandle res) {
// The method returns a Multi that needs to be written as server-sent event.
// We subscribe to this Multi and write the provided items (one by one) in the HTTP response.
// On completion, we "end" the response
// If the method returned null, we fail
// If the provided item is null we fail
// If the multi is empty, and the method return a Multi<Void>, we reply with a 204 - NO CONTENT (as regular)
// If the produced item is a string or buffer, the response.write method is used to write the events in the response
// If the produced item is an object, the item is mapped to JSON and included in the `data` section of the event.

if (Methods.isNoContent(descriptor)) { // Multi<Void> - so return a 204.
writer.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_VOID, res, rc);
} else if (descriptor.isContentTypeString()) {
writer.invokeStaticMethod(Methods.MULTI_NDJSON_SUBSCRIBE_STRING, res, rc);
} else if (descriptor.isContentTypeBuffer() || descriptor.isContentTypeMutinyBuffer()) {
writer.invokeStaticMethod(Methods.MULTI_JSON_FAIL, rc);
} else { // Multi<Object> - encode to json.
writer.invokeStaticMethod(Methods.MULTI_NDJSON_SUBSCRIBE_OBJECT, res, rc);
}
}

private void handleJsonArrayMulti(HandlerDescriptor descriptor, BytecodeCreator writer, ResultHandle rc,
ResultHandle res) {
// The method returns a Multi that needs to be written as JSON Array.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Objects;

import io.quarkus.vertx.web.runtime.JsonArrayMulti;
import io.quarkus.vertx.web.runtime.NdjsonMulti;
import io.quarkus.vertx.web.runtime.SSEMulti;
import io.smallrye.mutiny.Multi;

Expand Down Expand Up @@ -33,7 +34,7 @@ private ReactiveRoutes() {
* {@link ServerSentEvent#event()}.
* <p>
* Example of usage:
*
*
* <pre>
* &#64;Route(path = "/people")
* Multi&lt;Person&gt; people(RoutingContext context) {
Expand All @@ -52,6 +53,43 @@ public static <T> Multi<T> asEventStream(Multi<T> multi) {
return new SSEMulti<>(Objects.requireNonNull(multi, "The passed multi must not be `null`"));
}

/**
* Indicates the the given stream should be written as a Json stream in the response.
* Returning a {@code multi} wrapped using this method produces a {@code application/x-ndjson} response. Each item
* is written as an serialized json on a new line in the response. The response automatically enables the chunked
* encoding and set the content type.
* <p>
* If the item is a String, the content will be wrapped in quotes and written.
* If the item is an Object, then the JSON representation of this object will be written.
* <p>
* Example of usage:
*
* <pre>
* &#64;Route(path = "/people")
* Multi&lt;Person&gt; people(RoutingContext context) {
* return ReactiveRoutes.asJsonStream(Multi.createFrom().items(
* new Person("superman", 1),
* new Person("batman", 2),
* new Person("spiderman", 3)));
* }
* </pre>
*
* This example produces:
*
* <pre>
* {"name":"superman", "id":1}
* {...}
* {...}
* </pre>
*
* @param multi the multi to be written
* @param <T> the type of item, can be string, object
* @return the wrapped multi
*/
public static <T> Multi<T> asJsonStream(Multi<T> multi) {
return new NdjsonMulti<>(Objects.requireNonNull(multi, "The passed multi must not be `null`"));
}

/**
* Indicates the the given stream should be written as a <em>chunked</em> JSON array in the response.
* Returning a {@code multi} wrapped using this method produces a {@code application/json} response. Each item
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.quarkus.vertx.web.runtime;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.smallrye.mutiny.Multi;
import io.vertx.core.AsyncResult;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.Json;
import io.vertx.ext.web.RoutingContext;

@SuppressWarnings("ReactiveStreamsSubscriberImplementation")
public class MultiNdjsonSupport {

private MultiNdjsonSupport() {
// Avoid direct instantiation.
}

private static void initialize(HttpServerResponse response, RoutingContext rc) {
if (response.bytesWritten() == 0) {
MultiMap headers = response.headers();
if (rc.getAcceptableContentType() == null) {
headers.set(HttpHeaders.CONTENT_TYPE, "application/x-ndjson");
}
response.setChunked(true);
}
}

public static void subscribeString(Multi<String> multi, RoutingContext rc) {
write(multi.map(s -> Buffer.buffer("\"" + s + "\"")), rc);
}

public static void subscribeObject(Multi<Object> multi, RoutingContext rc) {
write(multi.map(o -> Buffer.buffer(Json.encode(o) + "\n")), rc);
}

private static void onWriteDone(Subscription subscription, AsyncResult<Void> ar, RoutingContext rc) {
if (ar.failed()) {
rc.fail(ar.cause());
} else {
subscription.request(1);
}
}

public static void write(Multi<Buffer> multi, RoutingContext rc) {
HttpServerResponse response = rc.response();
multi.subscribe().withSubscriber(new Subscriber<Buffer>() {
Subscription upstream;

@Override
public void onSubscribe(Subscription subscription) {
this.upstream = subscription;
this.upstream.request(1);
}

@Override
public void onNext(Buffer item) {
initialize(response, rc);
response.write(item, ar -> onWriteDone(upstream, ar, rc));
}

@Override
public void onError(Throwable throwable) {
rc.fail(throwable);
}

@Override
public void onComplete() {
endOfStream(response, rc);
}
});
}

private static void endOfStream(HttpServerResponse response, RoutingContext rc) {
if (response.bytesWritten() == 0) { // No item
MultiMap headers = response.headers();
if (rc.getAcceptableContentType() == null) {
headers.set(HttpHeaders.CONTENT_TYPE, "application/x-ndjson");
}
}
response.end();
}

public static boolean isNdjson(Multi<?> multi) {
return multi instanceof NdjsonMulti;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.quarkus.vertx.web.runtime;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.MultiSubscriber;

/**
* Just a wrapped to capture the fact that the items must be written as SSE.
*
* @param <T> the type of item.
*/
public class NdjsonMulti<T> extends AbstractMulti<T> {

private final Multi<T> multi;

public NdjsonMulti(Multi<T> multi) {
this.multi = multi;
}

@Override
public void subscribe(MultiSubscriber<? super T> subscriber) {
multi.subscribe(Infrastructure.onMultiSubscription(multi, subscriber));
}
}

0 comments on commit 18fdfe4

Please sign in to comment.