From dfe4e0a6a4380649bc24d59866e03ca992ce56b4 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Fri, 10 Sep 2021 17:28:45 -0500 Subject: [PATCH] feat: add stream pull queries to HTTP/2 --- .../integration/ClientIntegrationTest.java | 496 ++++----------- .../ClientMutationIntegrationTest.java | 593 ++++++++++++++++++ .../confluent/ksql/embedded/KsqlContext.java | 2 - .../confluent/ksql/engine/EngineExecutor.java | 8 +- .../io/confluent/ksql/engine/KsqlEngine.java | 72 +-- .../engine/SandboxedExecutionContext.java | 2 +- .../ksql/query/BlockingRowQueue.java | 6 + ...ueCallback.java => CompletionHandler.java} | 10 +- .../ksql/query/LimitedQueueCallback.java | 61 -- .../confluent/ksql/query/PullQueryQueue.java | 6 + .../io/confluent/ksql/query/QueryBuilder.java | 164 ++++- .../confluent/ksql/query/QueryRegistry.java | 5 +- .../ksql/query/QueryRegistryImpl.java | 8 +- .../confluent/ksql/query/QueueCallback.java | 40 -- .../ksql/query/TransientQueryQueue.java | 66 +- .../ksql/query/UnlimitedQueueCallback.java | 35 -- .../ksql/util/PushQueryMetadata.java | 3 + .../util/SandboxedTransientQueryMetadata.java | 6 + .../ksql/util/ScalablePushQueryMetadata.java | 6 + .../ksql/util/TransientQueryMetadata.java | 6 + .../ksql/embedded/KsqlContextTestUtil.java | 2 - .../ksql/engine/KsqlEngineTestUtil.java | 4 - .../ksql/integration/JsonFormatTest.java | 3 - .../integration/SecureIntegrationTest.java | 2 - .../ksql/query/LimitedQueueCallbackTest.java | 119 ---- .../ksql/query/QueryBuilderTest.java | 3 +- .../ksql/query/QueryRegistryImplTest.java | 8 +- .../util/StructuredTypesDataProvider.java | 6 +- .../ksql/test/tools/TestExecutor.java | 2 - .../ksql/test/driver/KsqlTesterTest.java | 2 - .../pull-queries-against-streams.json | 104 ++- .../ksql/api/impl/BlockingQueryPublisher.java | 12 + .../ksql/api/impl/QueryEndpoint.java | 42 +- .../ksql/api/server/LoggingHandler.java | 4 +- .../ksql/rest/server/KsqlRestApplication.java | 2 - .../server/StandaloneExecutorFactory.java | 4 - .../streaming/QueryStreamWriter.java | 27 +- .../streaming/StreamedQueryResource.java | 35 +- .../server/StandaloneExecutorFactoryTest.java | 3 - .../StandaloneExecutorFunctionalTest.java | 3 - .../streaming/QueryStreamWriterTest.java | 13 +- .../streaming/StreamedQueryResourceTest.java | 7 + 42 files changed, 1165 insertions(+), 837 deletions(-) create mode 100644 ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientMutationIntegrationTest.java rename ksqldb-engine/src/main/java/io/confluent/ksql/query/{LimitQueueCallback.java => CompletionHandler.java} (70%) delete mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/query/LimitedQueueCallback.java delete mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/query/QueueCallback.java delete mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/query/UnlimitedQueueCallback.java delete mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/query/LimitedQueueCallbackTest.java diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index f3fb7a8de876..089dbb945b26 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -33,7 +33,9 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; 
import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; @@ -42,7 +44,6 @@ import io.confluent.common.utils.IntegrationTest; import io.confluent.ksql.GenericKey; import io.confluent.ksql.GenericRow; -import io.confluent.ksql.api.client.AcksPublisher; import io.confluent.ksql.api.client.BatchedQueryResult; import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.Client.HttpResponse; @@ -51,9 +52,6 @@ import io.confluent.ksql.api.client.ConnectorDescription; import io.confluent.ksql.api.client.ConnectorInfo; import io.confluent.ksql.api.client.ConnectorType; -import io.confluent.ksql.api.client.ExecuteStatementResult; -import io.confluent.ksql.api.client.InsertAck; -import io.confluent.ksql.api.client.InsertsPublisher; import io.confluent.ksql.api.client.KsqlArray; import io.confluent.ksql.api.client.KsqlObject; import io.confluent.ksql.api.client.QueryInfo; @@ -87,6 +85,7 @@ import io.confluent.ksql.serde.SerdeFeature; import io.confluent.ksql.serde.SerdeFeatures; import io.confluent.ksql.util.AppInfo; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.StructuredTypesDataProvider; import io.confluent.ksql.util.TestDataProvider; import io.vertx.core.Vertx; @@ -102,18 +101,16 @@ import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; -import java.time.Duration; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import kafka.zookeeper.ZooKeeperClientException; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.json.JsonConverter; @@ -170,12 +167,8 @@ public class ClientIntegrationTest { private static final String EMPTY_TEST_TOPIC = EMPTY_TEST_DATA_PROVIDER.topicName(); private static final String EMPTY_TEST_STREAM = EMPTY_TEST_DATA_PROVIDER.sourceName(); - private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER_2 = new TestDataProvider( - "EMPTY_STRUCTURED_TYPES_2", TEST_DATA_PROVIDER.schema(), ImmutableListMultimap.of()); - private static final String EMPTY_TEST_TOPIC_2 = EMPTY_TEST_DATA_PROVIDER_2.topicName(); - private static final String EMPTY_TEST_STREAM_2 = EMPTY_TEST_DATA_PROVIDER_2.sourceName(); - private static final String PUSH_QUERY = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES;"; + private static final String PULL_QUERY_ON_STREAM = "SELECT * FROM " + TEST_STREAM + ";"; private static final String PULL_QUERY_ON_TABLE = "SELECT * from " + AGG_TABLE + " WHERE K=" + AN_AGG_KEY + ";"; private static final int PUSH_QUERY_LIMIT_NUM_ROWS = 2; @@ -189,18 +182,6 @@ public class ClientIntegrationTest { .add(new KsqlObject().put("F1", new KsqlArray().add("a"))) .add(1); - private static final KsqlObject COMPLEX_FIELD_VALUE = new KsqlObject() - .put("DECIMAL", new BigDecimal("1.1")) - .put("STRUCT", new KsqlObject().put("F1", "foo").put("F2", 3)) - .put("ARRAY_ARRAY", new KsqlArray().add(new KsqlArray().add("bar"))) - .put("ARRAY_STRUCT", new KsqlArray().add(new KsqlObject().put("F1", "x"))) - 
.put("ARRAY_MAP", new KsqlArray().add(new KsqlObject().put("k", 10))) - .put("MAP_ARRAY", new KsqlObject().put("k", new KsqlArray().add("e1").add("e2"))) - .put("MAP_MAP", new KsqlObject().put("k1", new KsqlObject().put("k2", 5))) - .put("MAP_STRUCT", new KsqlObject().put("k", new KsqlObject().put("F1", "baz"))); - private static final KsqlObject EXPECTED_COMPLEX_FIELD_VALUE = COMPLEX_FIELD_VALUE.copy() - .put("DECIMAL", 1.1d); // Expect raw decimal value, whereas put(BigDecimal) serializes as string to avoid loss of precision - protected static final String EXECUTE_STATEMENT_REQUEST_ACCEPTED_DOC = "The ksqlDB server accepted the statement issued via executeStatement(), but the response " + "received is of an unexpected format. "; @@ -238,11 +219,10 @@ public class ClientIntegrationTest { @BeforeClass public static void setUpClass() throws Exception { - TEST_HARNESS.ensureTopics(TEST_TOPIC, EMPTY_TEST_TOPIC, EMPTY_TEST_TOPIC_2); + TEST_HARNESS.ensureTopics(TEST_TOPIC, EMPTY_TEST_TOPIC); TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, KEY_FORMAT, VALUE_FORMAT); RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER); RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER); - RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER_2); makeKsqlRequest("CREATE TABLE " + AGG_TABLE + " AS " + "SELECT K, LATEST_BY_OFFSET(LONG) AS LONG FROM " + TEST_STREAM + " GROUP BY K;" @@ -280,9 +260,9 @@ public static void setUpClass() throws Exception { } private static void writeConnectConfigs(final String path, final Map configs) throws Exception { - try (PrintWriter out = new PrintWriter(new OutputStreamWriter( + try (final PrintWriter out = new PrintWriter(new OutputStreamWriter( new FileOutputStream(path, true), StandardCharsets.UTF_8))) { - for (Map.Entry entry : configs.entrySet()) { + for (final Map.Entry entry : configs.entrySet()) { out.println(entry.getKey() + "=" + entry.getValue()); } } @@ -372,6 +352,103 @@ public void shouldStreamPushQuerySync() throws Exception { assertThat(streamedQueryResult.isComplete(), is(false)); } + @Test + public void shouldStreamPullQueryOnStreamAsync() throws Exception { + // When + final StreamedQueryResult streamedQueryResult = + client.streamQuery( + PULL_QUERY_ON_STREAM, + ImmutableMap.of(KsqlConfig.KSQL_QUERY_STREAM_PULL_QUERY_ENABLED, true) + ).get(); + + // Then + assertThat(streamedQueryResult.columnNames(), is(TEST_COLUMN_NAMES)); + assertThat(streamedQueryResult.columnTypes(), is(TEST_COLUMN_TYPES)); + assertThat(streamedQueryResult.queryID(), is(notNullValue())); + + shouldReceiveRows( + streamedQueryResult, + 6, + rows -> verifyStreamRows(rows, 6), + true + ); + } + + @Test + public void shouldStreamPullQueryOnStreamSync() throws Exception { + // When + final StreamedQueryResult streamedQueryResult = client.streamQuery( + PULL_QUERY_ON_STREAM, + ImmutableMap.of(KsqlConfig.KSQL_QUERY_STREAM_PULL_QUERY_ENABLED, true) + ).get(); + + // Then + assertThat(streamedQueryResult.columnNames(), is(TEST_COLUMN_NAMES)); + assertThat(streamedQueryResult.columnTypes(), is(TEST_COLUMN_TYPES)); + assertThat(streamedQueryResult.queryID(), is(notNullValue())); + + final List results = new LinkedList<>(); + Row row; + while (true) { + row = streamedQueryResult.poll(); + if (row == null) { + break; + } else { + results.add(row); + } + } + + verifyStreamRows(results, 6); + + assertThatEventually(streamedQueryResult::isComplete, is(true)); + } + + @Test + public void shouldStreamPullQueryOnEmptyStreamSync() throws Exception 
{ + // When + final StreamedQueryResult streamedQueryResult = client.streamQuery( + "SELECT * FROM " + EMPTY_TEST_STREAM + ";", + ImmutableMap.of(KsqlConfig.KSQL_QUERY_STREAM_PULL_QUERY_ENABLED, true) + ).get(); + + // Then + assertThat(streamedQueryResult.columnNames(), is(TEST_COLUMN_NAMES)); + assertThat(streamedQueryResult.columnTypes(), is(TEST_COLUMN_TYPES)); + assertThat(streamedQueryResult.queryID(), is(notNullValue())); + + final List results = new LinkedList<>(); + Row row; + while (true) { + row = streamedQueryResult.poll(); + if (row == null) { + break; + } else { + results.add(row); + } + } + + verifyStreamRows(results, 0); + + assertThatEventually(streamedQueryResult::isComplete, is(true)); + } + + @Test + public void shouldRejectPullQueryOnStreamByDefault() { + // When + final CompletableFuture future = client.streamQuery(PULL_QUERY_ON_STREAM); + final ExecutionException exception = assertThrows(ExecutionException.class, future::get); + + assertThat(exception.getCause(), instanceOf(KsqlClientException.class)); + assertThat(exception.getMessage(), containsString( + "Received 400 response from server:" + + " Pull queries on streams are disabled." + + " To create a push query on the stream, add EMIT CHANGES to the end." + + " To enable pull queries on streams," + + " set the ksql.query.pull.stream.enabled config to 'true'.\n" + + "Statement: SELECT * FROM STRUCTURED_TYPES_KSTREAM;." + )); + } + @Test public void shouldStreamPullQueryOnTableAsync() throws Exception { // When @@ -478,7 +555,7 @@ public void shouldAllowSubscribeStreamedQueryResultIfComplete() throws Exception assertThatEventually(streamedQueryResult::isComplete, is(true)); // When - TestSubscriber subscriber = subscribeAndWait(streamedQueryResult); + final TestSubscriber subscriber = subscribeAndWait(streamedQueryResult); assertThat(subscriber.getValues(), hasSize(0)); subscriber.getSub().request(PUSH_QUERY_LIMIT_NUM_ROWS); @@ -617,307 +694,14 @@ public void shouldHandleErrorResponseFromTerminatePushQuery() { assertThat(e.getCause().getMessage(), containsString("No query with id NONEXISTENT")); } - @Test - public void shouldInsertInto() throws Exception { - // Given - final KsqlObject insertRow = new KsqlObject() - .put("k", new KsqlObject().put("F1", new KsqlArray().add("my_key"))) // Column names are case-insensitive - .put("str", "HELLO") // Column names are case-insensitive - .put("`LONG`", 100L) // Backticks may be used to preserve case-sensitivity - .put("\"DEC\"", new BigDecimal("13.31")) // Double quotes may also be used to preserve case-sensitivity - .put("BYTES_", new byte[]{0, 1, 2}) - .put("ARRAY", new KsqlArray().add("v1").add("v2")) - .put("MAP", new KsqlObject().put("some_key", "a_value").put("another_key", "")) - .put("STRUCT", new KsqlObject().put("f1", 12)) // Nested field names are case-insensitive - .put("COMPLEX", COMPLEX_FIELD_VALUE) - .put("TIMESTAMP", "1970-01-01T00:00:00.001") - .put("DATE", "1970-01-01") - .put("TIME", "00:00:01"); - - // When - client.insertInto(EMPTY_TEST_STREAM.toLowerCase(), insertRow).get(); // Stream name is case-insensitive - - // Then: should receive new row - final String query = "SELECT * FROM " + EMPTY_TEST_STREAM + " EMIT CHANGES LIMIT 1;"; - final List rows = client.executeQuery(query).get(); - - // Verify inserted row is as expected - assertThat(rows, hasSize(1)); - assertThat(rows.get(0).getKsqlObject("K"), is(new KsqlObject().put("F1", new KsqlArray().add("my_key")))); - assertThat(rows.get(0).getString("STR"), is("HELLO")); - 
assertThat(rows.get(0).getLong("LONG"), is(100L)); - assertThat(rows.get(0).getDecimal("DEC"), is(new BigDecimal("13.31"))); - assertThat(rows.get(0).getBytes("BYTES_"), is(new byte[]{0, 1, 2})); - assertThat(rows.get(0).getKsqlArray("ARRAY"), is(new KsqlArray().add("v1").add("v2"))); - assertThat(rows.get(0).getKsqlObject("MAP"), is(new KsqlObject().put("some_key", "a_value").put("another_key", ""))); - assertThat(rows.get(0).getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 12))); - assertThat(rows.get(0).getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE)); - assertThat(rows.get(0).getString("TIMESTAMP"), is("1970-01-01T00:00:00.001")); - assertThat(rows.get(0).getString("DATE"), is("1970-01-01")); - assertThat(rows.get(0).getString("TIME"), is("00:00:01")); - } - - @Test - public void shouldHandleErrorResponseFromInsertInto() { - // Given - final KsqlObject insertRow = new KsqlObject() - .put("K", new KsqlObject().put("F1", new KsqlArray().add("my_key"))) - .put("LONG", 11L); - - // When - final Exception e = assertThrows( - ExecutionException.class, // thrown from .get() when the future completes exceptionally - () -> client.insertInto(AGG_TABLE, insertRow).get() - ); - - // Then - assertThat(e.getCause(), instanceOf(KsqlClientException.class)); - assertThat(e.getCause().getMessage(), containsString("Received 400 response from server")); - assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table")); - } - - @Test - public void shouldStreamQueryWithProperties() throws Exception { - // Given - final Map properties = new HashMap<>(); - properties.put("auto.offset.reset", "latest"); - final String sql = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES LIMIT 1;"; - - final KsqlObject insertRow = new KsqlObject() - .put("K", new KsqlObject().put("F1", new KsqlArray().add("my_key_shouldStreamQueryWithProperties"))) - .put("STR", "Value_shouldStreamQueryWithProperties") - .put("LONG", 2000L) - .put("DEC", new BigDecimal("12.34")) - .put("BYTES_", new byte[]{0, 1, 2}) - .put("ARRAY", new KsqlArray().add("v1_shouldStreamQueryWithProperties").add("v2_shouldStreamQueryWithProperties")) - .put("MAP", new KsqlObject().put("test_name", "shouldStreamQueryWithProperties")) - .put("STRUCT", new KsqlObject().put("F1", 4)) - .put("COMPLEX", COMPLEX_FIELD_VALUE) - .put("TIMESTAMP", "1970-01-01T00:00:00.001") - .put("DATE", "1970-01-01") - .put("TIME", "00:00:00"); - - // When - final StreamedQueryResult queryResult = client.streamQuery(sql, properties).get(); - - // Then: a newly inserted row arrives - final Row row = assertThatEventually(() -> { - // Potentially try inserting multiple times, in case the query wasn't started by the first time - try { - client.insertInto(TEST_STREAM, insertRow).get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - return queryResult.poll(Duration.ofMillis(10)); - }, is(notNullValue())); - - assertThat(row.getKsqlObject("K"), is(new KsqlObject().put("F1", new KsqlArray().add("my_key_shouldStreamQueryWithProperties")))); - assertThat(row.getString("STR"), is("Value_shouldStreamQueryWithProperties")); - assertThat(row.getLong("LONG"), is(2000L)); - assertThat(row.getDecimal("DEC"), is(new BigDecimal("12.34"))); - assertThat(row.getBytes("BYTES_"), is(new byte[]{0, 1, 2})); - assertThat(row.getKsqlArray("ARRAY"), is(new KsqlArray().add("v1_shouldStreamQueryWithProperties").add("v2_shouldStreamQueryWithProperties"))); - assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", 
"shouldStreamQueryWithProperties"))); - assertThat(row.getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 4))); - assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE)); - assertThat(row.getString("TIMESTAMP"), is("1970-01-01T00:00:00.001")); - assertThat(row.getString("DATE"), is("1970-01-01")); - assertThat(row.getString("TIME"), is("00:00")); - } - - @Test - public void shouldExecuteQueryWithProperties() { - // Given - final Map properties = new HashMap<>(); - properties.put("auto.offset.reset", "latest"); - final String sql = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES LIMIT 1;"; - - final KsqlObject insertRow = new KsqlObject() - .put("K", new KsqlObject().put("F1", new KsqlArray().add("my_key_shouldExecuteQueryWithProperties"))) - .put("STR", "Value_shouldExecuteQueryWithProperties") - .put("LONG", 2000L) - .put("DEC", new BigDecimal("12.34")) - .put("BYTES_", new byte[]{0, 1, 2}) - .put("ARRAY", new KsqlArray().add("v1_shouldExecuteQueryWithProperties").add("v2_shouldExecuteQueryWithProperties")) - .put("MAP", new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties")) - .put("STRUCT", new KsqlObject().put("F1", 4)) - .put("COMPLEX", COMPLEX_FIELD_VALUE) - .put("TIMESTAMP", "1970-01-01T00:00:00.001") - .put("DATE", "1970-01-01") - .put("TIME", "00:00:00"); - - // When - final BatchedQueryResult queryResult = client.executeQuery(sql, properties); - - // Then: a newly inserted row arrives - - // Wait for row to arrive - final AtomicReference rowRef = new AtomicReference<>(); - new Thread(() -> { - try { - final List rows = queryResult.get(); - assertThat(rows, hasSize(1)); - rowRef.set(rows.get(0)); - } catch (Exception e) { - throw new RuntimeException(e); - } - }).start(); - - // Insert a new row - final Row row = assertThatEventually(() -> { - // Potentially try inserting multiple times, in case the query wasn't started by the first time - try { - client.insertInto(TEST_STREAM, insertRow).get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - return rowRef.get(); - }, is(notNullValue())); - - // Verify received row - assertThat(row.getKsqlObject("K"), is(new KsqlObject().put("F1", new KsqlArray().add("my_key_shouldExecuteQueryWithProperties")))); - assertThat(row.getString("STR"), is("Value_shouldExecuteQueryWithProperties")); - assertThat(row.getLong("LONG"), is(2000L)); - assertThat(row.getDecimal("DEC"), is(new BigDecimal("12.34"))); - assertThat(row.getBytes("BYTES_"), is(new byte[]{0, 1, 2})); - assertThat(row.getKsqlArray("ARRAY"), is(new KsqlArray().add("v1_shouldExecuteQueryWithProperties").add("v2_shouldExecuteQueryWithProperties"))); - assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties"))); - assertThat(row.getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 4))); - assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE)); - assertThat(row.getString("TIMESTAMP"), is("1970-01-01T00:00:00.001")); - assertThat(row.getString("DATE"), is("1970-01-01")); - assertThat(row.getString("TIME"), is("00:00")); - } - - @Test - public void shouldStreamInserts() throws Exception { - // Given - final InsertsPublisher insertsPublisher = new InsertsPublisher(); - final int numRows = 5; - - // When - final AcksPublisher acksPublisher = client.streamInserts(EMPTY_TEST_STREAM_2, insertsPublisher).get(); - - TestSubscriber acksSubscriber = subscribeAndWait(acksPublisher); - assertThat(acksSubscriber.getValues(), hasSize(0)); - 
acksSubscriber.getSub().request(numRows); - - for (int i = 0; i < numRows; i++) { - insertsPublisher.accept(new KsqlObject() - .put("K", new KsqlObject().put("F1", new KsqlArray().add("my_key_" + i))) - .put("STR", "TEST_" + i) - .put("LONG", i) - .put("DEC", new BigDecimal("13.31")) - .put("BYTES_", new byte[]{0, 1, 2}) - .put("ARRAY", new KsqlArray().add("v_" + i)) - .put("MAP", new KsqlObject().put("k_" + i, "v_" + i)) - .put("COMPLEX", COMPLEX_FIELD_VALUE) - .put("TIMESTAMP", "1970-01-01T00:00:00.001") - .put("DATE", "1970-01-01") - .put("TIME", "00:00")); - } - - // Then - assertThatEventually(acksSubscriber::getValues, hasSize(numRows)); - for (int i = 0; i < numRows; i++) { - assertThat(acksSubscriber.getValues().get(i).seqNum(), is(Long.valueOf(i))); - } - assertThat(acksSubscriber.getError(), is(nullValue())); - assertThat(acksSubscriber.isCompleted(), is(false)); - - assertThat(acksPublisher.isComplete(), is(false)); - assertThat(acksPublisher.isFailed(), is(false)); - - // Then: should receive new rows - final String query = "SELECT * FROM " + EMPTY_TEST_STREAM_2 + " EMIT CHANGES LIMIT " + numRows + ";"; - final List rows = client.executeQuery(query).get(); - - // Verify inserted rows are as expected - assertThat(rows, hasSize(numRows)); - for (int i = 0; i < numRows; i++) { - assertThat(rows.get(i).getKsqlObject("K"), is(new KsqlObject().put("F1", new KsqlArray().add("my_key_" + i)))); - assertThat(rows.get(i).getString("STR"), is("TEST_" + i)); - assertThat(rows.get(i).getLong("LONG"), is(Long.valueOf(i))); - assertThat(rows.get(i).getDecimal("DEC"), is(new BigDecimal("13.31"))); - assertThat(rows.get(i).getBytes("BYTES_"), is(new byte[]{0, 1, 2})); - assertThat(rows.get(i).getKsqlArray("ARRAY"), is(new KsqlArray().add("v_" + i))); - assertThat(rows.get(i).getKsqlObject("MAP"), is(new KsqlObject().put("k_" + i, "v_" + i))); - assertThat(rows.get(i).getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE)); - assertThat(rows.get(i).getString("TIMESTAMP"), is("1970-01-01T00:00:00.001")); - assertThat(rows.get(i).getString("DATE"), is("1970-01-01")); - assertThat(rows.get(i).getString("TIME"), is("00:00")); - } - - // When: end connection - insertsPublisher.complete(); - - // Then - assertThatEventually(acksSubscriber::isCompleted, is(true)); - assertThat(acksSubscriber.getError(), is(nullValue())); - - assertThat(acksPublisher.isComplete(), is(true)); - assertThat(acksPublisher.isFailed(), is(false)); - } - - @Test - public void shouldHandleErrorResponseFromStreamInserts() { - // When - final Exception e = assertThrows( - ExecutionException.class, // thrown from .get() when the future completes exceptionally - () -> client.streamInserts(AGG_TABLE, new InsertsPublisher()).get() - ); - - // Then - assertThat(e.getCause(), instanceOf(KsqlClientException.class)); - assertThat(e.getCause().getMessage(), containsString("Received 400 response from server")); - assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table")); - } - - @Test - public void shouldExecuteDdlDmlStatements() throws Exception { - // Given - final String streamName = TEST_STREAM + "_COPY"; - final String csas = "create stream " + streamName + " as select * from " + TEST_STREAM + " emit changes;"; - - final int numInitialStreams = 3; - final int numInitialQueries = 1; - verifyNumStreams(numInitialStreams); - verifyNumQueries(numInitialQueries); - - // When: create stream, start persistent query - final ExecuteStatementResult csasResult = client.executeStatement(csas).get(); - - // Then - 
verifyNumStreams(numInitialStreams + 1); - verifyNumQueries(numInitialQueries + 1); - assertThat(csasResult.queryId(), is(Optional.of(findQueryIdForSink(streamName)))); - - // When: terminate persistent query - final String queryId = csasResult.queryId().get(); - final ExecuteStatementResult terminateResult = - client.executeStatement("terminate " + queryId + ";").get(); - - // Then - verifyNumQueries(numInitialQueries); - assertThat(terminateResult.queryId(), is(Optional.empty())); - - // When: drop stream - final ExecuteStatementResult dropStreamResult = - client.executeStatement("drop stream " + streamName + ";").get(); - - // Then - verifyNumStreams(numInitialStreams); - assertThat(dropStreamResult.queryId(), is(Optional.empty())); - } - @Test public void shouldExecutePlainHttpRequests() throws Exception { - HttpResponse response = client.buildRequest("GET", "/info").send().get(); + final HttpResponse response = client.buildRequest("GET", "/info").send().get(); assertThat(response.status(), is(200)); - Map> info = response.bodyAsMap(); + final Map> info = response.bodyAsMap(); final ServerInfo serverInfo = client.serverInfo().get(); - Map innerMap = info.get("KsqlServerInfo"); + final Map innerMap = info.get("KsqlServerInfo"); assertThat(innerMap.get("version"), is(serverInfo.getServerVersion())); assertThat(innerMap.get("ksqlServiceId"), is(serverInfo.getKsqlServiceId())); @@ -1020,8 +804,7 @@ public void shouldListStreams() throws Exception { // Then assertThat("" + streams, streams, containsInAnyOrder( streamForProvider(TEST_DATA_PROVIDER), - streamForProvider(EMPTY_TEST_DATA_PROVIDER), - streamForProvider(EMPTY_TEST_DATA_PROVIDER_2) + streamForProvider(EMPTY_TEST_DATA_PROVIDER) )); } @@ -1046,7 +829,6 @@ public void shouldListTopics() throws Exception { assertThat("" + topics, topics, containsInAnyOrder( topicInfo(TEST_TOPIC), topicInfo(EMPTY_TEST_TOPIC), - topicInfo(EMPTY_TEST_TOPIC_2), topicInfo(AGG_TABLE), topicInfo("connect-config") )); @@ -1060,7 +842,7 @@ public void shouldListQueries() throws ExecutionException, InterruptedException // Then assertThat(queries, hasItem(allOf( hasProperty("queryType", is(QueryType.PERSISTENT)), - hasProperty("id", is("CTAS_" + AGG_TABLE + "_5")), + hasProperty("id", startsWith("CTAS_" + AGG_TABLE)), hasProperty("sql", is( "CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n" + " " + TEST_STREAM + ".K K,\n" @@ -1093,7 +875,7 @@ public void shouldDescribeSource() throws Exception { assertThat(description.valueFormat(), is("JSON")); assertThat(description.readQueries(), hasSize(1)); assertThat(description.readQueries().get(0).getQueryType(), is(QueryType.PERSISTENT)); - assertThat(description.readQueries().get(0).getId(), is("CTAS_" + AGG_TABLE + "_5")); + assertThat(description.readQueries().get(0).getId(), startsWith("CTAS_" + AGG_TABLE)); assertThat(description.readQueries().get(0).getSql(), is( "CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n" + " " + TEST_STREAM + ".K K,\n" @@ -1189,7 +971,7 @@ public void shouldDropConnector() throws Exception { assertThatEventually(() -> { try { return client.listConnectors().get().size(); - } catch (InterruptedException | ExecutionException e) { + } catch (final InterruptedException | ExecutionException e) { return null; } }, is(0)); @@ -1205,7 +987,7 @@ public void shouldCreateConnector() throws Exception { () -> { try { return (client.describeConnector("FOO").get()).state(); - } catch 
(InterruptedException | ExecutionException e) { + } catch (final InterruptedException | ExecutionException e) { return null; } }, @@ -1224,7 +1006,7 @@ public void shouldCreateConnectorWithVariables() throws Exception { () -> { try { return (client.describeConnector("FOO").get()).state(); - } catch (InterruptedException | ExecutionException e) { + } catch (final InterruptedException | ExecutionException e) { return null; } }, @@ -1239,26 +1021,6 @@ private Client createClient() { return Client.create(clientOptions, vertx); } - private void verifyNumStreams(final int numStreams) { - assertThatEventually(() -> { - try { - return client.listStreams().get().size(); - } catch (Exception e) { - return -1; - } - }, is(numStreams)); - } - - private void verifyNumQueries(final int numQueries) { - assertThatEventually(() -> { - try { - return client.listQueries().get().size(); - } catch (Exception e) { - return -1; - } - }, is(numQueries)); - } - private void givenConnectorExists() { // Make sure we are starting from a clean slate before creating a new connector. cleanupConnectors(); @@ -1269,7 +1031,7 @@ private void givenConnectorExists() { () -> { try { return ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;").get(0)).getConnectors().size(); - } catch (AssertionError e) { + } catch (final AssertionError e) { return 0; }}, is(1) @@ -1286,21 +1048,12 @@ private static void cleanupConnectors() { ); } - private String findQueryIdForSink(final String sinkName) throws Exception { - final List queryIds = client.listQueries().get().stream() - .filter(q -> q.getSink().equals(Optional.of(sinkName))) - .map(QueryInfo::getId) - .collect(Collectors.toList()); - assertThat(queryIds, hasSize(1)); - return queryIds.get(0); - } - private static List makeKsqlRequest(final String sql) { return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql); } private static void verifyNumActiveQueries(final int numQueries) { - KsqlEngine engine = (KsqlEngine) REST_APP.getEngine(); + final KsqlEngine engine = (KsqlEngine) REST_APP.getEngine(); assertThatEventually(engine::numberOfLiveQueries, is(numQueries)); } @@ -1325,10 +1078,19 @@ private static void shouldReceiveStreamRows( } private static void verifyStreamRows(final List rows, final int numRows) { - assertThat(rows, hasSize(numRows)); - for (int i = 0; i < numRows; i++) { + for (int i = 0; i < Math.min(numRows, rows.size()); i++) { verifyStreamRowWithIndex(rows.get(i), i); } + if (rows.size() < numRows) { + fail("Expected " + numRows + " but only got " + rows.size()); + } else if (rows.size() > numRows) { + final List extra = rows.subList(numRows, rows.size()); + fail("Expected " + numRows + " but got " + rows.size() + ". The extra rows were: " + extra); + } + + // not strictly necessary after the other checks, but just to specify the invariant + assertThat(rows, hasSize(numRows)); + } private static void verifyStreamRowWithIndex(final Row row, final int index) { diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientMutationIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientMutationIntegrationTest.java new file mode 100644 index 000000000000..c34084d17745 --- /dev/null +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientMutationIntegrationTest.java @@ -0,0 +1,593 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. 
You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.integration; + +import static io.confluent.ksql.api.client.util.ClientTestUtil.subscribeAndWait; +import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually; +import static io.confluent.ksql.util.KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG; +import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThrows; + +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.ImmutableMap; +import io.confluent.common.utils.IntegrationTest; +import io.confluent.ksql.api.client.AcksPublisher; +import io.confluent.ksql.api.client.BatchedQueryResult; +import io.confluent.ksql.api.client.Client; +import io.confluent.ksql.api.client.ClientOptions; +import io.confluent.ksql.api.client.ExecuteStatementResult; +import io.confluent.ksql.api.client.InsertAck; +import io.confluent.ksql.api.client.InsertsPublisher; +import io.confluent.ksql.api.client.KsqlArray; +import io.confluent.ksql.api.client.KsqlObject; +import io.confluent.ksql.api.client.QueryInfo; +import io.confluent.ksql.api.client.Row; +import io.confluent.ksql.api.client.StreamedQueryResult; +import io.confluent.ksql.api.client.exception.KsqlClientException; +import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber; +import io.confluent.ksql.integration.IntegrationTestHarness; +import io.confluent.ksql.integration.Retry; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.rest.entity.ConnectorList; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; +import io.confluent.ksql.rest.server.ConnectExecutable; +import io.confluent.ksql.rest.server.TestKsqlRestApp; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatFactory; +import io.confluent.ksql.serde.SerdeFeature; +import io.confluent.ksql.serde.SerdeFeatures; +import io.confluent.ksql.util.StructuredTypesDataProvider; +import io.confluent.ksql.util.TestDataProvider; +import io.vertx.core.Vertx; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import 
java.util.stream.Collectors; +import kafka.zookeeper.ZooKeeperClientException; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.RuleChain; + +@Category({IntegrationTest.class}) +public class ClientMutationIntegrationTest { + + private static final StructuredTypesDataProvider TEST_DATA_PROVIDER = + new StructuredTypesDataProvider("STRUCTURED_TYPES_MUTABLE"); + private static final String TEST_TOPIC = TEST_DATA_PROVIDER.topicName(); + private static final String TEST_STREAM = TEST_DATA_PROVIDER.sourceName(); + + private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER = new TestDataProvider( + "EMPTY_STRUCTURED_TYPES_MUTABLE", + TEST_DATA_PROVIDER.schema(), + ImmutableListMultimap.of() + ); + private static final String EMPTY_TEST_TOPIC = EMPTY_TEST_DATA_PROVIDER.topicName(); + private static final String EMPTY_TEST_STREAM = EMPTY_TEST_DATA_PROVIDER.sourceName(); + + private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER_2 = new TestDataProvider( + "EMPTY_STRUCTURED_TYPES_2_MUTABLE", + TEST_DATA_PROVIDER.schema(), + ImmutableListMultimap.of() + ); + private static final String EMPTY_TEST_TOPIC_2 = EMPTY_TEST_DATA_PROVIDER_2.topicName(); + private static final String EMPTY_TEST_STREAM_2 = EMPTY_TEST_DATA_PROVIDER_2.sourceName(); + + private static final Format KEY_FORMAT = FormatFactory.JSON; + private static final Format VALUE_FORMAT = FormatFactory.JSON; + + private static final String AGG_TABLE = "AGG_TABLE"; + private static final PhysicalSchema AGG_SCHEMA = PhysicalSchema.from( + LogicalSchema.builder() + .keyColumn(ColumnName.of("K"), SqlTypes.struct() + .field("F1", SqlTypes.array(SqlTypes.STRING)) + .build()) + .valueColumn(ColumnName.of("LONG"), SqlTypes.BIGINT) + .build(), + SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES), + SerdeFeatures.of() + ); + + private static final KsqlObject COMPLEX_FIELD_VALUE = new KsqlObject() + .put("DECIMAL", new BigDecimal("1.1")) + .put("STRUCT", new KsqlObject().put("F1", "foo").put("F2", 3)) + .put("ARRAY_ARRAY", new KsqlArray().add(new KsqlArray().add("bar"))) + .put("ARRAY_STRUCT", new KsqlArray().add(new KsqlObject().put("F1", "x"))) + .put("ARRAY_MAP", new KsqlArray().add(new KsqlObject().put("k", 10))) + .put("MAP_ARRAY", new KsqlObject().put("k", new KsqlArray().add("e1").add("e2"))) + .put("MAP_MAP", new KsqlObject().put("k1", new KsqlObject().put("k2", 5))) + .put("MAP_STRUCT", new KsqlObject().put("k", new KsqlObject().put("F1", "baz"))); + private static final KsqlObject EXPECTED_COMPLEX_FIELD_VALUE = COMPLEX_FIELD_VALUE.copy() + .put("DECIMAL", 1.1d); // Expect raw decimal value, whereas put(BigDecimal) serializes as string to avoid loss of precision + + private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); + + // these properties are set together to allow us to verify that we can handle push queries + // in the worker pool without blocking the event loop. 
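+  // (With ksql.verticle.instances set to 1, all HTTP requests share a single event-loop
+  // thread, so any query handling that blocked that thread instead of running on the
+  // worker pool would stall the server and these tests along with it.)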
+ private static final int EVENT_LOOP_POOL_SIZE = 1; + private static final int WORKER_POOL_SIZE = 10; + + private static final TestKsqlRestApp REST_APP = TestKsqlRestApp + .builder(TEST_HARNESS::kafkaBootstrapServers) + .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) + .withProperty(KSQL_DEFAULT_KEY_FORMAT_CONFIG, "JSON") + .withProperty("ksql.verticle.instances", EVENT_LOOP_POOL_SIZE) + .withProperty("ksql.worker.pool.size", WORKER_POOL_SIZE) + .build(); + + @ClassRule + public static final RuleChain CHAIN = RuleChain + .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS)) + .around(TEST_HARNESS) + .around(REST_APP); + + private static ConnectExecutable CONNECT; + + @BeforeClass + public static void setUpClass() throws Exception { + TEST_HARNESS.ensureTopics(TEST_TOPIC, EMPTY_TEST_TOPIC, EMPTY_TEST_TOPIC_2); + TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, KEY_FORMAT, VALUE_FORMAT); + RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER); + RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER); + RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER_2); + + makeKsqlRequest("CREATE TABLE " + AGG_TABLE + " AS " + + "SELECT K, LATEST_BY_OFFSET(LONG) AS LONG FROM " + TEST_STREAM + " GROUP BY K;" + ); + + TEST_HARNESS.verifyAvailableUniqueRows( + AGG_TABLE, + 4, // Only unique keys are counted + KEY_FORMAT, + VALUE_FORMAT, + AGG_SCHEMA + ); + + final String testDir = Paths.get(TestUtils.tempDirectory().getAbsolutePath(), "client_integ_test").toString(); + final String connectFilePath = Paths.get(testDir, "connect.properties").toString(); + Files.createDirectories(Paths.get(testDir)); + + writeConnectConfigs(connectFilePath, ImmutableMap.builder() + .put("bootstrap.servers", TEST_HARNESS.kafkaBootstrapServers()) + .put("group.id", UUID.randomUUID().toString()) + .put("key.converter", StringConverter.class.getName()) + .put("value.converter", JsonConverter.class.getName()) + .put("offset.storage.topic", "connect-offsets") + .put("status.storage.topic", "connect-status") + .put("config.storage.topic", "connect-config") + .put("offset.storage.replication.factor", "1") + .put("status.storage.replication.factor", "1") + .put("config.storage.replication.factor", "1") + .put("value.converter.schemas.enable", "false") + .build() + ); + + CONNECT = ConnectExecutable.of(connectFilePath); + CONNECT.startAsync(); + } + + private static void writeConnectConfigs(final String path, final Map configs) throws Exception { + try (final PrintWriter out = new PrintWriter(new OutputStreamWriter( + new FileOutputStream(path, true), StandardCharsets.UTF_8))) { + for (final Map.Entry entry : configs.entrySet()) { + out.println(entry.getKey() + "=" + entry.getValue()); + } + } + } + + @AfterClass + public static void classTearDown() { + cleanupConnectors(); + CONNECT.shutdown(); + REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";")); + } + + private Vertx vertx; + private Client client; + + @Before + public void setUp() { + vertx = Vertx.vertx(); + client = createClient(); + } + + @After + public void tearDown() { + if (client != null) { + client.close(); + } + if (vertx != null) { + vertx.close(); + } + REST_APP.getServiceContext().close(); + } + + @Test + public void shouldStreamQueryWithProperties() throws Exception { + // Given + final Map properties = new HashMap<>(); + properties.put("auto.offset.reset", "latest"); + final String sql = "SELECT * FROM " + 
TEST_STREAM + " EMIT CHANGES LIMIT 1;"; + + final KsqlObject insertRow = new KsqlObject() + .put("K", new KsqlObject().put("F1", new KsqlArray().add("my_key_shouldStreamQueryWithProperties"))) + .put("STR", "Value_shouldStreamQueryWithProperties") + .put("LONG", 2000L) + .put("DEC", new BigDecimal("12.34")) + .put("BYTES_", new byte[]{0, 1, 2}) + .put("ARRAY", new KsqlArray().add("v1_shouldStreamQueryWithProperties").add("v2_shouldStreamQueryWithProperties")) + .put("MAP", new KsqlObject().put("test_name", "shouldStreamQueryWithProperties")) + .put("STRUCT", new KsqlObject().put("F1", 4)) + .put("COMPLEX", COMPLEX_FIELD_VALUE) + .put("TIMESTAMP", "1970-01-01T00:00:00.001") + .put("DATE", "1970-01-01") + .put("TIME", "00:00:00"); + + // When + final StreamedQueryResult queryResult = client.streamQuery(sql, properties).get(); + + // Then: a newly inserted row arrives + final Row row = assertThatEventually(() -> { + // Potentially try inserting multiple times, in case the query wasn't started by the first time + try { + client.insertInto(TEST_STREAM, insertRow).get(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + return queryResult.poll(Duration.ofMillis(10)); + }, is(notNullValue())); + + assertThat(row.getKsqlObject("K"), is(new KsqlObject().put("F1", new KsqlArray().add("my_key_shouldStreamQueryWithProperties")))); + assertThat(row.getString("STR"), is("Value_shouldStreamQueryWithProperties")); + assertThat(row.getLong("LONG"), is(2000L)); + assertThat(row.getDecimal("DEC"), is(new BigDecimal("12.34"))); + assertThat(row.getBytes("BYTES_"), is(new byte[]{0, 1, 2})); + assertThat(row.getKsqlArray("ARRAY"), is(new KsqlArray().add("v1_shouldStreamQueryWithProperties").add("v2_shouldStreamQueryWithProperties"))); + assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", "shouldStreamQueryWithProperties"))); + assertThat(row.getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 4))); + assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE)); + assertThat(row.getString("TIMESTAMP"), is("1970-01-01T00:00:00.001")); + assertThat(row.getString("DATE"), is("1970-01-01")); + assertThat(row.getString("TIME"), is("00:00")); + } + + @Test + public void shouldExecuteQueryWithProperties() { + // Given + final Map properties = new HashMap<>(); + properties.put("auto.offset.reset", "latest"); + final String sql = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES LIMIT 1;"; + + final KsqlObject insertRow = new KsqlObject() + .put("K", new KsqlObject().put("F1", new KsqlArray().add("my_key_shouldExecuteQueryWithProperties"))) + .put("STR", "Value_shouldExecuteQueryWithProperties") + .put("LONG", 2000L) + .put("DEC", new BigDecimal("12.34")) + .put("BYTES_", new byte[]{0, 1, 2}) + .put("ARRAY", new KsqlArray().add("v1_shouldExecuteQueryWithProperties").add("v2_shouldExecuteQueryWithProperties")) + .put("MAP", new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties")) + .put("STRUCT", new KsqlObject().put("F1", 4)) + .put("COMPLEX", COMPLEX_FIELD_VALUE) + .put("TIMESTAMP", "1970-01-01T00:00:00.001") + .put("DATE", "1970-01-01") + .put("TIME", "00:00:00"); + + // When + final BatchedQueryResult queryResult = client.executeQuery(sql, properties); + + // Then: a newly inserted row arrives + + // Wait for row to arrive + final AtomicReference rowRef = new AtomicReference<>(); + new Thread(() -> { + try { + final List rows = queryResult.get(); + assertThat(rows, hasSize(1)); + rowRef.set(rows.get(0)); + } catch (final Exception e) { + throw new 
RuntimeException(e); + } + }).start(); + + // Insert a new row + final Row row = assertThatEventually(() -> { + // Potentially try inserting multiple times, in case the query wasn't started by the first time + try { + client.insertInto(TEST_STREAM, insertRow).get(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + return rowRef.get(); + }, is(notNullValue())); + + // Verify received row + assertThat(row.getKsqlObject("K"), is(new KsqlObject().put("F1", new KsqlArray().add("my_key_shouldExecuteQueryWithProperties")))); + assertThat(row.getString("STR"), is("Value_shouldExecuteQueryWithProperties")); + assertThat(row.getLong("LONG"), is(2000L)); + assertThat(row.getDecimal("DEC"), is(new BigDecimal("12.34"))); + assertThat(row.getBytes("BYTES_"), is(new byte[]{0, 1, 2})); + assertThat(row.getKsqlArray("ARRAY"), is(new KsqlArray().add("v1_shouldExecuteQueryWithProperties").add("v2_shouldExecuteQueryWithProperties"))); + assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties"))); + assertThat(row.getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 4))); + assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE)); + assertThat(row.getString("TIMESTAMP"), is("1970-01-01T00:00:00.001")); + assertThat(row.getString("DATE"), is("1970-01-01")); + assertThat(row.getString("TIME"), is("00:00")); + } + + @Test + public void shouldInsertInto() throws Exception { + // Given + final KsqlObject insertRow = new KsqlObject() + .put("k", new KsqlObject().put("F1", new KsqlArray().add("my_key"))) // Column names are case-insensitive + .put("str", "HELLO") // Column names are case-insensitive + .put("`LONG`", 100L) // Backticks may be used to preserve case-sensitivity + .put("\"DEC\"", new BigDecimal("13.31")) // Double quotes may also be used to preserve case-sensitivity + .put("BYTES_", new byte[]{0, 1, 2}) + .put("ARRAY", new KsqlArray().add("v1").add("v2")) + .put("MAP", new KsqlObject().put("some_key", "a_value").put("another_key", "")) + .put("STRUCT", new KsqlObject().put("f1", 12)) // Nested field names are case-insensitive + .put("COMPLEX", COMPLEX_FIELD_VALUE) + .put("TIMESTAMP", "1970-01-01T00:00:00.001") + .put("DATE", "1970-01-01") + .put("TIME", "00:00:01"); + + // When + client.insertInto(EMPTY_TEST_STREAM.toLowerCase(), insertRow).get(); // Stream name is case-insensitive + + // Then: should receive new row + final String query = "SELECT * FROM " + EMPTY_TEST_STREAM + " EMIT CHANGES LIMIT 1;"; + final List rows = client.executeQuery(query).get(); + + // Verify inserted row is as expected + assertThat(rows, hasSize(1)); + assertThat(rows.get(0).getKsqlObject("K"), is(new KsqlObject().put("F1", new KsqlArray().add("my_key")))); + assertThat(rows.get(0).getString("STR"), is("HELLO")); + assertThat(rows.get(0).getLong("LONG"), is(100L)); + assertThat(rows.get(0).getDecimal("DEC"), is(new BigDecimal("13.31"))); + assertThat(rows.get(0).getBytes("BYTES_"), is(new byte[]{0, 1, 2})); + assertThat(rows.get(0).getKsqlArray("ARRAY"), is(new KsqlArray().add("v1").add("v2"))); + assertThat(rows.get(0).getKsqlObject("MAP"), is(new KsqlObject().put("some_key", "a_value").put("another_key", ""))); + assertThat(rows.get(0).getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 12))); + assertThat(rows.get(0).getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE)); + assertThat(rows.get(0).getString("TIMESTAMP"), is("1970-01-01T00:00:00.001")); + assertThat(rows.get(0).getString("DATE"), is("1970-01-01")); + 
assertThat(rows.get(0).getString("TIME"), is("00:00:01")); + } + + @Test + public void shouldHandleErrorResponseFromInsertInto() { + // Given + final KsqlObject insertRow = new KsqlObject() + .put("K", new KsqlObject().put("F1", new KsqlArray().add("my_key"))) + .put("LONG", 11L); + + // When + final Exception e = assertThrows( + ExecutionException.class, // thrown from .get() when the future completes exceptionally + () -> client.insertInto(AGG_TABLE, insertRow).get() + ); + + // Then + assertThat(e.getCause(), instanceOf(KsqlClientException.class)); + assertThat(e.getCause().getMessage(), containsString("Received 400 response from server")); + assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table")); + } + + @Test + public void shouldStreamInserts() throws Exception { + // Given + final InsertsPublisher insertsPublisher = new InsertsPublisher(); + final int numRows = 5; + + // When + final AcksPublisher acksPublisher = client.streamInserts(EMPTY_TEST_STREAM_2, insertsPublisher).get(); + + final TestSubscriber acksSubscriber = subscribeAndWait(acksPublisher); + assertThat(acksSubscriber.getValues(), hasSize(0)); + acksSubscriber.getSub().request(numRows); + + for (int i = 0; i < numRows; i++) { + insertsPublisher.accept(new KsqlObject() + .put("K", new KsqlObject().put("F1", new KsqlArray().add("my_key_" + i))) + .put("STR", "TEST_" + i) + .put("LONG", i) + .put("DEC", new BigDecimal("13.31")) + .put("BYTES_", new byte[]{0, 1, 2}) + .put("ARRAY", new KsqlArray().add("v_" + i)) + .put("MAP", new KsqlObject().put("k_" + i, "v_" + i)) + .put("COMPLEX", COMPLEX_FIELD_VALUE) + .put("TIMESTAMP", "1970-01-01T00:00:00.001") + .put("DATE", "1970-01-01") + .put("TIME", "00:00")); + } + + // Then + assertThatEventually(acksSubscriber::getValues, hasSize(numRows)); + for (int i = 0; i < numRows; i++) { + assertThat(acksSubscriber.getValues().get(i).seqNum(), is(Long.valueOf(i))); + } + assertThat(acksSubscriber.getError(), is(nullValue())); + assertThat(acksSubscriber.isCompleted(), is(false)); + + assertThat(acksPublisher.isComplete(), is(false)); + assertThat(acksPublisher.isFailed(), is(false)); + + // Then: should receive new rows + final String query = "SELECT * FROM " + EMPTY_TEST_STREAM_2 + " EMIT CHANGES LIMIT " + numRows + ";"; + final List rows = client.executeQuery(query).get(); + + // Verify inserted rows are as expected + assertThat(rows, hasSize(numRows)); + for (int i = 0; i < numRows; i++) { + assertThat(rows.get(i).getKsqlObject("K"), is(new KsqlObject().put("F1", new KsqlArray().add("my_key_" + i)))); + assertThat(rows.get(i).getString("STR"), is("TEST_" + i)); + assertThat(rows.get(i).getLong("LONG"), is(Long.valueOf(i))); + assertThat(rows.get(i).getDecimal("DEC"), is(new BigDecimal("13.31"))); + assertThat(rows.get(i).getBytes("BYTES_"), is(new byte[]{0, 1, 2})); + assertThat(rows.get(i).getKsqlArray("ARRAY"), is(new KsqlArray().add("v_" + i))); + assertThat(rows.get(i).getKsqlObject("MAP"), is(new KsqlObject().put("k_" + i, "v_" + i))); + assertThat(rows.get(i).getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE)); + assertThat(rows.get(i).getString("TIMESTAMP"), is("1970-01-01T00:00:00.001")); + assertThat(rows.get(i).getString("DATE"), is("1970-01-01")); + assertThat(rows.get(i).getString("TIME"), is("00:00")); + } + + // When: end connection + insertsPublisher.complete(); + + // Then + assertThatEventually(acksSubscriber::isCompleted, is(true)); + assertThat(acksSubscriber.getError(), is(nullValue())); + + 
assertThat(acksPublisher.isComplete(), is(true)); + assertThat(acksPublisher.isFailed(), is(false)); + } + + @Test + public void shouldHandleErrorResponseFromStreamInserts() { + // When + final Exception e = assertThrows( + ExecutionException.class, // thrown from .get() when the future completes exceptionally + () -> client.streamInserts(AGG_TABLE, new InsertsPublisher()).get() + ); + + // Then + assertThat(e.getCause(), instanceOf(KsqlClientException.class)); + assertThat(e.getCause().getMessage(), containsString("Received 400 response from server")); + assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table")); + } + + @Test + public void shouldExecuteDdlDmlStatements() throws Exception { + // Given + final String streamName = TEST_STREAM + "_COPY"; + final String csas = "create stream " + streamName + " as select * from " + TEST_STREAM + " emit changes;"; + + final int numInitialStreams = 3; + final int numInitialQueries = 1; + verifyNumStreams(numInitialStreams); + verifyNumQueries(numInitialQueries); + + // When: create stream, start persistent query + final ExecuteStatementResult csasResult = client.executeStatement(csas).get(); + + // Then + verifyNumStreams(numInitialStreams + 1); + verifyNumQueries(numInitialQueries + 1); + assertThat(csasResult.queryId(), is(Optional.of(findQueryIdForSink(streamName)))); + + // When: terminate persistent query + final String queryId = csasResult.queryId().get(); + final ExecuteStatementResult terminateResult = + client.executeStatement("terminate " + queryId + ";").get(); + + // Then + verifyNumQueries(numInitialQueries); + assertThat(terminateResult.queryId(), is(Optional.empty())); + + // When: drop stream + final ExecuteStatementResult dropStreamResult = + client.executeStatement("drop stream " + streamName + ";").get(); + + // Then + verifyNumStreams(numInitialStreams); + assertThat(dropStreamResult.queryId(), is(Optional.empty())); + } + + private void verifyNumStreams(final int numStreams) { + assertThatEventually(() -> { + try { + return client.listStreams().get().size(); + } catch (final Exception e) { + return -1; + } + }, is(numStreams)); + } + + private void verifyNumQueries(final int numQueries) { + assertThatEventually(() -> { + try { + return client.listQueries().get().size(); + } catch (final Exception e) { + return -1; + } + }, is(numQueries)); + } + + private String findQueryIdForSink(final String sinkName) throws Exception { + final List queryIds = client.listQueries().get().stream() + .filter(q -> q.getSink().equals(Optional.of(sinkName))) + .map(QueryInfo::getId) + .collect(Collectors.toList()); + assertThat(queryIds, hasSize(1)); + return queryIds.get(0); + } + + private Client createClient() { + final ClientOptions clientOptions = ClientOptions.create() + .setHost("localhost") + .setPort(REST_APP.getListeners().get(0).getPort()); + return Client.create(clientOptions, vertx); + } + + private static void cleanupConnectors() { + ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;").get(0)).getConnectors() + .forEach(c -> makeKsqlRequest("DROP CONNECTOR " + c.getName() + ";")); + assertThatEventually( + () -> ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;") + .get(0)).getConnectors().size(), + is(0) + ); + } + + private static List makeKsqlRequest(final String sql) { + return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql); + } + +} \ No newline at end of file diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java 
b/ksqldb-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java index 47c3d1eee0a0..15f354972aa4 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java @@ -56,7 +56,6 @@ import java.util.Set; import java.util.function.BiFunction; import java.util.stream.Collectors; -import org.apache.kafka.clients.admin.Admin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,7 +84,6 @@ public static KsqlContext create( final ServiceInfo serviceInfo = ServiceInfo.create(ksqlConfig); final KsqlEngine engine = new KsqlEngine( serviceContext, - () -> Admin.create(ksqlConfig.getKsqlAdminClientConfigProps()), processingLogContext, functionRegistry, serviceInfo, diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index d4fa7614ff27..2c79eac2bdf7 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -18,6 +18,7 @@ import static io.confluent.ksql.metastore.model.DataSource.DataSourceType; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -107,6 +108,7 @@ import java.util.OptionalInt; import java.util.Set; import java.util.stream.Collectors; +import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -366,7 +368,8 @@ ScalablePushQueryMetadata executeScalablePushQuery( @SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty TransientQueryMetadata executeTransientQuery( final ConfiguredStatement statement, - final boolean excludeTombstones + final boolean excludeTombstones, + final Optional> endOffsets ) { final ExecutorPlans plans = planQuery(statement, statement.getStatement(), Optional.empty(), Optional.empty(), engineContext.getMetaStore()); @@ -391,7 +394,8 @@ TransientQueryMetadata executeTransientQuery( outputNode.getSchema(), outputNode.getLimit(), outputNode.getWindowInfo(), - excludeTombstones + excludeTombstones, + endOffsets ); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index 1f423c58276a..3046303f40a9 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -79,15 +79,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; -import java.util.function.Supplier; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.ListOffsetsOptions; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; @@ -107,12 +104,9 @@ public class KsqlEngine implements KsqlExecutionContext, 
Closeable { private final EngineContext primaryContext; private final QueryCleanupService cleanupService; private final OrphanedTransientQueryCleaner orphanedTransientQueryCleaner; - private final KsqlConfig ksqlConfig; - private final Admin persistentAdminClient; public KsqlEngine( final ServiceContext serviceContext, - final Supplier adminSupplier, final ProcessingLogContext processingLogContext, final FunctionRegistry functionRegistry, final ServiceInfo serviceInfo, @@ -131,7 +125,6 @@ public KsqlEngine( serviceInfo.customMetricsTags(), serviceInfo.metricsExtension() ), - adminSupplier, queryIdGenerator, ksqlConfig, queryEventListeners @@ -145,7 +138,6 @@ public KsqlEngine( final String serviceId, final MutableMetaStore metaStore, final Function engineMetricsFactory, - final Supplier adminSupplier, final QueryIdGenerator queryIdGenerator, final KsqlConfig ksqlConfig, final List queryEventListeners @@ -181,8 +173,6 @@ public KsqlEngine( 1000, TimeUnit.MILLISECONDS ); - this.ksqlConfig = ksqlConfig; - this.persistentAdminClient = adminSupplier.get(); cleanupService.startAsync(); } @@ -314,7 +304,7 @@ public TransientQueryMetadata executeTransientQuery( try { final TransientQueryMetadata query = EngineExecutor .create(primaryContext, serviceContext, statement.getSessionConfig()) - .executeTransientQuery(statement, excludeTombstones); + .executeTransientQuery(statement, excludeTombstones, Optional.empty()); return query; } catch (final KsqlStatementException e) { throw e; @@ -364,15 +354,16 @@ public StreamPullQueryMetadata createStreamPullQuery( .putAll(statementOrig.getSessionConfig().getOverrides()) .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") .put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) - .put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100) + // There's no point in EOS, since this query only produces side effects. 
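+    // (The only output of a stream pull query is the in-memory row queue consumed by the
+    // client; nothing is written back to Kafka, so exactly-once would add overhead for no benefit.)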
+ .put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE) .build() ); + final ImmutableMap endOffsets = + getQueryInputEndOffsets(analysis, serviceContext.getAdminClient()); + final TransientQueryMetadata transientQueryMetadata = EngineExecutor .create(primaryContext, serviceContext, statement.getSessionConfig()) - .executeTransientQuery(statement, excludeTombstones); - - final ImmutableMap endOffsets = - getQueryInputEndOffsets(analysis, statement, serviceContext.getAdminClient()); + .executeTransientQuery(statement, excludeTombstones, Optional.of(endOffsets)); QueryLogger.info( "Streaming stream pull query results '{}'", @@ -382,35 +373,8 @@ public StreamPullQueryMetadata createStreamPullQuery( return new StreamPullQueryMetadata(transientQueryMetadata, endOffsets); } - public boolean passedEndOffsets(final StreamPullQueryMetadata streamPullQueryMetadata) { - - try { - final ListConsumerGroupOffsetsResult result = - persistentAdminClient.listConsumerGroupOffsets( - streamPullQueryMetadata.getTransientQueryMetadata().getQueryApplicationId() - ); - - final Map metadataMap = - result.partitionsToOffsetAndMetadata().get(); - - final Map endOffsets = streamPullQueryMetadata.getEndOffsets(); - - for (final Map.Entry entry : endOffsets.entrySet()) { - final OffsetAndMetadata offsetAndMetadata = metadataMap.get(entry.getKey()); - if (offsetAndMetadata == null || offsetAndMetadata.offset() < entry.getValue()) { - log.debug("SPQ waiting on " + entry + " at " + offsetAndMetadata); - return false; - } - } - return true; - } catch (final ExecutionException | InterruptedException e) { - throw new KsqlException(e); - } - } - private ImmutableMap getQueryInputEndOffsets( final ImmutableAnalysis analysis, - final ConfiguredStatement statement, final Admin admin) { final String sourceTopicName = analysis.getFrom().getDataSource().getKafkaTopicName(); @@ -419,21 +383,9 @@ private ImmutableMap getQueryInputEndOffsets( sourceTopicName ); - final Object processingGuarantee = statement - .getSessionConfig() - .getConfig(true) - .getKsqlStreamConfigProps() - .get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG); - - final IsolationLevel isolationLevel = - StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) - ? IsolationLevel.READ_UNCOMMITTED - : IsolationLevel.READ_COMMITTED; - return getEndOffsets( admin, - topicDescription, - isolationLevel + topicDescription ); } @@ -459,8 +411,7 @@ private TopicDescription getTopicDescription(final Admin admin, final String sou private ImmutableMap getEndOffsets( final Admin admin, - final TopicDescription topicDescription, - final IsolationLevel isolationLevel) { + final TopicDescription topicDescription) { final Map topicPartitions = topicDescription .partitions() @@ -471,7 +422,7 @@ private ImmutableMap getEndOffsets( final ListOffsetsResult listOffsetsResult = admin.listOffsets( topicPartitions, new ListOffsetsOptions( - isolationLevel + IsolationLevel.READ_UNCOMMITTED ) ); @@ -557,7 +508,7 @@ public void close(final boolean closeQueries) { try { cleanupService.stopAsync().awaitTerminated(30, TimeUnit.SECONDS); - } catch (TimeoutException e) { + } catch (final TimeoutException e) { log.warn("Timed out while closing cleanup service. 
" + "External resources for the following applications may be orphaned: {}", cleanupService.pendingApplicationIds() @@ -566,7 +517,6 @@ public void close(final boolean closeQueries) { engineMetrics.close(); aggregateMetricsCollector.shutdown(); - persistentAdminClient.close(); } @Override diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java index fce90f97ab48..e383ff05a9b0 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java @@ -168,7 +168,7 @@ public TransientQueryMetadata executeTransientQuery( engineContext, serviceContext, statement.getSessionConfig() - ).executeTransientQuery(statement, excludeTombstones); + ).executeTransientQuery(statement, excludeTombstones, Optional.empty()); } @Override diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/BlockingRowQueue.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/BlockingRowQueue.java index 49530c7a648d..ed06eb929f5f 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/BlockingRowQueue.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/BlockingRowQueue.java @@ -40,6 +40,12 @@ public interface BlockingRowQueue { */ void setLimitHandler(LimitHandler limitHandler); + /** + * Sets a handler that will be called when the query completes. + * Replaces any previous handler. + */ + void setCompletionHandler(CompletionHandler completionHandler); + /** * Sets the callback that will be called any time a new row is accepted into the queue. * diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/LimitQueueCallback.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/CompletionHandler.java similarity index 70% rename from ksqldb-engine/src/main/java/io/confluent/ksql/query/LimitQueueCallback.java rename to ksqldb-engine/src/main/java/io/confluent/ksql/query/CompletionHandler.java index 381b6348dca0..df6961130bad 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/LimitQueueCallback.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/CompletionHandler.java @@ -15,14 +15,10 @@ package io.confluent.ksql.query; -public interface LimitQueueCallback extends QueueCallback { +public interface CompletionHandler { /** - * Sets the limit handler that will be called when the limit is reached. - * - *

Replaces any previous handler. - * - * @param limitHandler the handler. + * Fired when the query is complete */ - void setLimitHandler(LimitHandler limitHandler); + void complete(); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/LimitedQueueCallback.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/LimitedQueueCallback.java deleted file mode 100644 index d9799b7918a9..000000000000 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/LimitedQueueCallback.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.query; - -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; -import javax.annotation.concurrent.ThreadSafe; - -/** - * A {@code LimitQueueCallback} that limits the number of events queued and fires the - * {@code LimitHandler} when the limit is reached. - */ -@ThreadSafe -public final class LimitedQueueCallback implements LimitQueueCallback { - - private final AtomicInteger remaining; - private final AtomicInteger queued; - private volatile LimitHandler limitHandler = () -> { - }; - - LimitedQueueCallback(final int limit) { - if (limit < 0) { - throw new IllegalArgumentException("limit must be positive, was:" + limit); - } - this.remaining = new AtomicInteger(limit); - this.queued = new AtomicInteger(limit); - } - - @Override - public void setLimitHandler(final LimitHandler limitHandler) { - this.limitHandler = Objects.requireNonNull(limitHandler, "limitHandler"); - if (remaining.get() == 0) { - limitHandler.limitReached(); - } - } - - @Override - public boolean shouldQueue() { - return remaining.decrementAndGet() >= 0; - } - - @Override - public void onQueued() { - if (queued.decrementAndGet() == 0) { - limitHandler.limitReached(); - } - } -} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/PullQueryQueue.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/PullQueryQueue.java index 290d03975d35..0e95d65ef0c2 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/PullQueryQueue.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/PullQueryQueue.java @@ -82,6 +82,12 @@ public void setLimitHandler(final LimitHandler limitHandler) { this.limitHandler = limitHandler; } + @Override + public void setCompletionHandler(final CompletionHandler completionHandler) { + // not currently used in pull queries, although future refactoring might be able to + // take advantage of this mechanism. 
+ } + @Override public void setQueuedCallback(final Runnable queuedCallback) { final Runnable parent = this.queuedCallback; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index fbc5398acd90..bba26d639882 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -19,6 +19,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import io.confluent.ksql.GenericRow; import io.confluent.ksql.config.SessionConfig; @@ -66,11 +68,17 @@ import io.confluent.ksql.util.SharedKafkaStreamsRuntimeImpl; import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.util.ValidationSharedKafkaStreamsRuntimeImpl; +import io.vertx.core.impl.ConcurrentHashSet; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.time.Duration; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; @@ -79,12 +87,21 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology; import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder; @@ -172,7 +189,8 @@ TransientQueryMetadata buildTransientQuery( final Optional windowInfo, final boolean excludeTombstones, final QueryMetadata.Listener listener, - final StreamsBuilder streamsBuilder + final StreamsBuilder streamsBuilder, + final Optional> endOffsets ) { final KsqlConfig ksqlConfig = config.getConfig(true); final String applicationId = QueryApplicationId.build(ksqlConfig, false, queryId); @@ -184,7 +202,8 @@ TransientQueryMetadata buildTransientQuery( final Map streamsProperties = buildStreamsProperties(applicationId, queryId); final Object buildResult = buildQueryImplementation(physicalPlan, runtimeBuildContext); - final BlockingRowQueue queue = buildTransientQueryQueue(buildResult, limit, excludeTombstones); + final TransientQueryQueue queue = + buildTransientQueryQueue(buildResult, limit, excludeTombstones, endOffsets); final Topology topology = 
streamsBuilder.build(PropertiesUtil.asProperties(streamsProperties)); final TransientQueryMetadata.ResultType resultType = buildResult instanceof KTableHolder @@ -513,17 +532,19 @@ private ProcessingLogger getUncaughtExceptionProcessingLogger(final QueryId quer private static TransientQueryQueue buildTransientQueryQueue( final Object buildResult, final OptionalInt limit, - final boolean excludeTombstones + final boolean excludeTombstones, + final Optional> endOffsets ) { final TransientQueryQueue queue = new TransientQueryQueue(limit); if (buildResult instanceof KStreamHolder) { final KStream kstream = ((KStreamHolder) buildResult).getStream(); - - kstream - // Null value for a stream is invalid: - .filter((k, v) -> v != null) - .foreach((k, v) -> queue.acceptRow(null, v)); + // The list of "done partitions" is shared among all the stream threads and tasks. + final ConcurrentHashSet donePartitions = new ConcurrentHashSet<>(); + kstream.process( + (ProcessorSupplier) () -> + new TransientQuerySinkProcessor(queue, endOffsets, donePartitions) + ); } else if (buildResult instanceof KTableHolder) { final KTable ktable = ((KTableHolder) buildResult).getTable(); @@ -650,4 +671,131 @@ private static void updateListProperty( valueList.add(value); properties.put(key, valueList); } + + private static class TransientQuerySinkProcessor implements + Processor { + + private final TransientQueryQueue queue; + private final Optional> endOffsets; + private final ConcurrentHashSet donePartitions; + private ProcessorContext context; + + TransientQuerySinkProcessor( + final TransientQueryQueue queue, + final Optional> endOffsets, + final ConcurrentHashSet donePartitions) { + + this.queue = queue; + this.endOffsets = endOffsets; + this.donePartitions = donePartitions; + } + + @Override + public void init(final ProcessorContext context) { + Processor.super.init(context); + this.context = context; + if (endOffsets.isPresent()) { + // registering a punctuation here is the easiest way to check + // if we're complete even if we don't see all records. + // The in-line check in process() is faster, + // but some records may be filtered out before reaching us. + context.schedule( + Duration.ofMillis(100), + PunctuationType.WALL_CLOCK_TIME, + timestamp -> checkCompletion() + ); + } + } + + @Override + public void process(final Record record) { + final Optional topicPartition = + context + .recordMetadata() + .map(m -> new TopicPartition(m.topic(), m.partition())); + + final boolean alreadyDone = + topicPartition.isPresent() && donePartitions.contains(topicPartition.get()); + + // We ignore null values in streams as invalid data and drop the record. + // If we're already done with the current partition, we just drop the record. 
+ if (record.value() != null && !alreadyDone) { + queue.acceptRow(null, record.value()); + } + + if (topicPartition.isPresent()) { + checkCompletion(topicPartition.get()); + } + } + + private void checkCompletion(final TopicPartition topicPartition) { + if (endOffsets.isPresent()) { + final ImmutableMap endOffsetsMap = endOffsets.get(); + if (endOffsetsMap.containsKey(topicPartition)) { + final Map currentPositions = getCurrentPositions(); + checkCompletion(topicPartition, currentPositions, endOffsetsMap.get(topicPartition)); + } + } + } + + private void checkCompletion() { + if (endOffsets.isPresent()) { + final Map currentPositions = + getCurrentPositions(); + + for (final Entry end : endOffsets.get().entrySet()) { + checkCompletion( + end.getKey(), + currentPositions, + end.getValue() + ); + } + // if we're completely done with this query, then call the completion handler. + if (ImmutableSet.copyOf(donePartitions).equals(endOffsets.get().keySet())) { + queue.complete(); + } + } + } + + private void checkCompletion(final TopicPartition topicPartition, + final Map currentPositions, final Long end) { + if (currentPositions.containsKey(topicPartition)) { + final OffsetAndMetadata current = currentPositions.get(topicPartition); + if (current != null && current.offset() >= end) { + donePartitions.add(topicPartition); + } + } + } + + @SuppressWarnings("unchecked") + private Map getCurrentPositions() { + // It might make sense to actually propose adding the concept of getting the + // "current position" in the ProcessorContext, but for now we just expose it + // by reflection. Obviously, this code is pretty brittle. + try { + if (context.getClass().equals(ProcessorContextImpl.class)) { + final Field streamTask; + streamTask = ProcessorContextImpl.class.getDeclaredField("streamTask"); + streamTask.setAccessible(true); + final StreamTask task = (StreamTask) streamTask.get(context); + final Method committableOffsetsAndMetadata = + StreamTask.class.getDeclaredMethod("committableOffsetsAndMetadata"); + committableOffsetsAndMetadata.setAccessible(true); + return (Map) + committableOffsetsAndMetadata.invoke(task); + } else { + // Specifically, this will break if you try to run this processor in a + // unit test with a mock processor. Both KafkaStreams and the TopologyTestDriver + // use a ProcessorContextImpl, which should be the only way this processor gets run. + throw new IllegalStateException( + "Expected only to run in the KafkaStreams or TopologyTestDriver runtimes." + ); + } + } catch (final NoSuchFieldException | IllegalAccessException + | NoSuchMethodException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistry.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistry.java index 18f94758d411..6d2e4dd972f6 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistry.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistry.java @@ -15,6 +15,7 @@ package io.confluent.ksql.query; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.logging.processing.ProcessingLogContext; @@ -34,6 +35,7 @@ import java.util.OptionalInt; import java.util.Set; import java.util.function.BiPredicate; +import org.apache.kafka.common.TopicPartition; /** * Interface for building and managing queries. 
@@ -75,7 +77,8 @@ TransientQueryMetadata createTransientQuery( LogicalSchema schema, OptionalInt limit, Optional windowInfo, - boolean excludeTombstones + boolean excludeTombstones, + Optional> endOffsets ); // CHECKSTYLE_RULES.ON: ParameterNumberCheck diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistryImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistryImpl.java index bb81299c1ffb..0a3034482bfc 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistryImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryRegistryImpl.java @@ -16,6 +16,7 @@ package io.confluent.ksql.query; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import io.confluent.ksql.config.SessionConfig; @@ -54,6 +55,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiPredicate; import java.util.stream.Collectors; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.StreamsBuilder; @@ -151,7 +153,8 @@ public TransientQueryMetadata createTransientQuery( final LogicalSchema schema, final OptionalInt limit, final Optional windowInfo, - final boolean excludeTombstones) { + final boolean excludeTombstones, + final Optional> endOffsets) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck final QueryBuilder queryBuilder = queryBuilderFactory.create( config, @@ -172,7 +175,8 @@ public TransientQueryMetadata createTransientQuery( windowInfo, excludeTombstones, new ListenerImpl(), - new StreamsBuilder() + new StreamsBuilder(), + endOffsets ); registerTransientQuery(serviceContext, metaStore, query); return query; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueueCallback.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueueCallback.java deleted file mode 100644 index 807008fb0ce4..000000000000 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueueCallback.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.query; - -import javax.annotation.concurrent.ThreadSafe; - -/** - * A callback called before and after things are queued. - * - *

Implementations must be thread-safe as they will be called from multiple threads - * simultaneously. - */ -@ThreadSafe -public interface QueueCallback { - - /** - * Called to determine if an output row should be queued for output. - * - * @return {@code true} if it should be sent, {@code false} otherwise. - */ - boolean shouldQueue(); - - /** - * Called once a row has been queued for output. - */ - void onQueued(); -} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransientQueryQueue.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransientQueryQueue.java index d9693774d570..2c0b35851474 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransientQueryQueue.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransientQueryQueue.java @@ -26,6 +26,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * A queue of rows for transient queries. @@ -36,8 +37,11 @@ public class TransientQueryQueue implements BlockingRowQueue { private final BlockingQueue, GenericRow>> rowQueue; private final int offerTimeoutMs; - private LimitQueueCallback callback; private volatile boolean closed = false; + private final AtomicInteger remaining; + private LimitHandler limitHandler; + private CompletionHandler completionHandler; + private Runnable queuedCallback; public TransientQueryQueue(final OptionalInt limit) { this(limit, BLOCKING_QUEUE_CAPACITY, 100); @@ -49,39 +53,24 @@ public TransientQueryQueue( final int queueSizeLimit, final int offerTimeoutMs ) { - this.callback = limit.isPresent() - ? new LimitedQueueCallback(limit.getAsInt()) - : new UnlimitedQueueCallback(); + this.remaining = limit.isPresent() ? 
new AtomicInteger(limit.getAsInt()) : null; this.rowQueue = new LinkedBlockingQueue<>(queueSizeLimit); this.offerTimeoutMs = offerTimeoutMs; } @Override - public void setLimitHandler(final LimitHandler limitHandler) { - callback.setLimitHandler(limitHandler); + public void setQueuedCallback(final Runnable queuedCallback) { + this.queuedCallback = queuedCallback; } @Override - public void setQueuedCallback(final Runnable queuedCallback) { - final LimitQueueCallback parent = callback; - - callback = new LimitQueueCallback() { - @Override - public boolean shouldQueue() { - return parent.shouldQueue(); - } - - @Override - public void onQueued() { - parent.onQueued(); - queuedCallback.run(); - } + public void setLimitHandler(final LimitHandler limitHandler) { + this.limitHandler = limitHandler; + } - @Override - public void setLimitHandler(final LimitHandler limitHandler) { - parent.setLimitHandler(limitHandler); - } - }; + @Override + public void setCompletionHandler(final CompletionHandler completionHandler) { + this.completionHandler = completionHandler; } @Override @@ -117,7 +106,7 @@ public void close() { public void acceptRow(final List key, final GenericRow value) { try { - if (!callback.shouldQueue()) { + if (passedLimit()) { return; } @@ -125,7 +114,7 @@ public void acceptRow(final List key, final GenericRow value) { while (!closed) { if (rowQueue.offer(row, offerTimeoutMs, TimeUnit.MILLISECONDS)) { - callback.onQueued(); + onQueued(); break; } } @@ -144,7 +133,7 @@ public void acceptRow(final List key, final GenericRow value) { */ public boolean acceptRowNonBlocking(final List key, final GenericRow value) { try { - if (!callback.shouldQueue()) { + if (passedLimit()) { return true; } @@ -154,7 +143,7 @@ public boolean acceptRowNonBlocking(final List key, final GenericRow value) { if (!rowQueue.offer(row, 0, TimeUnit.MILLISECONDS)) { return false; } - callback.onQueued(); + onQueued(); return true; } } catch (final InterruptedException e) { @@ -168,4 +157,23 @@ public boolean acceptRowNonBlocking(final List key, final GenericRow value) { public boolean isClosed() { return closed; } + + public void complete() { + if (completionHandler != null) { + completionHandler.complete(); + } + } + + private void onQueued() { + if (remaining != null && remaining.decrementAndGet() <= 0) { + limitHandler.limitReached(); + } + if (queuedCallback != null) { + queuedCallback.run(); + } + } + + private boolean passedLimit() { + return remaining != null && remaining.get() <= 0; + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/UnlimitedQueueCallback.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/UnlimitedQueueCallback.java deleted file mode 100644 index 042dae683e1d..000000000000 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/UnlimitedQueueCallback.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ - -package io.confluent.ksql.query; - -/** - * A {@code LimitQueueCallback} that does not apply a limit. - */ -public class UnlimitedQueueCallback implements LimitQueueCallback { - - @Override - public void setLimitHandler(final LimitHandler limitHandler) { - } - - @Override - public boolean shouldQueue() { - return true; - } - - @Override - public void onQueued() { - } -} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PushQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PushQueryMetadata.java index 8a5cf5da474b..a3b9cb096956 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PushQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PushQueryMetadata.java @@ -16,6 +16,7 @@ package io.confluent.ksql.util; import io.confluent.ksql.query.BlockingRowQueue; +import io.confluent.ksql.query.CompletionHandler; import io.confluent.ksql.query.LimitHandler; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; @@ -36,6 +37,8 @@ public interface PushQueryMetadata { void setLimitHandler(LimitHandler limitHandler); + void setCompletionHandler(CompletionHandler completionHandler); + void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler handler); LogicalSchema getLogicalSchema(); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SandboxedTransientQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SandboxedTransientQueryMetadata.java index 5d015ca1ed30..a6675470ed12 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SandboxedTransientQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SandboxedTransientQueryMetadata.java @@ -17,6 +17,7 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.query.BlockingRowQueue; +import io.confluent.ksql.query.CompletionHandler; import io.confluent.ksql.query.LimitHandler; import java.util.Collection; import java.util.List; @@ -67,6 +68,11 @@ public void setLimitHandler(final LimitHandler limitHandler) { throwUseException(); } + @Override + public void setCompletionHandler(final CompletionHandler completionHandler) { + throwUseException(); + } + public void setQueuedCallback(final Runnable callback) { throwUseException(); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java index d256bfc705a0..568fcd82f6e8 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java @@ -20,6 +20,7 @@ import io.confluent.ksql.physical.scalablepush.PushQueryQueuePopulator; import io.confluent.ksql.physical.scalablepush.PushRouting.PushConnectionsHandle; import io.confluent.ksql.query.BlockingRowQueue; +import io.confluent.ksql.query.CompletionHandler; import io.confluent.ksql.query.LimitHandler; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; @@ -111,6 +112,11 @@ public void setLimitHandler(final LimitHandler limitHandler) { rowQueue.setLimitHandler(limitHandler); } + @Override + public void setCompletionHandler(final CompletionHandler completionHandler) { + rowQueue.setCompletionHandler(completionHandler); + } + @Override public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler handler) { onException(handler::handle); diff --git 
a/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java index 4ab51bf922bc..3dd166a3e4e6 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java @@ -18,6 +18,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.BlockingRowQueue; +import io.confluent.ksql.query.CompletionHandler; import io.confluent.ksql.query.KafkaStreamsBuilder; import io.confluent.ksql.query.LimitHandler; import io.confluent.ksql.query.QueryErrorClassifier; @@ -132,6 +133,11 @@ public void setLimitHandler(final LimitHandler limitHandler) { rowQueue.setLimitHandler(limitHandler); } + @Override + public void setCompletionHandler(final CompletionHandler completionHandler) { + rowQueue.setCompletionHandler(completionHandler); + } + @Override public void close() { // To avoid deadlock, close the queue first to ensure producer side isn't blocked trying to diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTestUtil.java b/ksqldb-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTestUtil.java index 689761a08e4f..8feb8f650e6a 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTestUtil.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTestUtil.java @@ -34,7 +34,6 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; -import org.easymock.EasyMock; public final class KsqlContextTestUtil { @@ -69,7 +68,6 @@ public static KsqlContext create( final KsqlEngine engine = new KsqlEngine( serviceContext, - () -> EasyMock.mock(Admin.class), ProcessingLogContext.create(), functionRegistry, ServiceInfo.create(ksqlConfig, metricsPrefix), diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java index 653575d272fb..6cc49e995107 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java @@ -42,8 +42,6 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.kafka.clients.admin.Admin; -import org.mockito.Mockito; public final class KsqlEngineTestUtil { @@ -60,7 +58,6 @@ public static KsqlEngine createKsqlEngine( "test_instance_", metaStore, (engine) -> new KsqlEngineMetrics("", engine, Collections.emptyMap(), Optional.empty()), - () -> Mockito.mock(Admin.class), new SequentialQueryIdGenerator(), new KsqlConfig(Collections.emptyMap()), Collections.emptyList() @@ -80,7 +77,6 @@ public static KsqlEngine createKsqlEngine( "test_instance_", metaStore, engineMetricsFactory, - () -> Mockito.mock(Admin.class), queryIdGenerator, ksqlConfig, Collections.emptyList() diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java index 69541c13d109..bf31773525a3 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java @@ -57,8 +57,6 @@ 
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import kafka.zookeeper.ZooKeeperClientException; -import org.apache.kafka.clients.admin.Admin; -import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -105,7 +103,6 @@ public void before() { ksqlEngine = new KsqlEngine( serviceContext, - () -> EasyMock.mock(Admin.class), ProcessingLogContext.create(), functionRegistry, ServiceInfo.create(ksqlConfig), diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java index 22db24e54744..f26f42d8fe2a 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java @@ -86,7 +86,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.StreamsException; -import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -418,7 +417,6 @@ private void givenTestSetupWithConfig(final Map ksqlConfigs) { serviceContext = ServiceContextFactory.create(ksqlConfig, DisabledKsqlClient::instance); ksqlEngine = new KsqlEngine( serviceContext, - () -> EasyMock.mock(Admin.class), ProcessingLogContext.create(), new InternalFunctionRegistry(), ServiceInfo.create(ksqlConfig), diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/LimitedQueueCallbackTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/LimitedQueueCallbackTest.java deleted file mode 100644 index 474d096e1806..000000000000 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/LimitedQueueCallbackTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ - -package io.confluent.ksql.query; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.stream.IntStream; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class LimitedQueueCallbackTest { - - private static final int SOME_LIMIT = 3; - - @Mock - private LimitHandler limitHandler; - private LimitedQueueCallback callback; - - @Before - public void setUp() { - callback = new LimitedQueueCallback(SOME_LIMIT); - callback.setLimitHandler(limitHandler); - } - - @Test - public void shouldAllowQueuingUpToTheLimit() { - // When: - IntStream.range(0, SOME_LIMIT).forEach(idx -> - assertThat(callback.shouldQueue(), is(true))); - - // Then: - IntStream.range(0, SOME_LIMIT).forEach(idx -> - assertThat(callback.shouldQueue(), is(false))); - } - - @Test - public void shouldNotCallLimitHandlerIfLimitNotReached() { - // When: - IntStream.range(0, SOME_LIMIT - 1).forEach(idx -> callback.onQueued()); - - // Then: - verify(limitHandler, never()).limitReached(); - } - - @Test - public void shouldCallLimitHandlerOnceLimitReached() { - // When: - IntStream.range(0, SOME_LIMIT).forEach(idx -> callback.onQueued()); - - // Then: - verify(limitHandler).limitReached(); - } - - @Test - public void shouldOnlyCallLimitHandlerOnce() { - // When: - IntStream.range(0, SOME_LIMIT * 2).forEach(idx -> callback.onQueued()); - - // Then: - verify(limitHandler, times(1)).limitReached(); - } - - @Test - public void shouldCallLimitHandlerImmediatelyIfLimitZero() { - // Given: - callback = new LimitedQueueCallback(0); - - // When: - callback.setLimitHandler(limitHandler); - - // Then: - verify(limitHandler).limitReached(); - } - - @Test - public void shouldBeThreadSafe() { - // Given: - final int theLimit = 100; - callback = new LimitedQueueCallback(theLimit); - callback.setLimitHandler(limitHandler); - - // When: - final int enqueued = IntStream.range(0, theLimit * 2) - .parallel() - .map(idx -> { - if (!callback.shouldQueue()) { - return 0; - } - callback.onQueued(); - return 1; - }) - .sum(); - - // Then: - assertThat(enqueued, is(theLimit)); - verify(limitHandler, times(1)).limitReached(); - } -} \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryBuilderTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryBuilderTest.java index ae203f28c3e1..d2389f0ea9e9 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryBuilderTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryBuilderTest.java @@ -252,7 +252,8 @@ public void shouldBuildTransientQueryCorrectly() { Optional.empty(), false, queryListener, - streamsBuilder + streamsBuilder, + Optional.empty() ); queryMetadata.initialize(); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryRegistryImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryRegistryImplTest.java index 5f84878b3417..b322e4f06612 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryRegistryImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryRegistryImplTest.java @@ -34,7 +34,6 @@ import io.confluent.ksql.util.PersistentQueryMetadataImpl; import io.confluent.ksql.util.QueryMetadata; import 
io.confluent.ksql.util.TransientQueryMetadata; - import java.util.Arrays; import java.util.Collection; import java.util.Map; @@ -51,10 +50,10 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; import org.mockito.quality.Strictness; -import org.mockito.Mockito; @RunWith(Parameterized.class) public class QueryRegistryImplTest { @@ -540,7 +539,7 @@ private TransientQueryMetadata givenCreateTransient( final TransientQueryMetadata query = mock(TransientQueryMetadata.class); when(query.getQueryId()).thenReturn(queryId); when(queryBuilder.buildTransientQuery( - any(), any(), any(), any(), any(), any(), any(), any(), anyBoolean(), any(), any()) + any(), any(), any(), any(), any(), any(), any(), any(), anyBoolean(), any(), any(), any()) ).thenReturn(query); registry.createTransientQuery( config, @@ -555,7 +554,8 @@ private TransientQueryMetadata givenCreateTransient( mock(LogicalSchema.class), OptionalInt.of(123), Optional.empty(), - false + false, + Optional.empty() ); return query; } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/StructuredTypesDataProvider.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/StructuredTypesDataProvider.java index 14dd69879726..0fcfa8445311 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/StructuredTypesDataProvider.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/StructuredTypesDataProvider.java @@ -115,7 +115,11 @@ public class StructuredTypesDataProvider extends TestDataProvider { .build(); public StructuredTypesDataProvider() { - super("STRUCTURED_TYPES", PHYSICAL_SCHEMA, ROWS); + this("STRUCTURED_TYPES"); + } + + public StructuredTypesDataProvider(final String namePrefix) { + super(namePrefix, PHYSICAL_SCHEMA, ROWS); } @SuppressWarnings("unchecked") diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java index dfeb82000abd..e095442b4a2c 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java @@ -77,7 +77,6 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.streams.StreamsConfig; @@ -579,7 +578,6 @@ static KsqlEngine getKsqlEngine(final ServiceContext serviceContext, engine, Collections.emptyMap(), Optional.empty()), - () -> Admin.create(BASE_CONFIG), new SequentialQueryIdGenerator(), KsqlConfig.empty(), Collections.emptyList() diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/KsqlTesterTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/KsqlTesterTest.java index 6c08244c8611..983a25f9bc23 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/KsqlTesterTest.java +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/KsqlTesterTest.java @@ -63,7 +63,6 @@ import io.confluent.ksql.test.parser.TestStatement; import io.confluent.ksql.test.tools.TestFunctionRegistry; import io.confluent.ksql.test.util.KsqlTestFolder; -import io.confluent.ksql.util.FakeKafkaClientSupplier; import 
io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.PersistentQueryMetadata; @@ -165,7 +164,6 @@ public void setUp() { final MetaStoreImpl metaStore = new MetaStoreImpl(TestFunctionRegistry.INSTANCE.get()); this.engine = new KsqlEngine( serviceContext, - () -> new FakeKafkaClientSupplier().getAdmin(Collections.emptyMap()), NoopProcessingLogContext.INSTANCE, metaStore, ServiceInfo.create(config), diff --git a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-streams.json b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-streams.json index fd0890988ee7..5c0adfdd03a9 100644 --- a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-streams.json +++ b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-streams.json @@ -93,17 +93,13 @@ } }, { - "name": "correct results", + "name": "simple", "properties": { "ksql.query.pull.stream.enabled": true }, "statements": [ "CREATE STREAM S1 (MYKEY INT KEY, MYVALUE INT) WITH (kafka_topic='test_topic', value_format='JSON');", - "SELECT * FROM S1;", - "SELECT MYKEY FROM S1;", - "SELECT MYVALUE FROM S1;", - "SELECT MYKEY, MYVALUE * 2 AS DOUBLEVALUE FROM S1;", - "SELECT * FROM S1 WHERE MYVALUE >= 2 AND MYVALUE < 4;" + "SELECT * FROM S1;" ], "topics": [ {"name": "test_topic", "partitions": 1} // to get a stable ordering @@ -122,28 +118,116 @@ {"row":{"columns":[11, 2]}}, {"row":{"columns":[12, 3]}}, {"row":{"columns":[13, 4]}} - ]}, + ]} + ] + }, + { + "name": "select key", + "properties": { + "ksql.query.pull.stream.enabled": true + }, + "statements": [ + "CREATE STREAM S1 (MYKEY INT KEY, MYVALUE INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "SELECT MYKEY FROM S1;" + ], + "topics": [ + {"name": "test_topic", "partitions": 1} // to get a stable ordering + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 12365, "key": 10, "value": {"myvalue": 1}}, + {"topic": "test_topic", "timestamp": 12366, "key": 11, "value": {"myvalue": 2}}, + {"topic": "test_topic", "timestamp": 12367, "key": 12, "value": {"myvalue": 3}}, + {"topic": "test_topic", "timestamp": 12368, "key": 13, "value": {"myvalue": 4}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, {"query": [ {"header":{"schema":"`MYKEY` INTEGER"}}, {"row":{"columns":[10]}}, {"row":{"columns":[11]}}, {"row":{"columns":[12]}}, {"row":{"columns":[13]}} - ]}, + ]} + ] + }, + { + "name": "select value", + "properties": { + "ksql.query.pull.stream.enabled": true + }, + "statements": [ + "CREATE STREAM S1 (MYKEY INT KEY, MYVALUE INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "SELECT MYVALUE FROM S1;" + ], + "topics": [ + {"name": "test_topic", "partitions": 1} // to get a stable ordering + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 12365, "key": 10, "value": {"myvalue": 1}}, + {"topic": "test_topic", "timestamp": 12366, "key": 11, "value": {"myvalue": 2}}, + {"topic": "test_topic", "timestamp": 12367, "key": 12, "value": {"myvalue": 3}}, + {"topic": "test_topic", "timestamp": 12368, "key": 13, "value": {"myvalue": 4}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, {"query": [ {"header":{"schema":"`MYVALUE` INTEGER"}}, {"row":{"columns":[1]}}, {"row":{"columns":[2]}}, {"row":{"columns":[3]}}, {"row":{"columns":[4]}} - ]}, + ]} + ] + }, + { + "name": "project expression", + "properties": { + "ksql.query.pull.stream.enabled": 
true + }, + "statements": [ + "CREATE STREAM S1 (MYKEY INT KEY, MYVALUE INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "SELECT MYKEY, MYVALUE * 2 AS DOUBLEVALUE FROM S1;" + ], + "topics": [ + {"name": "test_topic", "partitions": 1} // to get a stable ordering + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 12365, "key": 10, "value": {"myvalue": 1}}, + {"topic": "test_topic", "timestamp": 12366, "key": 11, "value": {"myvalue": 2}}, + {"topic": "test_topic", "timestamp": 12367, "key": 12, "value": {"myvalue": 3}}, + {"topic": "test_topic", "timestamp": 12368, "key": 13, "value": {"myvalue": 4}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, {"query": [ {"header":{"schema":"`MYKEY` INTEGER, `DOUBLEVALUE` INTEGER"}}, {"row":{"columns":[10, 2]}}, {"row":{"columns":[11, 4]}}, {"row":{"columns":[12, 6]}}, {"row":{"columns":[13, 8]}} - ]}, + ]} + ] + }, + { + "name": "where clause", + "properties": { + "ksql.query.pull.stream.enabled": true + }, + "statements": [ + "CREATE STREAM S1 (MYKEY INT KEY, MYVALUE INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "SELECT * FROM S1 WHERE MYVALUE >= 2 AND MYVALUE < 4;" + ], + "topics": [ + {"name": "test_topic", "partitions": 1} // to get a stable ordering + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 12365, "key": 10, "value": {"myvalue": 1}}, + {"topic": "test_topic", "timestamp": 12366, "key": 11, "value": {"myvalue": 2}}, + {"topic": "test_topic", "timestamp": 12367, "key": 12, "value": {"myvalue": 3}}, + {"topic": "test_topic", "timestamp": 12368, "key": 13, "value": {"myvalue": 4}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, {"query": [ {"header":{"schema":"`MYKEY` INTEGER, `MYVALUE` INTEGER"}}, {"row":{"columns":[11, 2]}}, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/BlockingQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/BlockingQueryPublisher.java index e292e9ba4f04..87761d34eaba 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/BlockingQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/BlockingQueryPublisher.java @@ -80,6 +80,18 @@ public void setQueryHandle(final QueryHandle queryHandle, final boolean isPullQu ctx.runOnContext(v -> sendComplete()); } }); + // Justification for duplicated code: + // The way we handle query completion right now happens to be the same as the way + // we handle limit above, but this is not necessarily going to stay the same. For example, + // we should be returning a "Limit Reached" message as we do in the HTTP/1 endpoint when + // we hit the limit, but for query completion, we should just end the response stream. + this.queue.setCompletionHandler(() -> { + complete = true; + // This allows us to finish the query immediately if the query is already fully streamed. 
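+      // If rows are still queued, the publisher ends the response once the remaining
+      // rows have been delivered, since `complete` is now set.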
+ if (queue.isEmpty()) { + ctx.runOnContext(v -> sendComplete()); + } + }); this.queryHandle = queryHandle; queryHandle.onException(t -> ctx.runOnContext(v -> sendError(t))); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index 9a9ff8e4ea08..99762fd5875e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -19,7 +19,6 @@ import com.google.common.util.concurrent.RateLimiter; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.analyzer.ImmutableAnalysis; -import io.confluent.ksql.analyzer.PullQueryValidator; import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.server.QueryHandle; import io.confluent.ksql.api.server.SlidingWindowRateLimiter; @@ -60,6 +59,7 @@ import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PushQueryMetadata; import io.confluent.ksql.util.ScalablePushQueryMetadata; +import io.confluent.ksql.util.StreamPullQueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.util.VertxUtils; import io.vertx.core.Context; @@ -153,10 +153,12 @@ public QueryPublisher createQueryPublisher( metricsCallbackHolder ); case KSTREAM: - throw new KsqlStatementException( - "Pull queries are not supported on streams." - + PullQueryValidator.PULL_QUERY_SYNTAX_HELP, - statement.getStatementText() + return createStreamPullQueryPublisher( + analysis, + context, + serviceContext, + statement, + workerExecutor ); default: throw new KsqlStatementException( @@ -248,6 +250,36 @@ private QueryPublisher createPushQueryPublisher( return publisher; } + private QueryPublisher createStreamPullQueryPublisher( + final ImmutableAnalysis analysis, + final Context context, + final ServiceContext serviceContext, + final ConfiguredStatement statement, + final WorkerExecutor workerExecutor + ) { + // Limits and metrics will be in follow-on PRs. 
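+    // The engine snapshots the source topic's end offsets and completes the transient
+    // query once the consumer has passed them, so the publisher can treat this like a
+    // push query that simply terminates on its own.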
+
+    final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);
+
+    final StreamPullQueryMetadata metadata =
+        ksqlEngine.createStreamPullQuery(
+            serviceContext,
+            analysis,
+            statement,
+            true
+        );
+
+    localCommands.ifPresent(lc -> lc.write(metadata.getTransientQueryMetadata()));
+
+    publisher.setQueryHandle(
+        new KsqlQueryHandle(metadata.getTransientQueryMetadata()),
+        false,
+        false
+    );
+
+    return publisher;
+  }
+
   private QueryPublisher createTablePullQueryPublisher(
       final ImmutableAnalysis analysis,
       final Context context,
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingHandler.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingHandler.java
index f9d3560a99c0..a341e61c5a30 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingHandler.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingHandler.java
@@ -22,6 +22,7 @@
 import io.vertx.core.Handler;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.core.http.HttpVersion;
+import io.vertx.core.net.SocketAddress;
 import io.vertx.ext.web.RoutingContext;
 import io.vertx.ext.web.impl.Utils;
 import java.time.Clock;
@@ -87,9 +88,10 @@ public void handle(final RoutingContext routingContext) {
     final String userAgent = Optional.ofNullable(
         routingContext.request().getHeader(HTTP_HEADER_USER_AGENT)).orElse("-");
     final String timestamp = Utils.formatRFC1123DateTime(clock.millis());
+    final SocketAddress socketAddress = routingContext.request().remoteAddress();
     final String message = String.format(
         "%s - %s [%s] \"%s %s %s\" %d %d \"-\" \"%s\" %d",
-        routingContext.request().remoteAddress().host(),
+        socketAddress == null ? "null" : socketAddress.host(),
         name,
         timestamp,
         method,
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
index 43d1fc0d2ac0..b91aa7bb4c08 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
@@ -146,7 +146,6 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.log4j.LogManager;
@@ -728,7 +727,6 @@ static KsqlRestApplication buildApplication(
     );
     final KsqlEngine ksqlEngine = new KsqlEngine(
         serviceContext,
-        () -> Admin.create(ksqlConfig.getKsqlAdminClientConfigProps()),
         processingLogContext,
         functionRegistry,
         ServiceInfo.create(ksqlConfig, metricsPrefix),
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java
index 660d1f6dba84..7b17a97ac8f3 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java
@@ -44,7 +44,6 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import org.apache.kafka.clients.admin.Admin;
 
 public final class StandaloneExecutorFactory {
 
@@ -74,7 +73,6 @@ public static StandaloneExecutor create(
         queriesFile,
         installDir,
         serviceContextFactory,
-        () -> Admin.create(tempConfig.getKsqlAdminClientConfigProps()),
         KafkaConfigStore::new,
         KsqlVersionCheckerAgent::new,
         StandaloneExecutor::new
@@ -102,7 +100,6 @@ static StandaloneExecutor create(
       final String queriesFile,
       final String installDir,
       final Function serviceContextFactory,
-      final Supplier adminSupplier,
      final BiFunction configStoreFactory,
       final Function, VersionCheckerAgent> versionCheckerFactory,
       final StandaloneExecutorConstructor constructor
@@ -129,7 +126,6 @@ static StandaloneExecutor create(
 
     final KsqlEngine ksqlEngine = new KsqlEngine(
         serviceContext,
-        adminSupplier,
         processingLogContext,
         functionRegistry,
         ServiceInfo.create(ksqlConfig),
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java
index 76efb29ef13c..07b073d0595e 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java
@@ -36,7 +36,6 @@
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,9 +50,9 @@ class QueryStreamWriter implements StreamingOutput {
   private final long disconnectCheckInterval;
   private final ObjectMapper objectMapper;
   private final TombstoneFactory tombstoneFactory;
-  private final Supplier isComplete;
   private volatile Exception streamsException;
   private volatile boolean limitReached = false;
+  private volatile boolean complete;
   private volatile boolean connectionClosed;
   private boolean closed;
 
@@ -63,7 +62,7 @@ class QueryStreamWriter implements StreamingOutput {
       final ObjectMapper objectMapper,
       final CompletableFuture connectionClosedFuture
   ) {
-    this(queryMetadata, disconnectCheckInterval, objectMapper, connectionClosedFuture, () -> false);
+    this(queryMetadata, disconnectCheckInterval, objectMapper, connectionClosedFuture, false);
   }
 
   QueryStreamWriter(
@@ -71,17 +70,23 @@ class QueryStreamWriter implements StreamingOutput {
       final long disconnectCheckInterval,
       final ObjectMapper objectMapper,
       final CompletableFuture connectionClosedFuture,
-      final Supplier isComplete
+      final boolean emptyStream
   ) {
     this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper");
     this.disconnectCheckInterval = disconnectCheckInterval;
     this.queryMetadata = Objects.requireNonNull(queryMetadata, "queryMetadata");
-    this.queryMetadata.setLimitHandler(new LimitHandler());
+    this.queryMetadata.setLimitHandler(() -> limitReached = true);
+    this.queryMetadata.setCompletionHandler(() -> complete = true);
     this.queryMetadata.setUncaughtExceptionHandler(new StreamsExceptionHandler());
-    this.isComplete = Objects.requireNonNull(isComplete, "isComplete");
     this.tombstoneFactory = TombstoneFactory.create(queryMetadata);
     connectionClosedFuture.thenAccept(v -> connectionClosed = true);
-    queryMetadata.start();
+    if (emptyStream) {
+      // if we're writing an empty stream, it's already complete,
+      // and we don't even need to bother starting Streams.
+      complete = true;
+    } else {
+      queryMetadata.start();
+    }
   }
 
   // CHECKSTYLE_RULES.OFF: CyclomaticComplexity
@@ -94,7 +99,7 @@ public void write(final OutputStream out) {
 
       final BlockingRowQueue rowQueue = queryMetadata.getRowQueue();
 
-      while (!connectionClosed && queryMetadata.isRunning() && !limitReached && !isComplete.get()) {
+      while (!connectionClosed && queryMetadata.isRunning() && !limitReached && !complete) {
         final KeyValue, GenericRow> row = rowQueue.poll(
             disconnectCheckInterval,
             TimeUnit.MILLISECONDS
@@ -218,10 +223,4 @@ public StreamThreadExceptionResponse handle(final Throwable throwable) {
     }
   }
 
-  private class LimitHandler implements io.confluent.ksql.query.LimitHandler {
-    @Override
-    public void limitReached() {
-      limitReached = true;
-    }
-  }
 }
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java
index 021111fb83d5..6b8151c31245 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java
@@ -83,7 +83,6 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.streams.StreamsConfig;
@@ -587,44 +586,12 @@ private EndpointResponse handleStreamPullQuery(
         disconnectCheckInterval.toMillis(),
         OBJECT_MAPPER,
         connectionClosedFuture,
-        new StreamPullQueryCompletionCheck(ksqlEngine, streamPullQueryMetadata)
+        streamPullQueryMetadata.getEndOffsets().isEmpty()
     );
 
     return EndpointResponse.ok(queryStreamWriter);
   }
 
-  private static final class StreamPullQueryCompletionCheck implements Supplier {
-
-    boolean complete = false;
-    long lastCheck = 0;
-    private final KsqlEngine ksqlEngine;
-    private final StreamPullQueryMetadata streamPullQueryMetadata;
-
-    private StreamPullQueryCompletionCheck(final KsqlEngine ksqlEngine,
-        final StreamPullQueryMetadata streamPullQueryMetadata) {
-      this.ksqlEngine = ksqlEngine;
-      this.streamPullQueryMetadata = streamPullQueryMetadata;
-    }
-
-    @Override
-    public Boolean get() {
-      if (complete) {
-        return true;
-      } else {
-        final long now = System.currentTimeMillis();
-        if (now - lastCheck > 100L) {
-          lastCheck = now;
-          final boolean completed =
-              ksqlEngine.passedEndOffsets(streamPullQueryMetadata);
-          complete = completed;
-          return completed;
-        } else {
-          return false;
-        }
-      }
-    }
-  }
-
   private EndpointResponse handlePushQuery(
       final ServiceContext serviceContext,
       final PreparedStatement statement,
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFactoryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFactoryTest.java
index a4b3089bd251..221e23399958 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFactoryTest.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFactoryTest.java
@@ -25,8 +25,6 @@
 import java.util.Map;
 import java.util.function.BiFunction;
 import java.util.function.Function;
-import org.apache.kafka.clients.admin.Admin;
-import org.easymock.EasyMock;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
@@ -97,7 +95,6 @@ private void create() {
         QUERIES_FILE,
         INSTALL_DIR,
         serviceContextFactory,
-        () -> EasyMock.mock(Admin.class),
         configStoreFactory,
         activeQuerySupplier -> versionChecker,
         constructor
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java
index 4a3d5d900370..0ee1ed1e35bf 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java
@@ -48,8 +48,6 @@
 import java.util.Map;
 import java.util.function.Function;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.Admin;
-import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -121,7 +119,6 @@ public void setUp() throws Exception {
         queryFile.toString(),
         ".",
         serviceContextFactory,
-        () -> EasyMock.mock(Admin.class),
         KafkaConfigStore::new,
         activeQuerySupplier -> versionChecker,
         StandaloneExecutor::new
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriterTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriterTest.java
index 71c75f572491..01e0ae04301d 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriterTest.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriterTest.java
@@ -30,6 +30,7 @@
 import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.name.ColumnName;
 import io.confluent.ksql.query.BlockingRowQueue;
+import io.confluent.ksql.query.CompletionHandler;
 import io.confluent.ksql.query.LimitHandler;
 import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.rest.ApiJsonMapper;
@@ -77,9 +78,12 @@ public class QueryStreamWriterTest {
   private ArgumentCaptor ehCapture;
   @Captor
   private ArgumentCaptor limitHandlerCapture;
+  @Captor
+  private ArgumentCaptor completionHandlerCapture;
   private QueryStreamWriter writer;
   private ByteArrayOutputStream out;
   private LimitHandler limitHandler;
+  private CompletionHandler completionHandler;
   private ObjectMapper objectMapper;
 
   @Before
@@ -156,15 +160,14 @@ public void shouldExitAndDrainIfQueryComplete() {
         queryMetadata,
         1000,
         objectMapper,
-        new CompletableFuture<>(),
-        () -> true
+        new CompletableFuture<>()
     );
 
     out = new ByteArrayOutputStream();
 
-    verify(queryMetadata).setLimitHandler(limitHandlerCapture.capture());
-    limitHandler = limitHandlerCapture.getValue();
-
+    verify(queryMetadata).setCompletionHandler(completionHandlerCapture.capture());
+    completionHandler = completionHandlerCapture.getValue();
+    completionHandler.complete();
     // When:
     writer.write(out);
 
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java
index ec4b5b2f86b9..33fa5fab0e3e 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java
@@ -72,6 +72,7 @@
 import io.confluent.ksql.physical.scalablepush.PushRouting;
 import io.confluent.ksql.properties.DenyListPropertyValidator;
 import io.confluent.ksql.query.BlockingRowQueue;
+import io.confluent.ksql.query.CompletionHandler;
 import io.confluent.ksql.query.KafkaStreamsBuilder;
 import io.confluent.ksql.query.LimitHandler;
 import io.confluent.ksql.query.PullQueryQueue;
@@ -129,6 +130,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.acl.AclOperation;
@@ -1074,6 +1076,11 @@ public void setLimitHandler(final LimitHandler limitHandler) {
 
     }
 
+    @Override
+    public void setCompletionHandler(final CompletionHandler completionHandler) {
+
+    }
+
     @Override
     public void setQueuedCallback(final Runnable callback) {