Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process messages immediately after is sent in chuncked streaming #30737

Merged
merged 1 commit into from
May 17, 2023

Conversation

Sgitario
Copy link
Contributor

Fix #30690

while (start < bytes.length) {
int end = bytes.length;
for (int i = start; i < bytes.length - separator.length; i++) {
for (int i = start; i < end; i++) {
Copy link
Contributor Author

@Sgitario Sgitario Jan 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modified this logic to work regardless of the position of the delimiter.

@@ -141,7 +141,6 @@ public void testStreamJsonMultiFromMulti() {

private void testJsonMulti(String path) {
Client client = ClientBuilder.newBuilder().register(new JacksonBasicMessageBodyReader(new ObjectMapper())).build();
;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated to the changes, but it was a leftover

@geoand
Copy link
Contributor

geoand commented Jan 31, 2023

@FroMage mind taking a look as well?

@quarkus-bot

This comment has been minimized.

.subscribe()
.withSubscriber(AssertSubscriber.create(3))
// wait for 3 ticks plus some half tick ms of extra time (this should not be necessary, but CI is slow)
.awaitItems(3, Duration.ofMillis((TICK_EVERY_MS * 3) + (TICK_EVERY_MS / 2)));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be duration of (TICK_EVERY_MS * 2) ... as 1st item is issued at time 0, 2nd at time 200ms, 3rd at time 400ms. So wait time 200ms * 2,5 should be correct

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not have to be precise taking into account that there are more things involved (REST call, subscriptions, etc).
According to my experience, 200 ms of back time is enough to reproduce the issue and verify this is indeed fixed.

@gsmet gsmet added the triage/needs-rebase This PR needs to be rebased first because it has merge conflicts label Feb 4, 2023
@Sgitario Sgitario removed the triage/needs-rebase This PR needs to be rebased first because it has merge conflicts label Feb 6, 2023
@geoand geoand requested a review from FroMage February 6, 2023 06:14
@quarkus-bot

This comment has been minimized.

@paloliska
Copy link

Will this be also backported to 2.16.x branch?

@geoand
Copy link
Contributor

geoand commented Feb 24, 2023

@FroMage please review this

@paloliska
Copy link

Is it possible to merge this, please?

@paloliska
Copy link

@FroMage could you please check this?
or @geoand is it possible to merge this with just 1 approval?

@geoand
Copy link
Contributor

geoand commented Mar 14, 2023

It's possible, but really want a review from @FroMage on this one

Copy link
Member

@FroMage FroMage left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds fair to send a newline after each element… except that we don't know if there will be a next one, at this stage.
So won't a stream of one string be indistinguishable from a stream of one string plus one empty string, from the client's perspective?

@Sgitario
Copy link
Contributor Author

It sounds fair to send a newline after each element… except that we don't know if there will be a next one, at this stage. So won't a stream of one string be indistinguishable from a stream of one string plus one empty string, from the client's perspective?

Tbh, I don't follow what you're asking or requesting as per your comment. Is there a use case you foresee that these changes won't work? In terms of functionality, this will work the same as before, the only thing that changes is when to process the message.

@paloliska
Copy link

@FroMage do you need some input or help to test this PR? I could make some additional tests if you have any scenarios in mind.

@paloliska
Copy link

issue #30690 updated with reproducer for quarkus 3.0.1.Final

@Sgitario
Copy link
Contributor Author

issue #30690 updated with reproducer for quarkus 3.0.1.Final

Thanks! @FroMage any updates on this?

@quarkus-bot

This comment has been minimized.

@geoand
Copy link
Contributor

geoand commented May 9, 2023

@FroMage can you have a look at this one please?

Copy link
Member

@FroMage FroMage left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay.

It's the names that are super confusing.

My concern was whether we would be able from the client's perspective to deal with adding \n at the end of each element, even when we don't know there's a next one.

Consider the following data sent and how they're received:

  • Sending nothing → client receives nothing, that's a 0-element reception of no chunks
  • Sending "foo" → client receives one element with data foo
  • Sending "foo\nbar" → client receives two elements with data foo and bar
  • Sending "foo\n" → client receives two elements with data foo and `` (the empty string)

So, previously, \n was a separator between elements, and now it's a line terminator. This could lead to bad issues.

Now, it happens that we're streaming differently in different cases:

  • APPLICATION_NDJSON_TYPE and APPLICATION_STREAM_JSON_TYPE will stream using ChunkedStreamingMultiSubscriber, and since both types are JSON types, we can only stream JSON values, where Strings are delimited by "" and so a trailing empty line will never be considered as a valid value, and hopefully will be discarded by the client. I haven't checked the specs, though, so I'm not sure it's even valid.
  • Any other streaming case that isn't SSE will use StreamingMultiSubscriber (which is a supertype of ChunkedStreamingMultiSubscriber and frankly that is confusing, because the names are just confusing), which allows JSON or regular HTTP streaming. Now, in this case, adding \n as a line terminator would be very problematic, especially for regular HTTP streaming. But this class doesn't add line separators in this PR, so it's fine.

Frankly, this is much more complex and delicate than it appears at first by just glancing at this PR. I haven't checked the two specs whose behaviour this modifies and I hope it won't affect clients with a trailing empty chunk. I am reasonably confident it should not, given that JSON doesn't allow empty values and would consider it as whitespace, but I hope a JS client would not produce an error. I hope the PR authors checked the specs :)

@geoand geoand added the triage/needs-rebase This PR needs to be rebased first because it has merge conflicts label May 16, 2023
@Sgitario
Copy link
Contributor Author

Consider the following data sent and how they're received:

  • Sending nothing → client receives nothing, that's a 0-element reception of no chunks
  • Sending "foo" → client receives one element with data foo
  • Sending "foo\nbar" → client receives two elements with data foo and bar
  • Sending "foo\n" → client receives two elements with data foo and `` (the empty string)

I've just checked that when sending foo\n, the client will only receive one element, so I think we're fine here.

@FroMage
Copy link
Member

FroMage commented May 16, 2023

I've just checked that when sending foo\n, the client will only receive one element, so I think we're fine here.

Which client? With what mime type? Using chunked encoding?

@Sgitario
Copy link
Contributor Author

I've just checked that when sending foo\n, the client will only receive one element, so I think we're fine here.

Which client? With what mime type? Using chunked encoding?

REST Client Reactive:

@GET
@Path("/stream/string")
@Produces(RestMediaType.APPLICATION_STREAM_JSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
Multi<String> readString();

Server side:

@Path("/stream")
public static class StreamingResource {
    @GET
    @Path("/string")
    @Produces(RestMediaType.APPLICATION_STREAM_JSON)
    @RestStreamElementType(MediaType.APPLICATION_JSON)
    public String readString() {
        return "\"one\"\n\"two\"\n\"3\"\n\"four\"\n";
    }

@FroMage
Copy link
Member

FroMage commented May 16, 2023

Yeah, those are not the clients I was most worried about. But I guess we'll hear about JS clients if it breaks :)

@Sgitario
Copy link
Contributor Author

Yeah, those are not the clients I was most worried about. But I guess we'll hear about JS clients if it breaks :)

Oh, I see now, sorry for my misunderstanding!

@Sgitario Sgitario removed the triage/needs-rebase This PR needs to be rebased first because it has merge conflicts label May 16, 2023
@FroMage
Copy link
Member

FroMage commented May 16, 2023

Oh, I see now, sorry for my misunderstanding!

My fault, I often make answers too short to properly understand, sorry :)

@quarkus-bot
Copy link

quarkus-bot bot commented May 16, 2023

Failing Jobs - Building 1187551

Status Name Step Failures Logs Raw logs
✔️ JVM Tests - JDK 11
JVM Tests - JDK 17 Build Failures Logs Raw logs
✔️ JVM Tests - JDK 19

Full information is available in the Build summary check run.

Failures

⚙️ JVM Tests - JDK 17 #

- Failing: extensions/opentelemetry/deployment 
! Skipped: extensions/micrometer-registry-prometheus/deployment extensions/micrometer/deployment extensions/quartz/deployment and 28 more

📦 extensions/opentelemetry/deployment

io.quarkus.opentelemetry.deployment.instrumentation.GraphQLOpenTelemetryTest.nestedCdiBeanInsideQueryTraceTest line 133 - More details - Source on GitHub

org.opentest4j.AssertionFailedError: Received: [SpanData{spanContext=ImmutableSpanContext{traceId=d21b47de24d914706e15096fbf972bfe, spanId=23156813af914441, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=d21b47de24d914706e15096fbf972bfe, spanId=d7af0936e47118d0, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=https://opentelemetry.io/schemas/1.19.0, attributes={service.name="quarkus-opentelemetry-deployment", service.version="999-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.25.0", webengine.name="Quarkus", webengine.version="999-SNAPSHOT"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.quarkus.opentelemetry, version=null, schemaUrl=null, attributes={}}, name=CustomCDIBean.waitForSomeTime, kind=INTERNAL, startEpochNanos=1684235590623271852, endEpochNanos=16842355...

@Sgitario
Copy link
Contributor Author

The test failures are unrelated. Merging.

@Sgitario Sgitario merged commit b9fa4dd into quarkusio:main May 17, 2023
@Sgitario Sgitario deleted the 30690 branch May 17, 2023 05:39
@quarkus-bot quarkus-bot bot added this to the 3.1 - main milestone May 17, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Json stream item is not processed immediately after receiving in ChunkedStreamingMultiSubscriber.class
5 participants