Skip to content

Commit

Permalink
feat(client): support push query termination in Java client (#5371)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored May 16, 2020
1 parent dd4f928 commit 62dacca
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public interface Client {

Publisher<InsertAck> streamInserts(String streamName, Publisher<List<Object>> insertsPublisher);

CompletableFuture<Void> terminatePushQuery(String queryId);

void close();

static Client create(ClientOptions clientOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.rest.client.KsqlRestClientException;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
Expand Down Expand Up @@ -115,6 +116,11 @@ public Publisher<InsertAck> streamInserts(
return null; // not yet implemented
}

@Override
public CompletableFuture<Void> terminatePushQuery(final String queryId) {
return makeCloseQueryRequest(queryId);
}

@Override
public void close() {
httpClient.close();
Expand All @@ -133,29 +139,53 @@ private <T> CompletableFuture<T> makeQueryRequest(
final Map<String, Object> properties,
final ResponseHandlerSupplier<T> responseHandlerSupplier
) {

final JsonObject requestBody = new JsonObject().put("sql", sql).put("properties", properties);

final CompletableFuture<T> cf = new CompletableFuture<>();
makeRequest(
"/query-stream",
requestBody,
cf,
response -> handleQueryResponse(response, cf, responseHandlerSupplier)
);

return cf;
}

private CompletableFuture<Void> makeCloseQueryRequest(final String queryId) {
final CompletableFuture<Void> cf = new CompletableFuture<>();

makeRequest(
"/close-query",
new JsonObject().put("queryId", queryId),
cf,
response -> handleCloseQueryResponse(response, cf)
);

return cf;
}

private <T> void makeRequest(
final String path,
final JsonObject requestBody,
final CompletableFuture<T> cf,
final Handler<HttpClientResponse> responseHandler) {
HttpClientRequest request = httpClient.request(HttpMethod.POST,
serverSocketAddress, clientOptions.getPort(), clientOptions.getHost(),
"/query-stream",
response -> handleResponse(response, cf, responseHandlerSupplier))
path,
responseHandler)
.exceptionHandler(cf::completeExceptionally);
if (clientOptions.isUseBasicAuth()) {
request = configureBasicAuth(request);
}
request.end(requestBody.toBuffer());

return cf;
}

private HttpClientRequest configureBasicAuth(final HttpClientRequest request) {
return request.putHeader(AUTHORIZATION.toString(), basicAuthHeader);
}

private static <T> void handleResponse(
private static <T> void handleQueryResponse(
final HttpClientResponse response,
final CompletableFuture<T> cf,
final ResponseHandlerSupplier<T> responseHandlerSupplier) {
Expand All @@ -168,18 +198,36 @@ private static <T> void handleResponse(
recordParser.endHandler(responseHandler::handleBodyEnd);
recordParser.exceptionHandler(responseHandler::handleException);
} else {
response.bodyHandler(buffer -> {
final JsonObject errorResponse = buffer.toJsonObject();
cf.completeExceptionally(new KsqlRestClientException(String.format(
"Received %d response from server: %s. Error code: %d",
response.statusCode(),
errorResponse.getString("message"),
errorResponse.getInteger("errorCode")
)));
});
handleErrorResponse(response, cf);
}
}

private static void handleCloseQueryResponse(
final HttpClientResponse response,
final CompletableFuture<Void> cf
) {
if (response.statusCode() == OK.code()) {
cf.complete(null);
} else {
handleErrorResponse(response, cf);
}
}

private static <T> void handleErrorResponse(
final HttpClientResponse response,
final CompletableFuture<T> cf
) {
response.bodyHandler(buffer -> {
final JsonObject errorResponse = buffer.toJsonObject();
cf.completeExceptionally(new KsqlRestClientException(String.format(
"Received %d response from server: %s. Error code: %d",
response.statusCode(),
errorResponse.getString("message"),
errorResponse.getInteger("errorCode")
)));
});
}

private static HttpClient createHttpClient(final Vertx vertx, final ClientOptions clientOptions) {
HttpClientOptions options = new HttpClientOptions()
.setSsl(clientOptions.isUseTls())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.api.client;

import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_UNKNOWN_QUERY_ID;
import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -44,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -446,6 +448,48 @@ public void shouldHandleErrorResponseFromExecuteQuery() {
assertThat(e.getCause().getMessage(), containsString("invalid query blah"));
}

@Test
public void shouldTerminatePushQueryIssuedViaStreamQuery() throws Exception {
// Given
final StreamedQueryResult streamedQueryResult =
javaClient.streamQuery(DEFAULT_PUSH_QUERY, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
final String queryId = streamedQueryResult.queryID();
assertThat(queryId, is(notNullValue()));

// Query is running on server, and StreamedQueryResult is not complete
assertThat(server.getQueryIDs(), hasSize(1));
assertThat(server.getQueryIDs().contains(new PushQueryId(queryId)), is(true));
assertThat(streamedQueryResult.isComplete(), is(false));

// When
javaClient.terminatePushQuery(queryId).get();

// Then: query is no longer running on server, and StreamedQueryResult is complete
assertThat(server.getQueryIDs(), hasSize(0));
assertThatEventually(streamedQueryResult::isComplete, is(true));
}

@Test
public void shouldTerminatePushQueryIssuedViaExecuteQuery() {
// Will implement once https://github.com/confluentinc/ksql/pull/5236#issuecomment-628997138
// is resolved
}

@Test
public void shouldHandleErrorResponseFromTerminatePushQuery() {
// When
final Exception e = assertThrows(
ExecutionException.class, // thrown from .get() when the future completes exceptionally
() -> javaClient.terminatePushQuery("nonexistent query ID").get()
);

// Then
assertThat(e.getCause(), instanceOf(KsqlRestClientException.class));
assertThat(e.getCause().getMessage(), containsString("Received 400 response from server"));
assertThat(e.getCause().getMessage(), containsString("No query with id"));
assertThat(e.getCause().getMessage(), containsString("Error code: " + ERROR_CODE_UNKNOWN_QUERY_ID));
}

protected Client createJavaClient() {
return Client.create(createJavaClientOptions(), vertx);
}
Expand Down

0 comments on commit 62dacca

Please sign in to comment.