Skip to content

Commit

Permalink
fix: recovery hangs when using TERMINATE ALL (#6397)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Oct 9, 2020
1 parent d05a4df commit 7a57b3c
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ protected Void visitDropTable(final DropTable node, final Integer context) {
@Override
protected Void visitTerminateQuery(final TerminateQuery node, final Integer context) {
builder.append("TERMINATE ");
builder.append(node.getQueryId().map(QueryId::toString).orElse("ALL"));
builder.append(node.getQueryId().map(QueryId::toString).orElse(TerminateQuery.ALL_QUERIES));
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
@Immutable
public final class TerminateQuery extends Statement {

public static final String ALL_QUERIES = "ALL";
private final Optional<QueryId> queryId;

public static TerminateQuery all(final Optional<NodeLocation> location) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private static CommandId getDropTypeCommandId(final DropType dropType) {
private static CommandId getTerminateCommandId(final TerminateQuery terminateQuery) {
return new CommandId(
CommandId.Type.TERMINATE,
terminateQuery.getQueryId().map(QueryId::toString).orElse("ALL"),
terminateQuery.getQueryId().map(QueryId::toString).orElse(TerminateQuery.ALL_QUERIES),
CommandId.Action.EXECUTE
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.rest.server.computation;

import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.entity.CommandId.Type;
import java.util.HashMap;
Expand Down Expand Up @@ -85,7 +86,11 @@ public static CompactedNode maybeAppend(
final Optional<KsqlPlan> plan = command.getPlan();
if (queued.getAndDeserializeCommandId().getType() == Type.TERMINATE) {
final QueryId queryId = new QueryId(queued.getAndDeserializeCommandId().getEntity());
markShouldSkip(queryId, latestNodeWithId);
if (queryId.toString().equals(TerminateQuery.ALL_QUERIES)) {
latestNodeWithId.values().forEach(node -> node.shouldSkip = true);
} else {
markShouldSkip(queryId, latestNodeWithId);
}

// terminate commands don't get added to the list of commands to execute
// because we "execute" them in this class by removing query plans from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,19 @@ public void shouldRecoverTerminates() {
shouldRecover(commands);
}

@Test
public void shouldRecoverTerminateAll() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"INSERT INTO B SELECT * FROM A;",
"TERMINATE ALL;",
"DROP STREAM B;",
"CREATE STREAM B AS SELECT * FROM A;"
);
shouldRecover(commands);
}

@Test
public void shouldRecoverDrop() {
server1.submitCommands(
Expand Down

0 comments on commit 7a57b3c

Please sign in to comment.