Skip to content

Commit

Permalink
fix: 'drop (stream|table) if exists' fails if source does not exist (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
spena authored Mar 25, 2020
1 parent 258d0b0 commit b0669a0
Show file tree
Hide file tree
Showing 14 changed files with 699 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private DropSourceCommand create(
final DataSourceType dataSourceType) {
final DataSource dataSource = metaStore.getSource(sourceName);
if (dataSource == null) {
if (ifExists) {
if (!ifExists) {
throw new KsqlException("Source " + sourceName.text() + " does not exist.");
}
} else if (dataSource.getDataSourceType() != dataSourceType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void shouldCreateCommandForDropTable() {
@Test
public void shouldCreateDropSourceOnMissingSourceWithIfExistsForStream() {
// Given:
final DropStream dropStream = new DropStream(SOME_NAME, false, true);
final DropStream dropStream = new DropStream(SOME_NAME, true, true);
when(metaStore.getSource(SOME_NAME)).thenReturn(null);

// When:
Expand All @@ -106,7 +106,7 @@ public void shouldCreateDropSourceOnMissingSourceWithIfExistsForStream() {
@Test
public void shouldFailDropSourceOnMissingSourceWithNoIfExistsForStream() {
// Given:
final DropStream dropStream = new DropStream(SOME_NAME, true, true);
final DropStream dropStream = new DropStream(SOME_NAME, false, true);
when(metaStore.getSource(SOME_NAME)).thenReturn(null);

// Then:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,21 @@ public DdlCommandResult execute(final Executor executor) {
public SourceName getSourceName() {
return sourceName;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final DropSourceCommand that = (DropSourceCommand)o;
return Objects.equals(sourceName, that.sourceName);
}

@Override
public int hashCode() {
return Objects.hash(sourceName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT2 (DATA STRING) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='DELIMITED');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT2",
"schema" : "`ROWKEY` STRING KEY, `DATA` STRING",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "DELIMITED",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "DROP STREAM INPUT2",
"ddlCommand" : {
"@type" : "dropSourceV1",
"sourceName" : "INPUT2"
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (DATA STRING) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='DELIMITED');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` STRING KEY, `DATA` STRING",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "DELIMITED",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` STRING KEY, `DATA` STRING",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "DELIMITED",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "DELIMITED",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` STRING KEY, `DATA` STRING"
},
"selectExpressions" : [ "DATA AS DATA" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "DELIMITED",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.streams.state.dir" : "/tmp/confluent1770849578883696623",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.authentication.plugin.class" : null,
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.any.key.name.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"version" : "6.0.0",
"timestamp" : 1585158673084,
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<DATA VARCHAR> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<DATA VARCHAR> NOT NULL"
},
"inputs" : [ {
"topic" : "input",
"key" : "k1",
"value" : "v1"
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : "k1",
"value" : "v1"
} ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

Loading

0 comments on commit b0669a0

Please sign in to comment.