feat(client): support streaming inserts in Java client #5641
@@ -254,6 +299,7 @@ private static HttpClient createHttpClient(final Vertx vertx, final ClientOption
.setSsl(clientOptions.isUseTls())
.setUseAlpn(clientOptions.isUseAlpn())
.setProtocolVersion(HttpVersion.HTTP_2)
.setHttp2ClearTextUpgrade(false)
Hey @purplefox I had to set this since otherwise the new tests were failing with
java.lang.IllegalStateException: Cannot write an HTTP/2 frame over an HTTP/1.x connection
but I don't understand why that is. How come the cleartext upgrade prevents writing of HTTP/2 frames? Is the issue a race condition where the upgrade hasn't taken place by the time I try writing an HTTP/2 frame?
Relatedly, do you think this change needs to be backported to the 6.0.x branch? (The old integration tests pass on 6.0.x without this change, so it seems fine not to backport, but I wonder if I'm missing additional implications of this change.)
Tbh I am not an expert on the http2 stuff here. I'd recommend posting on the vert.x google group.
* @return whether the internal buffer is 'full', i.e., if number of elements is at least
* {@code bufferMaxSize}.
*/
public synchronized boolean accept(final KsqlObject row) {
Couldn't think of a way to make the methods in this class thread-safe without synchronizing all the public methods, but that doesn't feel right. Hopefully someone else has better ideas?
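For reference, one common alternative to marking every public method synchronized is to guard the shared state with a private lock object, so that outside code cannot contend on the publisher's own monitor. The sketch below is only an illustration of that idea: RowBuffer, poll() and size() are hypothetical names, not classes or methods from this PR, and it models only the buffer, not subscriber delivery.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the publisher's internal buffer (not the PR's class).
final class RowBuffer<T> {
  // Private lock: callers cannot synchronize on it, unlike `this`.
  private final Object lock = new Object();
  private final Queue<T> buffer = new ArrayDeque<>();
  private final int bufferMaxSize;

  RowBuffer(final int bufferMaxSize) {
    if (bufferMaxSize <= 0) {
      throw new IllegalArgumentException("bufferMaxSize must be positive");
    }
    this.bufferMaxSize = bufferMaxSize;
  }

  /** Adds a row and returns whether the buffer now holds at least bufferMaxSize elements. */
  boolean accept(final T row) {
    synchronized (lock) {
      buffer.add(row);
      return buffer.size() >= bufferMaxSize;
    }
  }

  /** Removes and returns the oldest buffered row, or null if the buffer is empty. */
  T poll() {
    synchronized (lock) {
      return buffer.poll();
    }
  }

  /** Returns the number of rows currently buffered. */
  int size() {
    synchronized (lock) {
      return buffer.size();
    }
  }
}
```

This still serializes access, much like synchronized methods do; the main difference is that the lock is an implementation detail rather than part of the class's public surface.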
if ("ok".equals(status)) {
final InsertAck ack = new InsertAckImpl(seqNum);
final boolean full = acksPublisher.accept(ack);
if (full && !paused) {
Because AcksPublisherImpl is a buffered publisher, if a user is not interested in acks and never subscribes a subscriber to the AcksPublisher, the buffer will fill up, the record parser will be paused, and any errors later written to the response will not be read. On the one hand this seems bad, but OTOH if the user isn't consuming acks then they're likely not interested? An alternative would be to switch the AcksPublisher to not be buffered, which means we can continue to consume messages from the response and log any potential errors even if no subscriber is subscribed.
I think it's ok. If they don't subscribe they can't expect to get errors.
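To make the trade-off above concrete, here is a small, simplified model of the pause-on-full behavior. It is only a sketch: AckFlowModel and its methods are hypothetical names, the real code pauses a Vert.x record parser rather than counting messages, and the resume condition here is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of the pause-on-full behavior discussed above.
final class AckFlowModel {
  private final Queue<Long> ackBuffer = new ArrayDeque<>();
  private final int bufferMaxSize;
  private boolean paused = false;
  private int unreadMessages = 0; // messages left on the response while paused

  AckFlowModel(final int bufferMaxSize) {
    this.bufferMaxSize = bufferMaxSize;
  }

  /** Models a message arriving on the response; returns false if it was left unread. */
  boolean onResponseMessage(final long seqNum) {
    if (paused) {
      unreadMessages++; // with no subscriber draining, errors would sit here unseen
      return false;
    }
    ackBuffer.add(seqNum);
    if (ackBuffer.size() >= bufferMaxSize) {
      paused = true; // mirrors the `full && !paused` branch in the diff
    }
    return true;
  }

  /** Models a subscriber taking one ack; reading resumes once the buffer has room (assumed). */
  Long deliverToSubscriber() {
    final Long ack = ackBuffer.poll();
    if (paused && ackBuffer.size() < bufferMaxSize) {
      paused = false;
    }
    return ack;
  }

  boolean isPaused() {
    return paused;
  }

  int unreadMessages() {
    return unreadMessages;
  }
}
```

With a buffer size of 2 and no subscriber, the third message is never read; once a subscriber takes an ack, reading can resume.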
LGTM, just a few comments
* publisher via the {@link #accept(KsqlObject)} method, and buffered for delivery once the
* {@link Client#streamInserts} request is made and the server-side subscriber has been subscribed.
*/
public class InsertsPublisher implements Publisher<KsqlObject> {
Why not extend BufferedPublisher? Then you get most of this for free.
I thought when we first discussed providing InsertsPublisher and RowSubscriber (I plan to add this in a future PR) implementations for use with the client, we said the implementations shouldn't be tied to Vert.x. Did I misunderstand what you meant at the time?
Besides not having Vert.x, this InsertsPublisher is also simpler than BufferedPublisher in that it doesn't expose sending custom errors to the subscriber (since there's no reason to do that here), and a few other minor bits.
Also, by decoupling InsertsPublisher from BufferedPublisher, we can evolve BufferedPublisher in breaking ways without affecting the user-facing API.
WDYT?
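As a rough illustration of what a buffering publisher with no Vert.x dependency can look like, here is a minimal sketch. It is not the PR's InsertsPublisher: SimpleInsertsPublisher is a hypothetical name, it uses the JDK's java.util.concurrent.Flow interfaces instead of org.reactivestreams so that it is self-contained, it supports a single subscriber, and it skips parts of the Reactive Streams contract (completion, error signalling, demand overflow).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Flow;

// Hypothetical sketch: buffers rows until a subscriber requests them. No Vert.x types involved.
final class SimpleInsertsPublisher<T> implements Flow.Publisher<T> {
  private final Queue<T> buffer = new ArrayDeque<>();
  private Flow.Subscriber<? super T> subscriber;
  private long demand;
  private boolean delivering;

  /** Buffers a row, delivering it immediately if the subscriber has outstanding demand. */
  public synchronized void accept(final T row) {
    buffer.add(row);
    drain();
  }

  @Override
  public synchronized void subscribe(final Flow.Subscriber<? super T> s) {
    if (subscriber != null) {
      // Simplification: a spec-compliant publisher would signal onError instead of throwing.
      throw new IllegalStateException("Only one subscriber is supported");
    }
    subscriber = s;
    s.onSubscribe(new Flow.Subscription() {
      @Override
      public void request(final long n) {
        onRequest(n);
      }

      @Override
      public void cancel() {
        onCancel();
      }
    });
  }

  private synchronized void onRequest(final long n) {
    if (n <= 0) {
      return; // simplification: non-positive demand is ignored
    }
    demand += n;
    drain();
  }

  private synchronized void onCancel() {
    subscriber = null;
  }

  // Always called while holding the monitor.
  private void drain() {
    if (delivering) {
      return; // re-entrancy guard: onNext may call request() on the same thread
    }
    delivering = true;
    try {
      while (subscriber != null && demand > 0 && !buffer.isEmpty()) {
        demand--;
        subscriber.onNext(buffer.poll());
      }
    } finally {
      delivering = false;
    }
  }
}
```

Rows accepted before subscription stay buffered, and nothing is delivered beyond what the subscriber has requested.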
if (endRequest) {
request.end(requestBody);
} else {
final HttpClientRequest finalRequest = request;
What's the purpose of this block? Is it to write some data without ending the request? If so, request.write() should do that.
I actually had request.write() initially but hit
java.lang.IllegalStateException: You must set the Content-Length header to be the total size of the message body BEFORE sending any data if you are not using HTTP chunked encoding.
at io.vertx.core.http.impl.HttpClientRequestImpl.write(HttpClientRequestImpl.java:619)
at io.vertx.core.http.impl.HttpClientRequestImpl.write(HttpClientRequestImpl.java:575)
at io.vertx.core.http.impl.HttpClientRequestImpl.write(HttpClientRequestImpl.java:569)
at io.confluent.ksql.api.client.impl.ClientImpl.makeRequest(ClientImpl.java:241)
at io.confluent.ksql.api.client.impl.ClientImpl.streamInserts(ClientImpl.java:152)
at io.confluent.ksql.api.client.ClientTest.shouldStreamInserts(ClientTest.java:576)
My understanding is that request.write() is meant for (1) non-streaming use cases where the content length is known upfront, and (2) HTTP/1.x streaming with chunked encoding, whereas request.writeCustomFrame() is meant for HTTP/2 streaming.
I think you should be able to call write with HTTP 2 with setting content-length, perhaps this is due to the alpn/cleartext weirdness?
How would I know what to set the content-length to, though? This is for the inserts stream use case, so more data may be written to the connection later (as more rows are inserted).
In HTTP2 you don't need to set content-length
That's what writeCustomFrame() lets us achieve, right? I guess I'm not understanding how the behavior of write(), assuming it's allowed for HTTP/2 without setting content length upfront, would differ from writeCustomFrame() as used in the current implementation.
(Just asking to improve my understanding of HTTP at this point. IIUC, this discussion is not blocking this PR :) )
Merged the PR for now. Can update in a follow-up if we decide writeCustomFrame() isn't the way to go.
Description
Fixes #5583
Docs will come in a follow-up PR. Javadocs on the new methods/interfaces/user-facing classes are included in this one.
Testing done
Added unit and integration tests.