Skip to content

Commit

Permalink
fix: ClassCastException when dropping sources with 2+ insert queries (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Zara Lim authored Sep 30, 2021
1 parent abe47c3 commit a7c6ebe
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,12 @@ private void throwIfInsertQueriesExist(final SourceName sourceName) {
+ "You need to terminate them before dropping %s.",
sourceName.text(),
sourceQueries.stream()
.sorted()
.map(QueryId::toString)
.sorted()
.collect(Collectors.joining(", ")),
sinkQueries.stream()
.sorted()
.map(QueryId::toString)
.sorted()
.collect(Collectors.joining(", ")),
sourceName.text()
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,39 @@ public void shouldFailDropStreamWhenAnotherStreamIsReadingTheTable() {
assertThat(e, statementText(is("drop stream bar;")));
}

@Test
public void shouldFailDropStreamWhenMultipleStreamsAreReadingTheTable() {
// Given:
KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream bar as select * from test1;"
+ "create stream foo as select * from bar;"
+ "create stream foo2 as select * from bar;",
KSQL_CONFIG,
Collections.emptyMap()
);

// When:
final KsqlStatementException e = assertThrows(
KsqlStatementException.class,
() -> KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"drop stream bar;",
KSQL_CONFIG,
Collections.emptyMap()
)
);

// Then:
assertThat(e, rawMessage(is(
"Cannot drop BAR.\n"
+ "The following streams and/or tables read from this source: [FOO, FOO2].\n"
+ "You need to drop them before dropping BAR.")));
assertThat(e, statementText(is("drop stream bar;")));
}

@Test
public void shouldFailDropStreamWhenAnInsertQueryIsWritingTheStream() {
// Given:
Expand Down Expand Up @@ -635,6 +668,44 @@ public void shouldFailDropStreamWhenAnInsertQueryIsReadingTheStream() {
assertThat(e, statementText(is("drop stream bar;")));
}

@Test
public void shouldFailDropStreamWhenMultipleInsertQueriesAreReadingAndWritingTheStream() {
// Given:
KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream bar as select * from test1;"
+ "create stream foo as select * from test1;"
+ "create stream foo2 as select * from test1;"
+ "insert into foo select * from bar;"
+ "insert into foo2 select * from bar;"
+ "insert into bar select * from foo;"
+ "insert into bar select * from foo2;",
KSQL_CONFIG,
Collections.emptyMap()
);

// When:
final KsqlStatementException e = assertThrows(
KsqlStatementException.class,
() -> KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"drop stream bar;",
KSQL_CONFIG,
Collections.emptyMap()
)
);

// Then:
assertThat(e, rawMessage(is(
"Cannot drop BAR.\n"
+ "The following queries read from this source: [INSERTQUERY_3, INSERTQUERY_4].\n"
+ "The following queries write into this source: [INSERTQUERY_5, INSERTQUERY_6].\n"
+ "You need to terminate them before dropping BAR.")));
assertThat(e, statementText(is("drop stream bar;")));
}

@Test
public void shouldDropTableAndTerminateQuery() {
// Given:
Expand Down

0 comments on commit a7c6ebe

Please sign in to comment.