From 829ad1869445e50642165d24f7e2841a894747b8 Mon Sep 17 00:00:00 2001 From: Jose Date: Tue, 31 Jan 2023 10:55:05 +0100 Subject: [PATCH] Process messages immediately after is sent in chuncked streaming Fix https://github.com/quarkusio/quarkus/issues/30690 --- .../test/streams/StreamTestCase.java | 1 - .../rest/client/reactive/StreamJsonTest.java | 39 +++++++++++++++++++ .../reactive/client/impl/MultiInvoker.java | 19 +-------- .../reactive/server/core/StreamingUtil.java | 10 ++++- .../handlers/PublisherResponseHandler.java | 30 ++++++++------ 5 files changed, 69 insertions(+), 30 deletions(-) diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamTestCase.java index 7414d57504e3ae..a0e4c04a1719ba 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamTestCase.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamTestCase.java @@ -141,7 +141,6 @@ public void testStreamJsonMultiFromMulti() { private void testJsonMulti(String path) { Client client = ClientBuilder.newBuilder().register(new JacksonBasicMessageBodyReader(new ObjectMapper())).build(); - ; WebTarget target = client.target(uri.toString() + path); Multi multi = target.request().rx(MultiInvoker.class).get(Message.class); List list = multi.collect().asList().await().atMost(Duration.ofSeconds(30)); diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/StreamJsonTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/StreamJsonTest.java index 07344da0fae7f8..21ae3dd8b2ed1b 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/StreamJsonTest.java +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/StreamJsonTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.fail; import java.net.URI; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -32,10 +33,14 @@ import io.quarkus.vertx.web.ReactiveRoutes; import io.quarkus.vertx.web.Route; import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; import io.vertx.core.Vertx; import io.vertx.ext.web.RoutingContext; public class StreamJsonTest { + + private static final long TICK_EVERY_MS = 200; + @RegisterExtension static final QuarkusUnitTest TEST = new QuarkusUnitTest() .withApplicationRoot((jar) -> jar.addClasses(TestJacksonBasicMessageBodyReader.class)); @@ -109,6 +114,21 @@ void shouldReadNdjsonFromSingleMessage() throws InterruptedException { assertThat(collected).hasSize(4).containsAll(expected); } + /** + * Reproduce #30690. + */ + @Test + public void shouldReadUpToThreeTicks() { + createClient(uri) + .ticks() + .onItem() + .invoke(Objects::nonNull) + .subscribe() + .withSubscriber(AssertSubscriber.create(3)) + // wait for 3 ticks plus some half tick ms of extra time (this should not be necessary, but CI is slow) + .awaitItems(3, Duration.ofMillis((TICK_EVERY_MS * 3) + (TICK_EVERY_MS / 2))); + } + private Client createClient(URI uri) { return RestClientBuilder.newBuilder().baseUri(uri).register(new TestJacksonBasicMessageBodyReader()) .build(Client.class); @@ -133,6 +153,12 @@ public interface Client { @Produces(RestMediaType.APPLICATION_STREAM_JSON) @RestStreamElementType(MediaType.APPLICATION_JSON) Multi readPojoSingle(); + + @GET + @Path("/ticks") + @Produces(RestMediaType.APPLICATION_STREAM_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + Multi ticks(); } public static class ReactiveRoutesResource { @@ -199,6 +225,19 @@ public String getPojosAsString() throws JsonProcessingException { } return result.toString(); } + + @GET + @Path("/ticks") + @Produces(RestMediaType.APPLICATION_STREAM_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + public Multi getTicks() { + return Multi.createFrom() + .ticks() + .every(Duration.ofMillis(TICK_EVERY_MS)) + .log() + .onItem() + .transform((Long tick) -> "tick " + tick); + } } public static class Message { diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java index e3a8820add5f4a..e84f30da72ee6a 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java @@ -199,12 +199,9 @@ public void handle(Buffer buffer) { charset = charset == null ? "UTF-8" : charset; byte[] separator = "\n".getBytes(charset); int start = 0; - if (startsWith(bytes, separator)) { - start += separator.length; - } while (start < bytes.length) { int end = bytes.length; - for (int i = start; i < bytes.length - separator.length; i++) { + for (int i = start; i < end; i++) { if (bytes[i] == separator[0]) { int j; boolean matches = true; @@ -222,7 +219,7 @@ public void handle(Buffer buffer) { } if (start < end) { - ByteArrayInputStream in = new ByteArrayInputStream(bytes, start, end - start); + ByteArrayInputStream in = new ByteArrayInputStream(bytes, start, end); R item = restClientRequestContext.readEntity(in, responseType, mediaType, response.getMetadata()); multiRequest.emitter.emit(item); @@ -241,18 +238,6 @@ public void handle(Buffer buffer) { multiRequest.emitter.fail(t); } } - - private boolean startsWith(byte[] array, byte[] prefix) { - if (array.length < prefix.length) { - return false; - } - for (int i = 0; i < prefix.length; i++) { - if (array[i] != prefix[i]) { - return false; - } - } - return true; - } }); // this captures the end of the response diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/StreamingUtil.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/StreamingUtil.java index 8140b79ca2f394..560274fb4fe535 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/StreamingUtil.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/StreamingUtil.java @@ -24,7 +24,8 @@ public class StreamingUtil { public static CompletionStage send(ResteasyReactiveRequestContext context, - List customizers, Object entity, String prefix) { + List customizers, Object entity, String prefix, + String suffix) { ServerHttpResponse response = context.serverResponse(); if (response.closed()) { // FIXME: check spec @@ -46,6 +47,13 @@ public static CompletionStage send(ResteasyReactiveRequestContext context, System.arraycopy(data, 0, prefixedData, prefixBytes.length, data.length); data = prefixedData; } + if (suffix != null) { + byte[] suffixBytes = suffix.getBytes(StandardCharsets.US_ASCII); + byte[] suffixedData = new byte[data.length + suffixBytes.length]; + System.arraycopy(data, 0, suffixedData, 0, data.length); + System.arraycopy(suffixBytes, 0, suffixedData, data.length, suffixBytes.length); + data = suffixedData; + } return response.write(data); } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java index d10f6fcfdaecd0..55fb71fbf69421 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java @@ -88,18 +88,18 @@ private static class ChunkedStreamingMultiSubscriber extends StreamingMultiSubsc @Override protected String messagePrefix() { // When message is chunked, we don't need to add prefixes at first - if (isFirstItem) { - isFirstItem = false; - return null; - } + return null; + } - // If it's not the first message, we need to append the messages with end of line delimiter. + @Override + protected String messageSuffix() { return LINE_SEPARATOR; } @Override protected String onCompleteText() { - return LINE_SEPARATOR; + // When message is chunked, we don't need to add text at the end of the messages + return null; } } @@ -121,7 +121,7 @@ private static class StreamingMultiSubscriber extends AbstractMultiSubscriber { @Override public void onNext(Object item) { hadItem = true; - StreamingUtil.send(requestContext, customizers, item, messagePrefix()) + StreamingUtil.send(requestContext, customizers, item, messagePrefix(), messageSuffix()) .handle(new BiFunction() { @Override public Object apply(Object v, Throwable t) { @@ -151,11 +151,15 @@ public void onComplete() { } if (json) { String postfix = onCompleteText(); - byte[] postfixBytes = postfix.getBytes(StandardCharsets.US_ASCII); - requestContext.serverResponse().write(postfixBytes).handle((v, t) -> { + if (postfix != null) { + byte[] postfixBytes = postfix.getBytes(StandardCharsets.US_ASCII); + requestContext.serverResponse().write(postfixBytes).handle((v, t) -> { + super.onComplete(); + return null; + }); + } else { super.onComplete(); - return null; - }); + } } else { super.onComplete(); } @@ -177,6 +181,10 @@ protected String messagePrefix() { // if it's json, the message prefix starts with `[`. return json ? nextJsonPrefix : null; } + + protected String messageSuffix() { + return null; + } } static abstract class AbstractMultiSubscriber implements Subscriber {