Skip to content

Commit

Permalink
Process messages immediately after is sent in chuncked streaming
Browse files Browse the repository at this point in the history
Fix #30690
  • Loading branch information
Sgitario committed Apr 28, 2023
1 parent 3f4e1fe commit 829ad18
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ public void testStreamJsonMultiFromMulti() {

private void testJsonMulti(String path) {
Client client = ClientBuilder.newBuilder().register(new JacksonBasicMessageBodyReader(new ObjectMapper())).build();
;
WebTarget target = client.target(uri.toString() + path);
Multi<Message> multi = target.request().rx(MultiInvoker.class).get(Message.class);
List<Message> list = multi.collect().asList().await().atMost(Duration.ofSeconds(30));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.jupiter.api.Assertions.fail;

import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -32,10 +33,14 @@
import io.quarkus.vertx.web.ReactiveRoutes;
import io.quarkus.vertx.web.Route;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.vertx.core.Vertx;
import io.vertx.ext.web.RoutingContext;

public class StreamJsonTest {

private static final long TICK_EVERY_MS = 200;

@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar.addClasses(TestJacksonBasicMessageBodyReader.class));
Expand Down Expand Up @@ -109,6 +114,21 @@ void shouldReadNdjsonFromSingleMessage() throws InterruptedException {
assertThat(collected).hasSize(4).containsAll(expected);
}

/**
* Reproduce <a href="https://github.com/quarkusio/quarkus/issues/30690">#30690</a>.
*/
@Test
public void shouldReadUpToThreeTicks() {
createClient(uri)
.ticks()
.onItem()
.invoke(Objects::nonNull)
.subscribe()
.withSubscriber(AssertSubscriber.create(3))
// wait for 3 ticks plus some half tick ms of extra time (this should not be necessary, but CI is slow)
.awaitItems(3, Duration.ofMillis((TICK_EVERY_MS * 3) + (TICK_EVERY_MS / 2)));
}

private Client createClient(URI uri) {
return RestClientBuilder.newBuilder().baseUri(uri).register(new TestJacksonBasicMessageBodyReader())
.build(Client.class);
Expand All @@ -133,6 +153,12 @@ public interface Client {
@Produces(RestMediaType.APPLICATION_STREAM_JSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
Multi<Message> readPojoSingle();

@GET
@Path("/ticks")
@Produces(RestMediaType.APPLICATION_STREAM_JSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
Multi<String> ticks();
}

public static class ReactiveRoutesResource {
Expand Down Expand Up @@ -199,6 +225,19 @@ public String getPojosAsString() throws JsonProcessingException {
}
return result.toString();
}

@GET
@Path("/ticks")
@Produces(RestMediaType.APPLICATION_STREAM_JSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<String> getTicks() {
return Multi.createFrom()
.ticks()
.every(Duration.ofMillis(TICK_EVERY_MS))
.log()
.onItem()
.transform((Long tick) -> "tick " + tick);
}
}

public static class Message {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,9 @@ public void handle(Buffer buffer) {
charset = charset == null ? "UTF-8" : charset;
byte[] separator = "\n".getBytes(charset);
int start = 0;
if (startsWith(bytes, separator)) {
start += separator.length;
}
while (start < bytes.length) {
int end = bytes.length;
for (int i = start; i < bytes.length - separator.length; i++) {
for (int i = start; i < end; i++) {
if (bytes[i] == separator[0]) {
int j;
boolean matches = true;
Expand All @@ -222,7 +219,7 @@ public void handle(Buffer buffer) {
}

if (start < end) {
ByteArrayInputStream in = new ByteArrayInputStream(bytes, start, end - start);
ByteArrayInputStream in = new ByteArrayInputStream(bytes, start, end);
R item = restClientRequestContext.readEntity(in, responseType, mediaType,
response.getMetadata());
multiRequest.emitter.emit(item);
Expand All @@ -241,18 +238,6 @@ public void handle(Buffer buffer) {
multiRequest.emitter.fail(t);
}
}

private boolean startsWith(byte[] array, byte[] prefix) {
if (array.length < prefix.length) {
return false;
}
for (int i = 0; i < prefix.length; i++) {
if (array[i] != prefix[i]) {
return false;
}
}
return true;
}
});

// this captures the end of the response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
public class StreamingUtil {

public static CompletionStage<?> send(ResteasyReactiveRequestContext context,
List<PublisherResponseHandler.StreamingResponseCustomizer> customizers, Object entity, String prefix) {
List<PublisherResponseHandler.StreamingResponseCustomizer> customizers, Object entity, String prefix,
String suffix) {
ServerHttpResponse response = context.serverResponse();
if (response.closed()) {
// FIXME: check spec
Expand All @@ -46,6 +47,13 @@ public static CompletionStage<?> send(ResteasyReactiveRequestContext context,
System.arraycopy(data, 0, prefixedData, prefixBytes.length, data.length);
data = prefixedData;
}
if (suffix != null) {
byte[] suffixBytes = suffix.getBytes(StandardCharsets.US_ASCII);
byte[] suffixedData = new byte[data.length + suffixBytes.length];
System.arraycopy(data, 0, suffixedData, 0, data.length);
System.arraycopy(suffixBytes, 0, suffixedData, data.length, suffixBytes.length);
data = suffixedData;
}
return response.write(data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,18 @@ private static class ChunkedStreamingMultiSubscriber extends StreamingMultiSubsc
@Override
protected String messagePrefix() {
// When message is chunked, we don't need to add prefixes at first
if (isFirstItem) {
isFirstItem = false;
return null;
}
return null;
}

// If it's not the first message, we need to append the messages with end of line delimiter.
@Override
protected String messageSuffix() {
return LINE_SEPARATOR;
}

@Override
protected String onCompleteText() {
return LINE_SEPARATOR;
// When message is chunked, we don't need to add text at the end of the messages
return null;
}
}

Expand All @@ -121,7 +121,7 @@ private static class StreamingMultiSubscriber extends AbstractMultiSubscriber {
@Override
public void onNext(Object item) {
hadItem = true;
StreamingUtil.send(requestContext, customizers, item, messagePrefix())
StreamingUtil.send(requestContext, customizers, item, messagePrefix(), messageSuffix())
.handle(new BiFunction<Object, Throwable, Object>() {
@Override
public Object apply(Object v, Throwable t) {
Expand Down Expand Up @@ -151,11 +151,15 @@ public void onComplete() {
}
if (json) {
String postfix = onCompleteText();
byte[] postfixBytes = postfix.getBytes(StandardCharsets.US_ASCII);
requestContext.serverResponse().write(postfixBytes).handle((v, t) -> {
if (postfix != null) {
byte[] postfixBytes = postfix.getBytes(StandardCharsets.US_ASCII);
requestContext.serverResponse().write(postfixBytes).handle((v, t) -> {
super.onComplete();
return null;
});
} else {
super.onComplete();
return null;
});
}
} else {
super.onComplete();
}
Expand All @@ -177,6 +181,10 @@ protected String messagePrefix() {
// if it's json, the message prefix starts with `[`.
return json ? nextJsonPrefix : null;
}

protected String messageSuffix() {
return null;
}
}

static abstract class AbstractMultiSubscriber implements Subscriber<Object> {
Expand Down

0 comments on commit 829ad18

Please sign in to comment.