diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index 475944a99aed..c63a7886de2d 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -399,7 +399,7 @@ protected Void visitDropTable(final DropTable node, final Integer context) { @Override protected Void visitTerminateQuery(final TerminateQuery node, final Integer context) { builder.append("TERMINATE "); - builder.append(node.getQueryId().map(QueryId::toString).orElse("ALL")); + builder.append(node.getQueryId().map(QueryId::toString).orElse(TerminateQuery.ALL_QUERIES)); return null; } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java index 4664072056e9..70244b895c1c 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java @@ -24,6 +24,7 @@ @Immutable public final class TerminateQuery extends Statement { + public static final String ALL_QUERIES = "ALL"; private final Optional queryId; public static TerminateQuery all(final Optional location) { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java index 42adaf2a4584..498e48b82812 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java @@ -110,7 +110,7 @@ private static CommandId getDropTypeCommandId(final DropType dropType) { private static CommandId getTerminateCommandId(final TerminateQuery terminateQuery) { return new CommandId( CommandId.Type.TERMINATE, - terminateQuery.getQueryId().map(QueryId::toString).orElse("ALL"), + terminateQuery.getQueryId().map(QueryId::toString).orElse(TerminateQuery.ALL_QUERIES), CommandId.Action.EXECUTE ); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java index 5630feca18d0..6160bd5054bd 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/RestoreCommandsCompactor.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.server.computation; import io.confluent.ksql.engine.KsqlPlan; +import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.entity.CommandId.Type; import java.util.HashMap; @@ -85,7 +86,11 @@ public static CompactedNode maybeAppend( final Optional plan = command.getPlan(); if (queued.getAndDeserializeCommandId().getType() == Type.TERMINATE) { final QueryId queryId = new QueryId(queued.getAndDeserializeCommandId().getEntity()); - markShouldSkip(queryId, latestNodeWithId); + if (queryId.toString().equals(TerminateQuery.ALL_QUERIES)) { + latestNodeWithId.values().forEach(node -> node.shouldSkip = true); + } else { + markShouldSkip(queryId, latestNodeWithId); + } // terminate commands don't get added to the list of commands to execute // because we "execute" them in this class by removing query plans from diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 36d03bc73b4d..f4f912df6550 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -648,6 +648,19 @@ public void shouldRecoverTerminates() { shouldRecover(commands); } + @Test + public void shouldRecoverTerminateAll() { + server1.submitCommands( + "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", + "CREATE STREAM B AS SELECT * FROM A;", + "INSERT INTO B SELECT * FROM A;", + "TERMINATE ALL;", + "DROP STREAM B;", + "CREATE STREAM B AS SELECT * FROM A;" + ); + shouldRecover(commands); + } + @Test public void shouldRecoverDrop() { server1.submitCommands(