-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(client): support streaming inserts in Java client #5641
Merged
vcrfxia
merged 17 commits into
confluentinc:master
from
vcrfxia:java-client-stream-inserts
Jun 25, 2020
Merged
Changes from 11 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
23955f6
feat(client): support streaming inserts (wip)
vcrfxia ec060cb
chore: add example publisher
vcrfxia 309e874
chore: clean up isFailed() from example publisher
vcrfxia f56d070
test: add unit test
vcrfxia 1a61ed0
fix: switch to writing HTTP/2 frames
vcrfxia 4fb75c2
fix: fix failing test by disabling http2 cleartext upgrade
vcrfxia 7189c40
test: more unit tests
vcrfxia 5de76cf
test: add integration test
vcrfxia 62004d9
Merge branch '6.0.x' into java-client-stream-inserts
vcrfxia 627e0f4
chore: checkstyle and javadocs
vcrfxia 1d85b2a
Merge branch 'master' into java-client-stream-inserts
vcrfxia 059a8be
Merge branch 'master' into java-client-stream-inserts
vcrfxia 89ae593
Merge branch 'master' into java-client-stream-inserts
vcrfxia b2ec8e4
chore: fix merge from master
vcrfxia 559aec6
Merge branch 'master' into java-client-stream-inserts
vcrfxia e3cb408
chore: add equals/hashCode/toString methods to InsertAckImpl
vcrfxia 685d4e9
test: add EqualsTester test
vcrfxia File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
49 changes: 49 additions & 0 deletions
49
ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/AcksPublisher.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.api.client; | ||
|
||
import org.reactivestreams.Publisher; | ||
|
||
/** | ||
* A Reactive Streams Publisher that publishes server acknowledgments for rows inserted into | ||
* an existing ksqlDB stream via {@link Client#streamInserts(String, Publisher)}. | ||
*/ | ||
public interface AcksPublisher extends Publisher<InsertAck> { | ||
|
||
/** | ||
* Returns whether the {@code AcksPublisher} is complete. | ||
* | ||
* <p>An {@code AcksPublisher} is complete if the HTTP connection associated with this | ||
* {@link Client#streamInserts} request has been ended gracefully. Once complete, the | ||
* {@code AcksPublisher} will continue to deliver any remaining rows, then call | ||
* {@code onComplete()} on the subscriber, if present. | ||
* | ||
* @return whether the {@code AcksPublisher} is complete. | ||
*/ | ||
boolean isComplete(); | ||
|
||
/** | ||
* Returns whether the {@code AcksPublisher} is failed. | ||
* | ||
* <p>An {@code AcksPublisher} is failed if an error is received from the server. Once | ||
* failed, {@code onError()} is called on the subscriber, if present, and new calls to | ||
* {@code subscribe()} will be rejected. | ||
* | ||
* @return whether the {@code AcksPublisher} is failed. | ||
*/ | ||
boolean isFailed(); | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
34 changes: 34 additions & 0 deletions
34
ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertAck.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.api.client; | ||
|
||
import org.reactivestreams.Publisher; | ||
|
||
/** | ||
* An acknowledgment from the ksqlDB server that a row has been successfully inserted into a | ||
* ksqlDB stream. See {@link Client#streamInserts(String, Publisher)} for details. | ||
*/ | ||
public interface InsertAck { | ||
|
||
/** | ||
* Returns the corresponding sequence number for this acknowledgment. Sequence numbers start at | ||
* zero for each new {@link Client#streamInserts} request. | ||
* | ||
* @return the sequence number | ||
*/ | ||
long seqNum(); | ||
|
||
} |
198 changes: 198 additions & 0 deletions
198
ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,198 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.api.client; | ||
|
||
import java.util.ArrayDeque; | ||
import java.util.Objects; | ||
import java.util.Queue; | ||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
|
||
/** | ||
* A {@code org.reactivestreams.Publisher} suitable for use with the | ||
* {@link Client#streamInserts(String, Publisher)} method. Rows for insertion are passed to the | ||
* publisher via the {@link #accept(KsqlObject)} method, and buffered for delivery once the | ||
* {@link Client#streamInserts} request is made and the server-side subscriber has been subscribed. | ||
*/ | ||
public class InsertsPublisher implements Publisher<KsqlObject> { | ||
|
||
/** | ||
* The buffer max size indicator value used by the default constructor. See | ||
* {@link #InsertsPublisher(int)} for how this value is used. | ||
*/ | ||
public static final int DEFAULT_BUFFER_MAX_SIZE = 200; | ||
|
||
private Subscriber<? super KsqlObject> subscriber; | ||
private final Queue<KsqlObject> buffer = new ArrayDeque<>(); | ||
private final int bufferMaxSize; | ||
private long demand; | ||
private Runnable drainHandler; | ||
private volatile boolean cancelled; | ||
private boolean complete; | ||
private boolean shouldSendComplete; | ||
private boolean sentComplete; | ||
|
||
/** | ||
* Creates an {@code InsertsPublisher}. | ||
*/ | ||
public InsertsPublisher() { | ||
this(DEFAULT_BUFFER_MAX_SIZE); | ||
} | ||
|
||
/** | ||
* Creates an {@code InsertsPublisher}. | ||
* | ||
* @param bufferMaxSize Indicative max number of elements to store in the buffer. Note that this | ||
* value is not enforced, but it used to determine what to return from the | ||
* {@link #accept(KsqlObject)} method so the caller can stop sending more | ||
* rows and set a drainHandler to be notified when the buffer is cleared | ||
*/ | ||
public InsertsPublisher(final int bufferMaxSize) { | ||
this.bufferMaxSize = bufferMaxSize; | ||
} | ||
|
||
/** | ||
* Provides a new row for insertion. The publisher will attempt to deliver it to server endpoint, | ||
* once the {@link Client#streamInserts} request has been made. The publisher will buffer the row | ||
* internally if it can't deliver it immediately. Note that the row will be buffered even if the | ||
* buffer is 'full', i.e., if number of elements is at least {@code bufferMaxSize}, as the | ||
* {@code bufferMaxSize} value is not a hard limit. See {@link #InsertsPublisher(int)} for more. | ||
* | ||
* @param row the row to insert | ||
* @return whether the internal buffer is 'full', i.e., if number of elements is at least | ||
* {@code bufferMaxSize}. | ||
*/ | ||
public synchronized boolean accept(final KsqlObject row) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couldn't think of a way to make the methods in this class thread-safe without synchronizing all the public methods, but that doesn't feel right. Hopefully someone else has better ideas? |
||
if (complete || sentComplete) { | ||
throw new IllegalStateException("Cannot call accept after complete is called"); | ||
} | ||
if (!cancelled) { | ||
if (demand == 0) { | ||
buffer.add(row); | ||
} else { | ||
doOnNext(row); | ||
} | ||
} | ||
return buffer.size() >= bufferMaxSize; | ||
} | ||
|
||
/** | ||
* Sets a drain handler on the publisher. The drain handler will be called if after a row is | ||
* delivered there are zero elements buffered internally and there is demand from the subscriber | ||
* for more elements. Drain handlers may be used in combination with the return value from | ||
* {@link #accept(KsqlObject)} to ensure the publisher's buffer does not grow too large. | ||
* | ||
* <p>Drain handlers are one shot handlers; after a drain handler is called it | ||
* will never be called again. Instead, the caller should set a new drain handler for subsequent | ||
* use. | ||
* | ||
* @param handler the drain handler | ||
*/ | ||
public synchronized void drainHandler(final Runnable handler) { | ||
if (drainHandler != null) { | ||
throw new IllegalStateException("drainHandler already set"); | ||
} | ||
this.drainHandler = Objects.requireNonNull(handler); | ||
} | ||
|
||
/** | ||
* Marks the incoming stream of elements as complete. This means no further rows will be accepted | ||
* by the publisher and the {@link Client#streamInserts} connection will be closed once any | ||
* buffered rows have been delivered for insertion. | ||
*/ | ||
public synchronized void complete() { | ||
if (complete) { | ||
return; | ||
} | ||
complete = true; | ||
if (buffer.isEmpty() && subscriber != null) { | ||
sendComplete(); | ||
} else { | ||
shouldSendComplete = true; | ||
} | ||
} | ||
|
||
@Override | ||
public synchronized void subscribe(final Subscriber<? super KsqlObject> subscriber) { | ||
if (this.subscriber != null) { | ||
throw new IllegalStateException( | ||
"Cannot subscribe a new subscriber: A subscriber is already present."); | ||
} | ||
|
||
this.subscriber = subscriber; | ||
subscriber.onSubscribe(new Subscription() { | ||
@Override | ||
public void request(final long l) { | ||
doRequest(l); | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
doCancel(); | ||
} | ||
}); | ||
} | ||
|
||
private synchronized void doRequest(final long n) { | ||
if (n <= 0) { | ||
subscriber.onError((new IllegalArgumentException("Amount requested must be > 0"))); | ||
} else if (demand + n < 1) { | ||
// Catch overflow and set to "infinite" | ||
demand = Long.MAX_VALUE; | ||
maybeSend(); | ||
} else { | ||
demand += n; | ||
maybeSend(); | ||
} | ||
} | ||
|
||
private synchronized void doCancel() { | ||
cancelled = true; | ||
subscriber = null; | ||
} | ||
|
||
private void maybeSend() { | ||
while (demand > 0 && !buffer.isEmpty()) { | ||
final KsqlObject val = buffer.poll(); | ||
doOnNext(val); | ||
} | ||
|
||
if (buffer.isEmpty()) { | ||
if (shouldSendComplete) { | ||
sendComplete(); | ||
shouldSendComplete = false; | ||
} else if (demand > 0 && drainHandler != null) { | ||
drainHandler.run(); | ||
drainHandler = null; | ||
} | ||
} | ||
} | ||
|
||
private void doOnNext(final KsqlObject row) { | ||
subscriber.onNext(row); | ||
|
||
// If demand == Long.MAX_VALUE this means "infinite demand" | ||
if (demand != Long.MAX_VALUE) { | ||
demand--; | ||
} | ||
} | ||
|
||
private void sendComplete() { | ||
sentComplete = true; | ||
subscriber.onComplete(); | ||
} | ||
} |
42 changes: 42 additions & 0 deletions
42
ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/AcksPublisherImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.api.client.impl; | ||
|
||
import io.confluent.ksql.api.client.AcksPublisher; | ||
import io.confluent.ksql.api.client.InsertAck; | ||
import io.confluent.ksql.reactive.BufferedPublisher; | ||
import io.vertx.core.Context; | ||
|
||
public class AcksPublisherImpl extends BufferedPublisher<InsertAck> implements AcksPublisher { | ||
|
||
public AcksPublisherImpl(final Context context) { | ||
super(context); | ||
} | ||
|
||
@Override | ||
public boolean isComplete() { | ||
return super.isComplete(); | ||
} | ||
|
||
@Override | ||
public boolean isFailed() { | ||
return super.isFailed(); | ||
} | ||
|
||
public void handleError(final Exception e) { | ||
sendError(e); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not extend BufferedPublisher? Then you get most of this for free.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought when we first discussed providing InsertsPublisher and RowSubscriber (I plan to add this in a future PR) implementations for use with the client, we said the implementations shouldn't be tied to Vert.x. Did I misunderstand what you meant at the time?
Besides not having Vert.x, this InsertsPublisher is also simpler than BufferedPublisher in that it doesn't expose sending custom errors to the subscriber (since there's no reason to do that here), and a few other minor bits.
Also, by decoupling InsertsPublisher from BufferedPublisher, we can evolve BufferedPublisher in breaking ways without affecting the user-facing API.
WDYT?