diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java index 6d4ab34f91ef..78be2338e105 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java @@ -258,12 +258,12 @@ private void throwIfInsertQueriesExist(final SourceName sourceName) { + "You need to terminate them before dropping %s.", sourceName.text(), sourceQueries.stream() - .sorted() .map(QueryId::toString) + .sorted() .collect(Collectors.joining(", ")), sinkQueries.stream() - .sorted() .map(QueryId::toString) + .sorted() .collect(Collectors.joining(", ")), sourceName.text() )); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index adc0f0f53b9f..4510ea1c4cc9 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -568,6 +568,39 @@ public void shouldFailDropStreamWhenAnotherStreamIsReadingTheTable() { assertThat(e, statementText(is("drop stream bar;"))); } + @Test + public void shouldFailDropStreamWhenMultipleStreamsAreReadingTheTable() { + // Given: + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "create stream bar as select * from test1;" + + "create stream foo as select * from bar;" + + "create stream foo2 as select * from bar;", + KSQL_CONFIG, + Collections.emptyMap() + ); + + // When: + final KsqlStatementException e = assertThrows( + KsqlStatementException.class, + () -> KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "drop stream bar;", + KSQL_CONFIG, + Collections.emptyMap() + ) + ); + + // Then: + assertThat(e, rawMessage(is( + "Cannot drop BAR.\n" + + "The following streams and/or tables read from this source: [FOO, FOO2].\n" + + "You need to drop them before dropping BAR."))); + assertThat(e, statementText(is("drop stream bar;"))); + } + @Test public void shouldFailDropStreamWhenAnInsertQueryIsWritingTheStream() { // Given: @@ -635,6 +668,44 @@ public void shouldFailDropStreamWhenAnInsertQueryIsReadingTheStream() { assertThat(e, statementText(is("drop stream bar;"))); } + @Test + public void shouldFailDropStreamWhenMultipleInsertQueriesAreReadingAndWritingTheStream() { + // Given: + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "create stream bar as select * from test1;" + + "create stream foo as select * from test1;" + + "create stream foo2 as select * from test1;" + + "insert into foo select * from bar;" + + "insert into foo2 select * from bar;" + + "insert into bar select * from foo;" + + "insert into bar select * from foo2;", + KSQL_CONFIG, + Collections.emptyMap() + ); + + // When: + final KsqlStatementException e = assertThrows( + KsqlStatementException.class, + () -> KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "drop stream bar;", + KSQL_CONFIG, + Collections.emptyMap() + ) + ); + + // Then: + assertThat(e, rawMessage(is( + "Cannot drop BAR.\n" + + "The following queries read from this source: [INSERTQUERY_3, INSERTQUERY_4].\n" + + "The following queries write into this source: [INSERTQUERY_5, INSERTQUERY_6].\n" + + "You need to terminate them before dropping BAR."))); + assertThat(e, statementText(is("drop stream bar;"))); + } + @Test public void shouldDropTableAndTerminateQuery() { // Given: