feat(client): support streaming inserts in Java client #5641

Merged Jun 25, 2020 (17 commits)
File: AcksPublisher.java (new file)
@@ -0,0 +1,49 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

import org.reactivestreams.Publisher;

/**
* A Reactive Streams Publisher that publishes server acknowledgments for rows inserted into
* an existing ksqlDB stream via {@link Client#streamInserts(String, Publisher)}.
*/
public interface AcksPublisher extends Publisher<InsertAck> {

/**
* Returns whether the {@code AcksPublisher} is complete.
*
* <p>An {@code AcksPublisher} is complete if the HTTP connection associated with this
* {@link Client#streamInserts} request has been ended gracefully. Once complete, the
* {@code AcksPublisher} will continue to deliver any remaining acknowledgments, then call
* {@code onComplete()} on the subscriber, if present.
*
* @return whether the {@code AcksPublisher} is complete.
*/
boolean isComplete();

/**
* Returns whether the {@code AcksPublisher} is failed.
*
* <p>An {@code AcksPublisher} is failed if an error is received from the server. Once
* failed, {@code onError()} is called on the subscriber, if present, and new calls to
* {@code subscribe()} will be rejected.
*
* @return whether the {@code AcksPublisher} is failed.
*/
boolean isFailed();

}
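
For readers new to Reactive Streams, a minimal ack subscriber might look like the sketch below. This is an illustration only, not part of the PR: the class name and its logging behavior are made up, and only the org.reactivestreams interfaces plus the InsertAck type above are assumed.

import io.confluent.ksql.api.client.InsertAck;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class PrintingAckSubscriber implements Subscriber<InsertAck> {

  private Subscription subscription;

  @Override
  public void onSubscribe(final Subscription subscription) {
    this.subscription = subscription;
    // Signal demand for the first ack; more are requested as they arrive.
    subscription.request(1);
  }

  @Override
  public void onNext(final InsertAck ack) {
    System.out.println("Row inserted. Sequence number: " + ack.seqNum());
    subscription.request(1);
  }

  @Override
  public void onError(final Throwable t) {
    // Called if the AcksPublisher fails, e.g. on an error from the server.
    System.err.println("Insert stream failed: " + t);
  }

  @Override
  public void onComplete() {
    // Called once the connection has ended gracefully and all acks are delivered.
    System.out.println("No more acks will arrive.");
  }
}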
File: Client.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;

/**
* A client that connects to a specific ksqlDB server.
@@ -82,6 +83,24 @@ public interface Client {
*/
CompletableFuture<Void> insertInto(String streamName, KsqlObject row);

/**
* Inserts rows into a ksqlDB stream. Rows to insert are supplied by a
* {@code org.reactivestreams.Publisher} and server acknowledgments are exposed via the
* returned {@link AcksPublisher}.
*
* <p>The {@code CompletableFuture} will be failed if a non-200 response is received from the
* server.
*
* <p>See {@link InsertsPublisher} for an example publisher that may be passed as an argument
* to this method.
*
* @param streamName name of the target stream
* @param insertsPublisher the publisher to provide rows to insert
* @return a future that completes once the initial server response is received, and contains a
* publisher that publishes server acknowledgments for inserted rows.
*/
CompletableFuture<AcksPublisher>
streamInserts(String streamName, Publisher<KsqlObject> insertsPublisher);

/**
* Terminates a push query with the specified query ID.
*
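
Putting the pieces together, here is a hedged end-to-end sketch of the new API. The host, port, and stream name are made-up values, and PrintingAckSubscriber is the hypothetical subscriber sketched earlier; only the factory methods and types visible in this client package are assumed.

import io.confluent.ksql.api.client.AcksPublisher;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.InsertsPublisher;
import io.confluent.ksql.api.client.KsqlObject;

public class StreamInsertsExample {
  public static void main(final String[] args) throws Exception {
    final ClientOptions options = ClientOptions.create()
        .setHost("localhost")
        .setPort(8088);
    final Client client = Client.create(options);

    // Open the inserts stream; the future completes once the initial server response arrives.
    final InsertsPublisher insertsPublisher = new InsertsPublisher();
    final AcksPublisher acksPublisher =
        client.streamInserts("MY_STREAM", insertsPublisher).get();
    acksPublisher.subscribe(new PrintingAckSubscriber());

    // Rows are buffered until the server-side subscriber requests them.
    insertsPublisher.accept(new KsqlObject().put("K", "v1"));
    insertsPublisher.accept(new KsqlObject().put("K", "v2"));

    // End the connection once all buffered rows have been delivered.
    // (A real application would wait for the acks before closing the client.)
    insertsPublisher.complete();
    client.close();
  }
}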
File: InsertAck.java (new file)
@@ -0,0 +1,34 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

import org.reactivestreams.Publisher;

/**
* An acknowledgment from the ksqlDB server that a row has been successfully inserted into a
* ksqlDB stream. See {@link Client#streamInserts(String, Publisher)} for details.
*/
public interface InsertAck {

/**
* Returns the corresponding sequence number for this acknowledgment. Sequence numbers start at
* zero for each new {@link Client#streamInserts} request.
*
* @return the sequence number
*/
long seqNum();

}
File: InsertsPublisher.java (new file)
@@ -0,0 +1,198 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
* A {@code org.reactivestreams.Publisher} suitable for use with the
* {@link Client#streamInserts(String, Publisher)} method. Rows for insertion are passed to the
* publisher via the {@link #accept(KsqlObject)} method, and buffered for delivery once the
* {@link Client#streamInserts} request is made and the server-side subscriber has been subscribed.
*/
public class InsertsPublisher implements Publisher<KsqlObject> {
Comment (Contributor): Why not extend BufferedPublisher? Then you get most of this for free.

Reply (Author): I thought when we first discussed providing InsertsPublisher and RowSubscriber (I plan to add this in a future PR) implementations for use with the client, we said the implementations shouldn't be tied to Vert.x. Did I misunderstand what you meant at the time?

Besides not having Vert.x, this InsertsPublisher is also simpler than BufferedPublisher in that it doesn't expose sending custom errors to the subscriber (since there's no reason to do that here), and a few other minor bits.

Also, by decoupling InsertsPublisher from BufferedPublisher, we can evolve BufferedPublisher in breaking ways without affecting the user-facing API.

WDYT?


/**
* The buffer max size indicator value used by the default constructor. See
* {@link #InsertsPublisher(int)} for how this value is used.
*/
public static final int DEFAULT_BUFFER_MAX_SIZE = 200;

private Subscriber<? super KsqlObject> subscriber;
private final Queue<KsqlObject> buffer = new ArrayDeque<>();
private final int bufferMaxSize;
private long demand;
private Runnable drainHandler;
private volatile boolean cancelled;
private boolean complete;
private boolean shouldSendComplete;
private boolean sentComplete;

/**
* Creates an {@code InsertsPublisher}.
*/
public InsertsPublisher() {
this(DEFAULT_BUFFER_MAX_SIZE);
}

/**
* Creates an {@code InsertsPublisher}.
*
* @param bufferMaxSize Indicative max number of elements to store in the buffer. Note that this
* value is not enforced, but is used to determine what to return from the
* {@link #accept(KsqlObject)} method so the caller can stop sending more
* rows and set a drainHandler to be notified when the buffer is cleared
*/
public InsertsPublisher(final int bufferMaxSize) {
this.bufferMaxSize = bufferMaxSize;
}

/**
* Provides a new row for insertion. The publisher will attempt to deliver it to the server
* endpoint once the {@link Client#streamInserts} request has been made. The publisher will
* buffer the row internally if it can't deliver it immediately. Note that the row will be
* buffered even if the buffer is 'full', i.e., if the number of elements is at least
* {@code bufferMaxSize}, as the {@code bufferMaxSize} value is not a hard limit. See
* {@link #InsertsPublisher(int)} for more.
*
* @param row the row to insert
* @return whether the internal buffer is 'full', i.e., whether the number of elements is at
* least {@code bufferMaxSize}
*/
public synchronized boolean accept(final KsqlObject row) {
Comment (Author): Couldn't think of a way to make the methods in this class thread-safe without synchronizing all the public methods, but that doesn't feel right. Hopefully someone else has better ideas?

if (complete || sentComplete) {
throw new IllegalStateException("Cannot call accept after complete is called");
}
if (!cancelled) {
if (demand == 0) {
buffer.add(row);
} else {
doOnNext(row);
}
}
return buffer.size() >= bufferMaxSize;
}

/**
* Sets a drain handler on the publisher. The drain handler will be called if after a row is
* delivered there are zero elements buffered internally and there is demand from the subscriber
* for more elements. Drain handlers may be used in combination with the return value from
* {@link #accept(KsqlObject)} to ensure the publisher's buffer does not grow too large.
*
* <p>Drain handlers are one-shot handlers; after a drain handler is called it
* will never be called again. Instead, the caller should set a new drain handler for subsequent
* use.
*
* @param handler the drain handler
*/
public synchronized void drainHandler(final Runnable handler) {
if (drainHandler != null) {
throw new IllegalStateException("drainHandler already set");
}
this.drainHandler = Objects.requireNonNull(handler);
}

/**
* Marks the incoming stream of elements as complete. This means no further rows will be accepted
* by the publisher and the {@link Client#streamInserts} connection will be closed once any
* buffered rows have been delivered for insertion.
*/
public synchronized void complete() {
if (complete) {
return;
}
complete = true;
if (buffer.isEmpty() && subscriber != null) {
sendComplete();
} else {
shouldSendComplete = true;
}
}

@Override
public synchronized void subscribe(final Subscriber<? super KsqlObject> subscriber) {
if (this.subscriber != null) {
throw new IllegalStateException(
"Cannot subscribe a new subscriber: A subscriber is already present.");
}

this.subscriber = subscriber;
subscriber.onSubscribe(new Subscription() {
@Override
public void request(final long l) {
doRequest(l);
}

@Override
public void cancel() {
doCancel();
}
});
}

private synchronized void doRequest(final long n) {
if (n <= 0) {
subscriber.onError(new IllegalArgumentException("Amount requested must be > 0"));
} else if (demand + n < 1) {
// Catch overflow and set to "infinite"
demand = Long.MAX_VALUE;
maybeSend();
} else {
demand += n;
maybeSend();
}
}

private synchronized void doCancel() {
cancelled = true;
subscriber = null;
}

private void maybeSend() {
while (demand > 0 && !buffer.isEmpty()) {
final KsqlObject val = buffer.poll();
doOnNext(val);
}

if (buffer.isEmpty()) {
if (shouldSendComplete) {
sendComplete();
shouldSendComplete = false;
} else if (demand > 0 && drainHandler != null) {
drainHandler.run();
drainHandler = null;
}
}
}

private void doOnNext(final KsqlObject row) {
subscriber.onNext(row);

// If demand == Long.MAX_VALUE this means "infinite demand"
if (demand != Long.MAX_VALUE) {
demand--;
}
}

private void sendComplete() {
sentComplete = true;
subscriber.onComplete();
}
}
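
As a usage note, the accept() return value and drainHandler() are designed to be combined for backpressure. Below is a minimal sketch of that pattern; the RowPump helper and its iterator-driven loop are hypothetical, not part of this PR.

import io.confluent.ksql.api.client.InsertsPublisher;
import io.confluent.ksql.api.client.KsqlObject;
import java.util.Iterator;

// Hypothetical helper: feeds rows to an InsertsPublisher, pausing when the
// buffer reports 'full' and resuming via the one-shot drain handler.
public final class RowPump {

  public static void pump(final InsertsPublisher publisher, final Iterator<KsqlObject> rows) {
    while (rows.hasNext()) {
      final boolean bufferFull = publisher.accept(rows.next());
      if (bufferFull) {
        // Stop sending for now; a fresh drain handler must be set each time,
        // since drain handlers are one-shot.
        publisher.drainHandler(() -> pump(publisher, rows));
        return;
      }
    }
    publisher.complete(); // no more rows to insert
  }
}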
File: AcksPublisherImpl.java (new file)
@@ -0,0 +1,42 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.AcksPublisher;
import io.confluent.ksql.api.client.InsertAck;
import io.confluent.ksql.reactive.BufferedPublisher;
import io.vertx.core.Context;

public class AcksPublisherImpl extends BufferedPublisher<InsertAck> implements AcksPublisher {

public AcksPublisherImpl(final Context context) {
super(context);
}

@Override
public boolean isComplete() {
return super.isComplete();
}

@Override
public boolean isFailed() {
return super.isFailed();
}

public void handleError(final Exception e) {
sendError(e);
}
}
File: ClientImpl.java
@@ -18,6 +18,7 @@
import static io.netty.handler.codec.http.HttpHeaderNames.AUTHORIZATION;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;

import io.confluent.ksql.api.client.AcksPublisher;
import io.confluent.ksql.api.client.BatchedQueryResult;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
@@ -48,8 +49,11 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class ClientImpl implements Client {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private static final String QUERY_STREAM_ENDPOINT = "/query-stream";
private static final String INSERTS_ENDPOINT = "/inserts-stream";
@@ -101,7 +105,8 @@ public CompletableFuture<StreamedQueryResult> streamQuery(
final Map<String, Object> properties
) {
final CompletableFuture<StreamedQueryResult> cf = new CompletableFuture<>();
- makeQueryRequest(sql, properties, cf, StreamQueryResponseHandler::new);
+ makeQueryRequest(sql, properties, cf,
+     (ctx, rp, fut, req) -> new StreamQueryResponseHandler(ctx, rp, fut));
return cf;
}

@@ -120,7 +125,7 @@ public BatchedQueryResult executeQuery(
sql,
properties,
result,
- (context, recordParser, cf) -> new ExecuteQueryResponseHandler(
+ (context, recordParser, cf, request) -> new ExecuteQueryResponseHandler(
context, recordParser, cf, clientOptions.getExecuteQueryMaxResultRows())
);
return result;
@@ -139,7 +144,31 @@ public CompletableFuture<Void> insertInto(final String streamName, final KsqlObj
INSERTS_ENDPOINT,
requestBody,
cf,
- response -> handleStreamedResponse(response, cf, InsertsResponseHandler::new)
+ response -> handleStreamedResponse(response, cf,
+     (ctx, rp, fut, req) -> new InsertIntoResponseHandler(ctx, rp, fut))
);

return cf;
}

@Override
public CompletableFuture<AcksPublisher> streamInserts(
final String streamName,
final Publisher<KsqlObject> insertsPublisher) {
final CompletableFuture<AcksPublisher> cf = new CompletableFuture<>();

final Buffer requestBody = Buffer.buffer();
final JsonObject params = new JsonObject().put("target", streamName);
requestBody.appendBuffer(params.toBuffer()).appendString("\n");

makeRequest(
"/inserts-stream",
requestBody,
cf,
response -> handleStreamedResponse(response, cf,
(ctx, rp, fut, req) ->
new StreamInsertsResponseHandler(ctx, rp, fut, req, insertsPublisher)),
false
);

return cf;
@@ -214,7 +243,7 @@ public void close() {

@FunctionalInterface
private interface StreamedResponseHandlerSupplier<T extends CompletableFuture<?>> {
- ResponseHandler<T> get(Context ctx, RecordParser recordParser, T cf);
+ ResponseHandler<T> get(Context ctx, RecordParser recordParser, T cf, HttpClientRequest request);
}

@FunctionalInterface
@@ -251,6 +280,15 @@ private <T extends CompletableFuture<?>> void makeRequest(
final Buffer requestBody,
final T cf,
final Handler<HttpClientResponse> responseHandler) {
makeRequest(path, requestBody, cf, responseHandler, true);
}

private <T extends CompletableFuture<?>> void makeRequest(
final String path,
final Buffer requestBody,
final T cf,
final Handler<HttpClientResponse> responseHandler,
final boolean endRequest) {
HttpClientRequest request = httpClient.request(HttpMethod.POST,
serverSocketAddress, clientOptions.getPort(), clientOptions.getHost(),
path,
@@ -259,7 +297,14 @@ private <T extends CompletableFuture<?>> void makeRequest(
if (clientOptions.isUseBasicAuth()) {
request = configureBasicAuth(request);
}
- request.end(requestBody);
+ if (endRequest) {
+   request.end(requestBody);
+ } else {
+   final HttpClientRequest finalRequest = request;
Comment (Contributor): What's the purpose of this block? Is it to write some data without ending the request? If so, request.write() should do that.

Reply (Author): I actually had request.write() initially but hit

java.lang.IllegalStateException: You must set the Content-Length header to be the total size of the message body BEFORE sending any data if you are not using HTTP chunked encoding.

	at io.vertx.core.http.impl.HttpClientRequestImpl.write(HttpClientRequestImpl.java:619)
	at io.vertx.core.http.impl.HttpClientRequestImpl.write(HttpClientRequestImpl.java:575)
	at io.vertx.core.http.impl.HttpClientRequestImpl.write(HttpClientRequestImpl.java:569)
	at io.confluent.ksql.api.client.impl.ClientImpl.makeRequest(ClientImpl.java:241)
	at io.confluent.ksql.api.client.impl.ClientImpl.streamInserts(ClientImpl.java:152)
	at io.confluent.ksql.api.client.ClientTest.shouldStreamInserts(ClientTest.java:576)

My understanding is that request.write() is meant for (1) non-streaming use cases where the content length is known upfront, and (2) HTTP/1.x streaming with chunked encoding, whereas request.writeCustomFrame() is meant for HTTP/2 streaming.

Reply (Contributor): I think you should be able to call write() with HTTP/2 by setting content-length; perhaps this is due to the ALPN/cleartext weirdness?

Reply (Author): How would I know what to set the content-length to, though? This is for the inserts stream use case, so more data may be written to the connection later (as more rows are inserted).

Reply (Contributor): In HTTP/2 you don't need to set content-length.

Reply (Author): That's what writeCustomFrame() lets us achieve, right? I guess I'm not understanding how the behavior of write(), assuming it's allowed for HTTP/2 without setting content-length upfront, would differ from writeCustomFrame() as used in the current implementation.

(Just asking to improve my understanding of HTTP at this point. IIUC, this discussion is not blocking this PR :) )

Reply (Author): Merged the PR for now. Can update in a follow-up if we decide writeCustomFrame() isn't the way to go.

+   finalRequest.sendHead(version -> {
+     finalRequest.writeCustomFrame(0, 0, requestBody);
+   });
+ }
}

private HttpClientRequest configureBasicAuth(final HttpClientRequest request) {
@@ -273,7 +318,7 @@ private static <T extends CompletableFuture<?>> void handleStreamedResponse(
if (response.statusCode() == OK.code()) {
final RecordParser recordParser = RecordParser.newDelimited("\n", response);
final ResponseHandler<T> responseHandler =
- responseHandlerSupplier.get(Vertx.currentContext(), recordParser, cf);
+ responseHandlerSupplier.get(Vertx.currentContext(), recordParser, cf, response.request());

recordParser.handler(responseHandler::handleBodyBuffer);
recordParser.endHandler(responseHandler::handleBodyEnd);
@@ -344,6 +389,7 @@ private static HttpClient createHttpClient(final Vertx vertx, final ClientOption
.setSsl(clientOptions.isUseTls())
.setUseAlpn(clientOptions.isUseAlpn())
.setProtocolVersion(HttpVersion.HTTP_2)
.setHttp2ClearTextUpgrade(false)
Comment (Author): Hey @purplefox I had to set this since otherwise the new tests were failing with

java.lang.IllegalStateException: Cannot write an HTTP/2 frame over an HTTP/1.x connection

but I don't understand why that is. How come the cleartext upgrade prevents writing of HTTP/2 frames? Is the issue a race condition where the upgrade hasn't taken place by the time I try writing an HTTP/2 frame?

Reply (Author): Relatedly, do you think this change needs to be backported to the 6.0.x branch? (The old integration tests pass on 6.0.x without this change so it seems fine to not backport, but I wonder if I'm not realizing additional implications of this change.)

Reply (Contributor): Tbh I am not an expert on the http2 stuff here. I'd recommend posting on the vert.x google group.

.setVerifyHost(clientOptions.isVerifyHost())
.setDefaultHost(clientOptions.getHost())
.setDefaultPort(clientOptions.getPort());
File: InsertAckImpl.java (new file)
@@ -0,0 +1,57 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.InsertAck;
import java.util.Objects;

public class InsertAckImpl implements InsertAck {

private final long num;

InsertAckImpl(final long num) {
this.num = num;
}

@Override
public long seqNum() {
return num;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final InsertAckImpl insertAck = (InsertAckImpl) o;
return num == insertAck.num;
}

@Override
public int hashCode() {
return Objects.hash(num);
}

@Override
public String toString() {
return "InsertAckImpl{"
+ "num=" + num
+ '}';
}
}
File: InsertsResponseHandler.java → InsertIntoResponseHandler.java (renamed)
@@ -22,11 +22,11 @@
import io.vertx.core.parsetools.RecordParser;
import java.util.concurrent.CompletableFuture;

- public class InsertsResponseHandler extends ResponseHandler<CompletableFuture<Void>> {
+ public class InsertIntoResponseHandler extends ResponseHandler<CompletableFuture<Void>> {

private int numAcks;

- InsertsResponseHandler(
+ InsertIntoResponseHandler(
final Context context, final RecordParser recordParser, final CompletableFuture<Void> cf) {
super(context, recordParser, cf);
}
File: StreamInsertsResponseHandler.java (new file)
@@ -0,0 +1,101 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.AcksPublisher;
import io.confluent.ksql.api.client.InsertAck;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.parsetools.RecordParser;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;

public class StreamInsertsResponseHandler
extends ResponseHandler<CompletableFuture<AcksPublisher>> {

private static final Logger log = LoggerFactory.getLogger(StreamInsertsResponseHandler.class);

private final AcksPublisherImpl acksPublisher;
private boolean paused;

StreamInsertsResponseHandler(
final Context context,
final RecordParser recordParser,
final CompletableFuture<AcksPublisher> cf,
final HttpClientRequest request,
final Publisher<KsqlObject> insertsPublisher
) {
super(context, recordParser, cf);

Objects.requireNonNull(request);
insertsPublisher.subscribe(new StreamInsertsSubscriber(context, request));

this.acksPublisher = new AcksPublisherImpl(context);
cf.complete(acksPublisher);
}

@Override
protected void doHandleBodyBuffer(final Buffer buff) {
final JsonObject jsonObject = new JsonObject(buff);
final long seqNum = jsonObject.getLong("seq");
final String status = jsonObject.getString("status");
if ("ok".equals(status)) {
final InsertAck ack = new InsertAckImpl(seqNum);
final boolean full = acksPublisher.accept(ack);
if (full && !paused) {
Comment (Author): Because AcksPublisherImpl is a buffered publisher, if a user is not interested in acks and does not subscribe a subscriber to the AcksPublisher, then the record parser will be paused and any errors later written to the response will not be read. On the one hand this seems bad, but OTOH if the user isn't consuming acks then they're likely not interested? An alternative would be to switch the AcksPublisher to not be buffered, which means we could continue to consume messages from the response and log any potential errors even if no subscriber is subscribed.

Reply (Contributor): I think it's ok. If they don't subscribe they can't expect to get errors.

recordParser.pause();
acksPublisher.drainHandler(this::publisherReceptive);
paused = true;
}
} else if ("error".equals(status)) {
acksPublisher.handleError(new KsqlClientException(String.format(
"Received error from /inserts-stream. Inserts sequence number: %d. "
+ "Error code: %d. Message: %s",
seqNum,
jsonObject.getInteger("error_code"),
jsonObject.getString("message")
)));
} else {
throw new IllegalStateException(
"Unrecognized status response from /inserts-stream: " + status);
}
}

@Override
protected void doHandleException(final Throwable t) {
log.error(t);
acksPublisher.handleError(new Exception(t));
}

@Override
protected void doHandleBodyEnd() {
acksPublisher.complete();
}

private void publisherReceptive() {
checkContext();

paused = false;
recordParser.resume();
}
}
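
For reference, the handler above implies the shape of the server's response: a stream of newline-delimited JSON objects, one per processed row, each carrying a "status" field plus either a sequence number or error details. The lines below are illustrative only; the field values are made up, though the field names match what doHandleBodyBuffer() parses.

{"status":"ok","seq":0}
{"status":"ok","seq":1}
{"status":"error","seq":2,"error_code":40000,"message":"..."}

Each "ok" line is surfaced as an InsertAck on the AcksPublisher; an "error" line fails the publisher with a KsqlClientException built from the error code and message.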
File: StreamInsertsSubscriber.java (new file)
@@ -0,0 +1,88 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.Objects;
import org.reactivestreams.Subscription;

public class StreamInsertsSubscriber extends BaseSubscriber<KsqlObject> {

private static final Logger log = LoggerFactory.getLogger(StreamInsertsSubscriber.class);

private static final int REQUEST_BATCH_SIZE = 200;

private final HttpClientRequest httpRequest;
private int outstandingTokens;
private boolean drainHandlerSet;

public StreamInsertsSubscriber(final Context context, final HttpClientRequest httpRequest) {
super(context);
this.httpRequest = Objects.requireNonNull(httpRequest);
}

@Override
protected void afterSubscribe(final Subscription subscription) {
checkRequest();
}

@Override
protected void handleValue(final KsqlObject row) {
httpRequest.writeCustomFrame(
0, 0,
Buffer.buffer().appendString(row.toJsonString()).appendString("\n")
);
outstandingTokens--;

if (httpRequest.writeQueueFull()) {
if (!drainHandlerSet) {
httpRequest.drainHandler(this::httpRequestReceptive);
drainHandlerSet = true;
}
} else {
checkRequest();
}
}

@Override
protected void handleComplete() {
httpRequest.end();
}

@Override
protected void handleError(final Throwable t) {
log.error("Received error from streamInserts() publisher. Ending connection.", t);
httpRequest.end();
}

private void checkRequest() {
if (outstandingTokens == 0) {
outstandingTokens = REQUEST_BATCH_SIZE;
makeRequest(REQUEST_BATCH_SIZE);
}
}

private void httpRequestReceptive(final Void v) {
drainHandlerSet = false;
checkRequest();
}
}
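
On the wire, the request side mirrors this: per streamInserts() in ClientImpl and handleValue() above, the request body opens with a single JSON line naming the target stream, followed by one JSON line per row, each written as an HTTP/2 frame via writeCustomFrame() so the connection stays open for further inserts. An illustrative payload (stream name and row values are made up):

{"target":"MY_STREAM"}
{"K":"v1"}
{"K":"v2"}

The connection is ended via httpRequest.end() once the upstream publisher completes or errors.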
File: ClientTest.java
@@ -576,6 +576,96 @@ public void shouldHandleErrorFromInsertInto() {
}

@Test
public void shouldStreamInserts() throws Exception {
// Given:
final InsertsPublisher insertsPublisher = new InsertsPublisher();

// When:
final AcksPublisher acksPublisher = javaClient.streamInserts("test-stream", insertsPublisher).get();
for (final KsqlObject row : INSERT_ROWS) {
insertsPublisher.accept(row);
}

TestSubscriber<InsertAck> acksSubscriber = subscribeAndWait(acksPublisher);
acksSubscriber.getSub().request(INSERT_ROWS.size());

// Then:
assertThatEventually(() -> testEndpoints.getInsertsSubscriber().getRowsInserted(), hasSize(INSERT_ROWS.size()));
for (int i = 0; i < INSERT_ROWS.size(); i++) {
assertThat(testEndpoints.getInsertsSubscriber().getRowsInserted().get(i), is(EXPECTED_INSERT_ROWS.get(i)));
}
assertThat(testEndpoints.getLastTarget(), is("test-stream"));

assertThatEventually(acksSubscriber::getValues, hasSize(INSERT_ROWS.size()));
assertThat(acksSubscriber.getError(), is(nullValue()));
for (int i = 0; i < INSERT_ROWS.size(); i++) {
assertThat(acksSubscriber.getValues().get(i).seqNum(), is(Long.valueOf(i)));
}
assertThat(acksSubscriber.isCompleted(), is(false));

assertThat(acksPublisher.isComplete(), is(false));
assertThat(acksPublisher.isFailed(), is(false));

// When:
insertsPublisher.complete();

// Then:
assertThatEventually(acksPublisher::isComplete, is(true));
assertThat(acksPublisher.isFailed(), is(false));
assertThatEventually(acksSubscriber::isCompleted, is(true));
}

@Test
public void shouldHandleErrorResponseFromStreamInserts() {
// Given
KsqlApiException exception = new KsqlApiException("Cannot insert into a table", ERROR_CODE_BAD_REQUEST);
testEndpoints.setCreateInsertsSubscriberException(exception);

// When
final Exception e = assertThrows(
ExecutionException.class, // thrown from .get() when the future completes exceptionally
() -> javaClient.streamInserts("a-table", new InsertsPublisher()).get()
);

// Then
assertThat(e.getCause(), instanceOf(KsqlClientException.class));
assertThat(e.getCause().getMessage(), containsString("Received 400 response from server"));
assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table"));
}

@Test
public void shouldHandleErrorFromStreamInserts() throws Exception {
// Given:
testEndpoints.setAcksBeforePublisherError(INSERT_ROWS.size() - 1);
final InsertsPublisher insertsPublisher = new InsertsPublisher();

// When:
final AcksPublisher acksPublisher = javaClient.streamInserts("test-stream", insertsPublisher).get();
for (int i = 0; i < INSERT_ROWS.size(); i++) {
insertsPublisher.accept(INSERT_ROWS.get(i));
}

TestSubscriber<InsertAck> acksSubscriber = subscribeAndWait(acksPublisher);
acksSubscriber.getSub().request(INSERT_ROWS.size() - 1); // Error is sent even if not requested

// Then:
// No ack is emitted for the row that generates the error, but the row still counts as having been inserted
assertThatEventually(() -> testEndpoints.getInsertsSubscriber().getRowsInserted(), hasSize(INSERT_ROWS.size()));
for (int i = 0; i < INSERT_ROWS.size(); i++) {
assertThat(testEndpoints.getInsertsSubscriber().getRowsInserted().get(i), is(EXPECTED_INSERT_ROWS.get(i)));
}
assertThat(testEndpoints.getLastTarget(), is("test-stream"));

assertThatEventually(acksSubscriber::getValues, hasSize(INSERT_ROWS.size() - 1));
for (int i = 0; i < INSERT_ROWS.size() - 1; i++) {
assertThat(acksSubscriber.getValues().get(i).seqNum(), is(Long.valueOf(i)));
}
assertThatEventually(acksSubscriber::getError, is(notNullValue()));

assertThat(acksPublisher.isFailed(), is(true));
assertThat(acksPublisher.isComplete(), is(false));
}

public void shouldListStreams() throws Exception {
// Given
final List<SourceInfo.Stream> expectedStreams = new ArrayList<>();
File: InsertAckImplTest.java (new file)
@@ -0,0 +1,36 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client.impl;

import com.google.common.testing.EqualsTester;
import org.junit.Test;

public class InsertAckImplTest {

@Test
public void shouldImplementHashCodeAndEquals() {
new EqualsTester()
.addEqualityGroup(
new InsertAckImpl(0L),
new InsertAckImpl(0L)
)
.addEqualityGroup(
new InsertAckImpl(1L)
)
.testEquals();
}

}
File: ClientIntegrationTest.java
@@ -36,10 +36,13 @@
import com.google.common.collect.Multimap;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.client.AcksPublisher;
import io.confluent.ksql.api.client.BatchedQueryResult;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ColumnType;
import io.confluent.ksql.api.client.InsertAck;
import io.confluent.ksql.api.client.InsertsPublisher;
import io.confluent.ksql.api.client.KsqlArray;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.KsqlObject;
@@ -121,6 +124,11 @@ public class ClientIntegrationTest {
private static final String EMPTY_TEST_TOPIC = EMPTY_TEST_DATA_PROVIDER.topicName();
private static final String EMPTY_TEST_STREAM = EMPTY_TEST_DATA_PROVIDER.kstreamName();

private static final TestDataProvider<String> EMPTY_TEST_DATA_PROVIDER_2 = new TestDataProvider<>(
"EMPTY_STRUCTURED_TYPES_2", TEST_DATA_PROVIDER.schema(), ImmutableListMultimap.of());
private static final String EMPTY_TEST_TOPIC_2 = EMPTY_TEST_DATA_PROVIDER_2.topicName();
private static final String EMPTY_TEST_STREAM_2 = EMPTY_TEST_DATA_PROVIDER_2.kstreamName();

private static final String PUSH_QUERY = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES;";
private static final String PULL_QUERY = "SELECT * from " + AGG_TABLE + " WHERE STR='" + AN_AGG_KEY + "';";
private static final int PUSH_QUERY_LIMIT_NUM_ROWS = 2;
@@ -159,17 +167,16 @@ public class ClientIntegrationTest {

@BeforeClass
public static void setUpClass() {
- TEST_HARNESS.ensureTopics(TEST_TOPIC);
+ TEST_HARNESS.ensureTopics(TEST_TOPIC, EMPTY_TEST_TOPIC, EMPTY_TEST_TOPIC_2);
TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, FormatFactory.JSON);
RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER);
+ RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER);
+ RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER_2);

makeKsqlRequest("CREATE TABLE " + AGG_TABLE + " AS "
+ "SELECT STR, LATEST_BY_OFFSET(LONG) AS LONG FROM " + TEST_STREAM + " GROUP BY STR;"
);

- TEST_HARNESS.ensureTopics(EMPTY_TEST_TOPIC);
- RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER);

TEST_HARNESS.verifyAvailableUniqueRows(
AGG_TABLE,
4, // Only unique keys are counted
@@ -593,6 +600,80 @@ public void shouldExecuteQueryWithProperties() {
assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE));
}

@Test
public void shouldStreamInserts() throws Exception {
// Given
final InsertsPublisher insertsPublisher = new InsertsPublisher();
final int numRows = 5;

// When
final AcksPublisher acksPublisher = client.streamInserts(EMPTY_TEST_STREAM_2, insertsPublisher).get();

TestSubscriber<InsertAck> acksSubscriber = subscribeAndWait(acksPublisher);
assertThat(acksSubscriber.getValues(), hasSize(0));
acksSubscriber.getSub().request(numRows);

for (int i = 0; i < numRows; i++) {
insertsPublisher.accept(new KsqlObject()
.put("STR", "TEST_" + i)
.put("LONG", i)
.put("DEC", new BigDecimal("13.31"))
.put("ARRAY", new KsqlArray().add("v_" + i))
.put("MAP", new KsqlObject().put("k_" + i, "v_" + i))
.put("COMPLEX", COMPLEX_FIELD_VALUE));
}

// Then
assertThatEventually(acksSubscriber::getValues, hasSize(numRows));
for (int i = 0; i < numRows; i++) {
assertThat(acksSubscriber.getValues().get(i).seqNum(), is(Long.valueOf(i)));
}
assertThat(acksSubscriber.getError(), is(nullValue()));
assertThat(acksSubscriber.isCompleted(), is(false));

assertThat(acksPublisher.isComplete(), is(false));
assertThat(acksPublisher.isFailed(), is(false));

// Then: should receive new rows
final String query = "SELECT * FROM " + EMPTY_TEST_STREAM_2 + " EMIT CHANGES LIMIT " + numRows + ";";
final List<Row> rows = client.executeQuery(query).get();

// Verify inserted rows are as expected
assertThat(rows, hasSize(numRows));
for (int i = 0; i < numRows; i++) {
assertThat(rows.get(i).getString("STR"), is("TEST_" + i));
assertThat(rows.get(i).getLong("LONG"), is(Long.valueOf(i)));
assertThat(rows.get(i).getDecimal("DEC"), is(new BigDecimal("13.31")));
assertThat(rows.get(i).getKsqlArray("ARRAY"), is(new KsqlArray().add("v_" + i)));
assertThat(rows.get(i).getKsqlObject("MAP"), is(new KsqlObject().put("k_" + i, "v_" + i)));
assertThat(rows.get(i).getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE));
}

// When: end connection
insertsPublisher.complete();

// Then
assertThatEventually(acksSubscriber::isCompleted, is(true));
assertThat(acksSubscriber.getError(), is(nullValue()));

assertThat(acksPublisher.isComplete(), is(true));
assertThat(acksPublisher.isFailed(), is(false));
}

@Test
public void shouldHandleErrorResponseFromStreamInserts() {
// When
final Exception e = assertThrows(
ExecutionException.class, // thrown from .get() when the future completes exceptionally
() -> client.streamInserts(AGG_TABLE, new InsertsPublisher()).get()
);

// Then
assertThat(e.getCause(), instanceOf(KsqlClientException.class));
assertThat(e.getCause().getMessage(), containsString("Received 400 response from server"));
assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table"));
}

@SuppressWarnings("unchecked")
@Test
public void shouldListStreams() throws Exception {
@@ -602,7 +683,8 @@ public void shouldListStreams() throws Exception {
// Then
assertThat(streams, containsInAnyOrder(
streamForProvider(TEST_DATA_PROVIDER),
- streamForProvider(EMPTY_TEST_DATA_PROVIDER)
+ streamForProvider(EMPTY_TEST_DATA_PROVIDER),
+ streamForProvider(EMPTY_TEST_DATA_PROVIDER_2)
streamForProvider(EMPTY_TEST_DATA_PROVIDER_2)
));
}

@@ -625,6 +707,7 @@ public void shouldListTopics() throws Exception {
assertThat(topics, containsInAnyOrder(
topicInfo(TEST_TOPIC),
topicInfo(EMPTY_TEST_TOPIC),
topicInfo(EMPTY_TEST_TOPIC_2),
topicInfo(AGG_TABLE)
));
}