Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
vpapavas committed Feb 27, 2020
1 parent 736619f commit 9cb9792
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 362 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.ServerInfo;
<<<<<<< HEAD
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.vertx.core.Vertx;
Expand All @@ -41,12 +40,8 @@
import io.vertx.core.http.HttpMethod;
import io.vertx.core.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
=======
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.Collections;
>>>>>>> 0693542d5... added request properties to ksql request
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -225,7 +220,8 @@ private <T> RestResponse<StreamPublisher<T>> executeQueryRequestWithStreamRespon
final Optional<Long> previousCommandSeqNum,
final Function<Buffer, T> mapper
) {
final KsqlRequest ksqlRequest = createKsqlRequest(ksql, Collections.emptyMap(), previousCommandSeqNum);
final KsqlRequest ksqlRequest = createKsqlRequest(
ksql, Collections.emptyMap(), previousCommandSeqNum);
final AtomicReference<StreamPublisher<T>> pubRef = new AtomicReference<>();
return executeSync(HttpMethod.POST, QUERY_PATH, ksqlRequest, resp -> pubRef.get(),
(resp, vcf) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -112,7 +113,7 @@ public void shouldSendKsqlRequest() {
assertThat(server.getPath(), is("/ksql"));
assertThat(server.getHeaders().get("Accept"), is("application/json"));
assertThat(getKsqlRequest(),
is(new KsqlRequest(ksql, properties, 123L)));
is(new KsqlRequest(ksql, properties, Collections.emptyMap(), 123L)));
}

@Test
Expand Down Expand Up @@ -161,7 +162,7 @@ public void shouldOverrideProperties() {
target.postKsqlRequest("some ksql", Optional.of(123L));

// Then:
assertThat(getKsqlRequest().getStreamsProperties(), is(props));
assertThat(getKsqlRequest().getConfigOverrides(), is(props));
}

@Test
Expand Down Expand Up @@ -300,14 +301,15 @@ public void shouldPostQueryRequest() {

// When:
KsqlTarget target = ksqlClient.target(serverUri);
RestResponse<List<StreamedRow>> response = target.postQueryRequest(sql, Optional.of(321L));
RestResponse<List<StreamedRow>> response = target.postQueryRequest(
sql, Collections.emptyMap(), Optional.of(321L));

// Then:
assertThat(server.getHttpMethod(), is(HttpMethod.POST));

assertThat(server.getPath(), is("/query"));
assertThat(server.getHeaders().get("Accept"), is("application/json"));
assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, 321L)));
assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, Collections.emptyMap(), 321L)));
assertThat(response.get(), is(expectedResponse));
}

Expand All @@ -330,7 +332,7 @@ public void shouldPostQueryRequestStreamed() throws Exception {

assertThat(server.getPath(), is("/query"));
assertThat(server.getHeaders().get("Accept"), is("application/json"));
assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, 321L)));
assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, Collections.emptyMap(), 321L)));

List<StreamedRow> rows = getElementsFromPublisher(numRows, response.getResponse());
assertThat(rows, is(expectedResponse));
Expand All @@ -355,7 +357,7 @@ public void shouldPostQueryRequestStreamedWithLimit() throws Exception {

assertThat(server.getPath(), is("/query"));
assertThat(server.getHeaders().get("Accept"), is("application/json"));
assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, 321L)));
assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, Collections.emptyMap(), 321L)));

List<StreamedRow> rows = getElementsFromPublisher(numRows + 1, response.getResponse());
assertThat(rows, is(expectedResponse));
Expand All @@ -374,7 +376,7 @@ public void shouldCloseConnectionWhenQueryStreamIsClosed() throws Exception {
.postQueryRequestStreamed(sql, Optional.of(321L));

// Then:
assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, 321L)));
assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, Collections.emptyMap(), 321L)));

// When:
response.getResponse().close();
Expand All @@ -401,7 +403,7 @@ public void shouldExecutePrintTopic() throws Exception {

assertThat(server.getPath(), is("/query"));
assertThat(server.getHeaders().get("Accept"), is("application/json"));
assertThat(getKsqlRequest(), is(new KsqlRequest(command, properties, 123L)));
assertThat(getKsqlRequest(), is(new KsqlRequest(command, properties, Collections.emptyMap(), 123L)));

List<String> lines = getElementsFromPublisher(numRows, response.getResponse());
assertThat(lines, is(expectedResponse));
Expand All @@ -420,7 +422,7 @@ public void shouldCloseConnectionWhenPrintTopicPublisherIsClosed() throws Except
.postPrintTopicRequest(command, Optional.of(123L));

// Then:
assertThat(getKsqlRequest(), is(new KsqlRequest(command, properties, 123L)));
assertThat(getKsqlRequest(), is(new KsqlRequest(command, properties, Collections.emptyMap(), 123L)));

// When:
response.getResponse().close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.json.JsonObject;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -170,15 +171,15 @@ public void shouldSerialiseDeserialise() {
// Given:
Map<String, Object> props = new HashMap<>();
props.put("auto.offset.reset", "latest");
KsqlRequest request = new KsqlRequest("some ksql", props, 21345L);
KsqlRequest request = new KsqlRequest("some ksql", props, Collections.emptyMap(), 21345L);

// When:
Buffer buff = KsqlClientUtil.serialize(request);

// Then:
assertThat(buff, is(notNullValue()));
String expectedJson = "{\"ksql\":\"some ksql\",\"streamsProperties\":{\"auto.offset.reset\":\""
+ "latest\"},\"commandSequenceNumber\":21345}";
String expectedJson = "{\"ksql\":\"some ksql\",\"configOverrides\":{\"auto.offset.reset\":\""
+ "latest\"},\"requestProperties\":{},\"commandSequenceNumber\":21345}";
assertThat(new JsonObject(buff), is(new JsonObject(expectedJson)));

// When:
Expand Down
Loading

0 comments on commit 9cb9792

Please sign in to comment.