From b0669a044b53407fba3ceb11bddee73e69c0006f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Wed, 25 Mar 2020 15:02:23 -0500 Subject: [PATCH] fix: 'drop (stream|table) if exists' fails if source does not exist (#4872) --- .../ksql/ddl/commands/DropSourceFactory.java | 2 +- .../ddl/commands/DropSourceFactoryTest.java | 4 +- .../ddl/commands/DropSourceCommand.java | 17 ++ .../6.0.0_1585158673084/plan.json | 179 ++++++++++++++++++ .../6.0.0_1585158673084/spec.json | 18 ++ .../6.0.0_1585158673084/topology | 13 ++ .../6.0.0_1585158673119/plan.json | 155 +++++++++++++++ .../6.0.0_1585158673119/spec.json | 18 ++ .../6.0.0_1585158673119/topology | 13 ++ .../6.0.0_1585158673144/plan.json | 179 ++++++++++++++++++ .../6.0.0_1585158673144/spec.json | 18 ++ .../6.0.0_1585158673144/topology | 13 ++ .../query-validation-tests/drop_source.json | 61 ++++++ .../rest/server/computation/RecoveryTest.java | 12 ++ 14 files changed, 699 insertions(+), 3 deletions(-) create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_an_existing_stream_should_succeed/6.0.0_1585158673084/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_an_existing_stream_should_succeed/6.0.0_1585158673084/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_an_existing_stream_should_succeed/6.0.0_1585158673084/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_a_non-existing_stream_should_succeed/6.0.0_1585158673119/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_a_non-existing_stream_should_succeed/6.0.0_1585158673119/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_a_non-existing_stream_should_succeed/6.0.0_1585158673119/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_an_existing_stream_should_succeed/6.0.0_1585158673144/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_an_existing_stream_should_succeed/6.0.0_1585158673144/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_an_existing_stream_should_succeed/6.0.0_1585158673144/topology create mode 100644 ksqldb-functional-tests/src/test/resources/query-validation-tests/drop_source.json diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DropSourceFactory.java b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DropSourceFactory.java index 8c601cd16bb1..643aa49c9d9e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DropSourceFactory.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DropSourceFactory.java @@ -56,7 +56,7 @@ private DropSourceCommand create( final DataSourceType dataSourceType) { final DataSource dataSource = metaStore.getSource(sourceName); if (dataSource == null) { - if (ifExists) { + if (!ifExists) { throw new KsqlException("Source " + sourceName.text() + " does not exist."); } } else if (dataSource.getDataSourceType() != dataSourceType) { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DropSourceFactoryTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DropSourceFactoryTest.java index 8e862704d591..23c7edc843ed 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DropSourceFactoryTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DropSourceFactoryTest.java @@ -93,7 +93,7 @@ public void shouldCreateCommandForDropTable() { @Test public void shouldCreateDropSourceOnMissingSourceWithIfExistsForStream() { // Given: - final DropStream dropStream = new DropStream(SOME_NAME, false, true); + final DropStream dropStream = new DropStream(SOME_NAME, true, true); when(metaStore.getSource(SOME_NAME)).thenReturn(null); // When: @@ -106,7 +106,7 @@ public void shouldCreateDropSourceOnMissingSourceWithIfExistsForStream() { @Test public void shouldFailDropSourceOnMissingSourceWithNoIfExistsForStream() { // Given: - final DropStream dropStream = new DropStream(SOME_NAME, true, true); + final DropStream dropStream = new DropStream(SOME_NAME, false, true); when(metaStore.getSource(SOME_NAME)).thenReturn(null); // Then: diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/DropSourceCommand.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/DropSourceCommand.java index d63e39274306..c9310cbd7148 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/DropSourceCommand.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/DropSourceCommand.java @@ -37,4 +37,21 @@ public DdlCommandResult execute(final Executor executor) { public SourceName getSourceName() { return sourceName; } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final DropSourceCommand that = (DropSourceCommand)o; + return Objects.equals(sourceName, that.sourceName); + } + + @Override + public int hashCode() { + return Objects.hash(sourceName); + } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_an_existing_stream_should_succeed/6.0.0_1585158673084/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_an_existing_stream_should_succeed/6.0.0_1585158673084/plan.json new file mode 100644 index 000000000000..2702927efbdf --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_an_existing_stream_should_succeed/6.0.0_1585158673084/plan.json @@ -0,0 +1,179 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT2 (DATA STRING) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT2", + "schema" : "`ROWKEY` STRING KEY, `DATA` STRING", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "DROP STREAM INPUT2", + "ddlCommand" : { + "@type" : "dropSourceV1", + "sourceName" : "INPUT2" + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (DATA STRING) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `DATA` STRING", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `DATA` STRING", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `DATA` STRING" + }, + "selectExpressions" : [ "DATA AS DATA" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.streams.state.dir" : "/tmp/confluent1770849578883696623", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_an_existing_stream_should_succeed/6.0.0_1585158673084/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_an_existing_stream_should_succeed/6.0.0_1585158673084/spec.json new file mode 100644 index 000000000000..9192e5be3778 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_an_existing_stream_should_succeed/6.0.0_1585158673084/spec.json @@ -0,0 +1,18 @@ +{ + "version" : "6.0.0", + "timestamp" : 1585158673084, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "inputs" : [ { + "topic" : "input", + "key" : "k1", + "value" : "v1" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "k1", + "value" : "v1" + } ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_an_existing_stream_should_succeed/6.0.0_1585158673084/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_an_existing_stream_should_succeed/6.0.0_1585158673084/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_an_existing_stream_should_succeed/6.0.0_1585158673084/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_a_non-existing_stream_should_succeed/6.0.0_1585158673119/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_a_non-existing_stream_should_succeed/6.0.0_1585158673119/plan.json new file mode 100644 index 000000000000..42c9c8e65010 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_a_non-existing_stream_should_succeed/6.0.0_1585158673119/plan.json @@ -0,0 +1,155 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "DROP STREAM IF EXISTS TEST", + "ddlCommand" : { + "@type" : "dropSourceV1", + "sourceName" : "TEST" + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (DATA STRING) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `DATA` STRING", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `DATA` STRING", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `DATA` STRING" + }, + "selectExpressions" : [ "DATA AS DATA" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.streams.state.dir" : "/tmp/confluent1770849578883696623", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_a_non-existing_stream_should_succeed/6.0.0_1585158673119/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_a_non-existing_stream_should_succeed/6.0.0_1585158673119/spec.json new file mode 100644 index 000000000000..d58bace15951 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_a_non-existing_stream_should_succeed/6.0.0_1585158673119/spec.json @@ -0,0 +1,18 @@ +{ + "version" : "6.0.0", + "timestamp" : 1585158673119, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "inputs" : [ { + "topic" : "input", + "key" : "k1", + "value" : "v1" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "k1", + "value" : "v1" + } ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_a_non-existing_stream_should_succeed/6.0.0_1585158673119/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_a_non-existing_stream_should_succeed/6.0.0_1585158673119/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_a_non-existing_stream_should_succeed/6.0.0_1585158673119/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_an_existing_stream_should_succeed/6.0.0_1585158673144/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_an_existing_stream_should_succeed/6.0.0_1585158673144/plan.json new file mode 100644 index 000000000000..2cb7334512ed --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_an_existing_stream_should_succeed/6.0.0_1585158673144/plan.json @@ -0,0 +1,179 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT2 (DATA STRING) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT2", + "schema" : "`ROWKEY` STRING KEY, `DATA` STRING", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "DROP STREAM IF EXISTS INPUT2", + "ddlCommand" : { + "@type" : "dropSourceV1", + "sourceName" : "INPUT2" + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (DATA STRING) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `DATA` STRING", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `DATA` STRING", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `DATA` STRING" + }, + "selectExpressions" : [ "DATA AS DATA" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "DELIMITED", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.streams.state.dir" : "/tmp/confluent1770849578883696623", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_an_existing_stream_should_succeed/6.0.0_1585158673144/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_an_existing_stream_should_succeed/6.0.0_1585158673144/spec.json new file mode 100644 index 000000000000..2077393f4fea --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_an_existing_stream_should_succeed/6.0.0_1585158673144/spec.json @@ -0,0 +1,18 @@ +{ + "version" : "6.0.0", + "timestamp" : 1585158673144, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "inputs" : [ { + "topic" : "input", + "key" : "k1", + "value" : "v1" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "k1", + "value" : "v1" + } ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_an_existing_stream_should_succeed/6.0.0_1585158673144/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_an_existing_stream_should_succeed/6.0.0_1585158673144/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/drop_source_-_drop_if_exists_an_existing_stream_should_succeed/6.0.0_1585158673144/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/drop_source.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/drop_source.json new file mode 100644 index 000000000000..52031569f106 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/drop_source.json @@ -0,0 +1,61 @@ +{ + "comments": [ + "Tests covering DROP statements" + ], + "tests": [ + { + "name": "drop an existing stream should succeed", + "statements": [ + "CREATE STREAM input2 (data VARCHAR) WITH (kafka_topic='input', value_format='DELIMITED');", + "DROP STREAM input2;", + "CREATE STREAM input (data VARCHAR) WITH (kafka_topic='input', value_format='DELIMITED');", + "CREATE STREAM output AS SELECT * FROM input;" + ], + "inputs": [ + {"topic": "input", "key": "k1", "value": "v1"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "k1", "value": "v1"} + ] + }, + { + "name": "drop a non-existing stream should fail", + "statements": [ + "DROP STREAM test;" + ], + "expectedException" : { + "type": "io.confluent.ksql.util.KsqlException", + "message": "Source TEST does not exist" + } + }, + { + "name": "drop if exists a non-existing stream should succeed", + "statements": [ + "DROP STREAM IF EXISTS test;", + "CREATE STREAM input (data VARCHAR) WITH (kafka_topic='input', value_format='DELIMITED');", + "CREATE STREAM output AS SELECT * FROM input;" + ], + "inputs": [ + {"topic": "input", "key": "k1", "value": "v1"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "k1", "value": "v1"} + ] + }, + { + "name": "drop if exists an existing stream should succeed", + "statements": [ + "CREATE STREAM input2 (data VARCHAR) WITH (kafka_topic='input', value_format='DELIMITED');", + "DROP STREAM IF EXISTS input2;", + "CREATE STREAM input (data VARCHAR) WITH (kafka_topic='input', value_format='DELIMITED');", + "CREATE STREAM output AS SELECT * FROM input;" + ], + "inputs": [ + {"topic": "input", "key": "k1", "value": "v1"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "k1", "value": "v1"} + ] + } + ] +} \ No newline at end of file diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 03a5db1d5029..2afe9fbe7b0a 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -619,6 +619,18 @@ public void shouldNotDeleteTopicsOnRecoveryEvenIfLegacyDropCommandAlreadyInComma topicClient.preconditionTopicExists("B"); shouldRecover(ImmutableList.of( + new QueuedCommand( + new CommandId(Type.STREAM, "B", Action.CREATE), + new Command( + "CREATE STREAM B (COLUMN STRING) " + + "WITH (KAFKA_TOPIC='B', VALUE_FORMAT='JSON');", + Collections.emptyMap(), + null, + Optional.empty() + ), + Optional.empty(), + 2L + ), new QueuedCommand( new CommandId(Type.STREAM, "B", Action.DROP), new Command("DROP STREAM B DELETE TOPIC;", ImmutableMap.of(), ImmutableMap.of(), Optional.empty()),