Skip to content

Commit

Permalink
feat: surface error to user when command topic deleted while server r…
Browse files Browse the repository at this point in the history
…unning (#6240)
  • Loading branch information
stevenpyzhang authored Sep 23, 2020
1 parent dc48b70 commit c5d6b56
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,9 @@ static KsqlRestApplication buildApplication(
KsqlRestConfig.KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS)),
metricsPrefix,
InternalTopicSerdes.deserializer(Command.class),
errorHandler
errorHandler,
serviceContext.getTopicClient(),
commandTopicName
);

final KsqlResource ksqlResource = new KsqlResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.RetryUtil;
Expand All @@ -40,7 +41,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
Expand Down Expand Up @@ -81,6 +84,8 @@ public class CommandRunner implements Closeable {
private final Consumer<QueuedCommand> incompatibleCommandChecker;
private final Errors errorHandler;
private boolean incompatibleCommandDetected;
private final Supplier<Boolean> commandTopicExists;
private boolean commandTopicDeleted;
private Status state = new Status(CommandRunnerStatus.RUNNING, CommandRunnerDegradedReason.NONE);

public enum CommandRunnerStatus {
Expand All @@ -91,8 +96,8 @@ public enum CommandRunnerStatus {

public enum CommandRunnerDegradedReason {
NONE(errors -> ""),
CORRUPTED(Errors:: commandRunnerDegradedBackupCorruptedErrorMessage),
INCOMPATIBLE_COMMAND(Errors:: commandRunnerDegradedIncompatibleCommandsErrorMessage);
CORRUPTED(Errors::commandRunnerDegradedCorruptedErrorMessage),
INCOMPATIBLE_COMMAND(Errors::commandRunnerDegradedIncompatibleCommandsErrorMessage);

private final Function<Errors, String> msgFactory;

Expand All @@ -110,8 +115,8 @@ public static class Status {
private final CommandRunnerDegradedReason degradedReason;

public Status(
final CommandRunnerStatus status,
final CommandRunnerDegradedReason degradedReason
final CommandRunnerStatus status,
final CommandRunnerDegradedReason degradedReason
) {
this.status = status;
this.degradedReason = degradedReason;
Expand All @@ -137,7 +142,9 @@ public CommandRunner(
final Duration commandRunnerHealthTimeout,
final String metricsGroupPrefix,
final Deserializer<Command> commandDeserializer,
final Errors errorHandler
final Errors errorHandler,
final KafkaTopicClient kafkaTopicClient,
final String commandTopicName
) {
this(
statementExecutor,
Expand All @@ -156,7 +163,8 @@ public CommandRunner(
queuedCommand.getAndDeserializeCommand(commandDeserializer);
},
commandDeserializer,
errorHandler
errorHandler,
() -> kafkaTopicClient.isTopicExists(commandTopicName)
);
}

Expand All @@ -176,7 +184,8 @@ public CommandRunner(
final Function<List<QueuedCommand>, List<QueuedCommand>> compactor,
final Consumer<QueuedCommand> incompatibleCommandChecker,
final Deserializer<Command> commandDeserializer,
final Errors errorHandler
final Errors errorHandler,
final Supplier<Boolean> commandTopicExists
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor");
Expand All @@ -199,7 +208,10 @@ public CommandRunner(
Objects.requireNonNull(commandDeserializer, "commandDeserializer");
this.errorHandler =
Objects.requireNonNull(errorHandler, "errorHandler");
this.commandTopicExists =
Objects.requireNonNull(commandTopicExists, "commandTopicExists");
this.incompatibleCommandDetected = false;
this.commandTopicDeleted = false;
}

/**
Expand Down Expand Up @@ -289,6 +301,9 @@ void fetchAndRunCommands() {
lastPollTime.set(clock.instant());
final List<QueuedCommand> commands = commandStore.getNewCommands(NEW_CMDS_TIMEOUT);
if (commands.isEmpty()) {
if (!commandTopicExists.get()) {
commandTopicDeleted = true;
}
return;
}

Expand Down Expand Up @@ -428,6 +443,13 @@ public void run() {
CommandRunnerDegradedReason.CORRUPTED
);
closeEarly();
} else if (commandTopicDeleted) {
LOG.warn("CommandRunner entering degraded state due to command topic deletion.");
state = new Status(
CommandRunnerStatus.DEGRADED,
CommandRunnerDegradedReason.CORRUPTED
);
closeEarly();
} else {
LOG.trace("Polling for new writes to command topic");
fetchAndRunCommands();
Expand All @@ -437,6 +459,13 @@ public void run() {
if (!closed) {
throw wue;
}
} catch (final OffsetOutOfRangeException e) {
LOG.warn("The command topic offset was reset. CommandRunner thread exiting.");
state = new Status(
CommandRunnerStatus.DEGRADED,
CommandRunnerDegradedReason.CORRUPTED
);
closeEarly();
} finally {
commandStore.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.Before;
Expand All @@ -70,7 +72,7 @@
@RunWith(MockitoJUnitRunner.class)
public class CommandRunnerTest {
private static final long COMMAND_RUNNER_HEALTH_TIMEOUT = 1000;
private static final String BACKUP_CORRUPTED_ERROR_MESSAGE = "corrupted";
private static final String CORRUPTED_ERROR_MESSAGE = "corrupted";
private static final String INCOMPATIBLE_COMMANDS_ERROR_MESSAGE = "incompatible";

@Mock
Expand Down Expand Up @@ -104,6 +106,8 @@ public class CommandRunnerTest {
@Mock
private Deserializer<Command> commandDeserializer;
@Mock
private Supplier<Boolean> commandTopicExists;
@Mock
private Errors errorHandler;
@Captor
private ArgumentCaptor<Runnable> threadTaskCaptor;
Expand All @@ -126,10 +130,11 @@ public void setup() {
doNothing().when(incompatibleCommandChecker).accept(queuedCommand3);

when(commandStore.corruptionDetected()).thenReturn(false);
when(commandTopicExists.get()).thenReturn(true);
when(compactor.apply(any())).thenAnswer(inv -> inv.getArgument(0));
when(errorHandler.commandRunnerDegradedIncompatibleCommandsErrorMessage()).thenReturn(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE);
when(errorHandler.commandRunnerDegradedBackupCorruptedErrorMessage()).thenReturn(BACKUP_CORRUPTED_ERROR_MESSAGE);

when(errorHandler.commandRunnerDegradedCorruptedErrorMessage()).thenReturn(CORRUPTED_ERROR_MESSAGE);
givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);

commandRunner = new CommandRunner(
Expand All @@ -146,7 +151,8 @@ public void setup() {
compactor,
incompatibleCommandChecker,
commandDeserializer,
errorHandler
errorHandler,
commandTopicExists
);
}

Expand Down Expand Up @@ -329,10 +335,29 @@ public void shouldNotProcessCommandTopicIfBackupCorrupted() throws InterruptedEx
inOrder.verify(executor).awaitTermination(anyLong(), any());
inOrder.verify(commandStore).close();
assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED));
assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(BACKUP_CORRUPTED_ERROR_MESSAGE));
assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(CORRUPTED_ERROR_MESSAGE));
assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.CORRUPTED));
}

@Test
public void shouldEnterDegradedStateIfCommandTopicMissing() {
// Given:
givenQueuedCommands();
when(commandTopicExists.get()).thenReturn(false);

// When:
commandRunner.start();

final Runnable threadTask = getThreadTask();
threadTask.run();

assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED));
assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(CORRUPTED_ERROR_MESSAGE));
assertThat(
commandRunner.getCommandRunnerDegradedReason(),
is(CommandRunner.CommandRunnerDegradedReason.CORRUPTED));
}

@Test
public void shouldPullAndRunStatements() {
// Given:
Expand Down Expand Up @@ -524,6 +549,26 @@ public void shouldCloseEarlyWhenSerializationExceptionInFetch() throws Exception
inOrder.verify(commandStore).close();
}

@Test
public void shouldCloseEarlyWhenOffsetOutOfRangeException() throws Exception {
// Given:
when(commandStore.getNewCommands(any()))
.thenReturn(Collections.singletonList(queuedCommand1))
.thenThrow(new OffsetOutOfRangeException(Collections.singletonMap(new TopicPartition("command_topic", 0), 0L)));

// When:
commandRunner.start();
verify(commandStore, never()).close();
final Runnable threadTask = getThreadTask();
threadTask.run();

// Then:
final InOrder inOrder = inOrder(executor, commandStore);
inOrder.verify(commandStore).wakeup();
inOrder.verify(executor).awaitTermination(anyLong(), any());
inOrder.verify(commandStore).close();
}

@Test
public void shouldCloseTheCommandRunnerCorrectly() throws Exception {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.server.CommandTopicBackup;
import io.confluent.ksql.rest.server.CommandTopicBackupNoOp;
import io.confluent.ksql.rest.server.resources.KsqlResource;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
Expand All @@ -68,6 +67,7 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -231,7 +231,9 @@ private class KsqlServer {
Duration.ofMillis(2000),
"",
InternalTopicSerdes.deserializer(Command.class),
errorHandler
errorHandler,
topicClient,
"command_topic"
);

this.ksqlResource = new KsqlResource(
Expand Down Expand Up @@ -567,6 +569,7 @@ private void shouldRecover(final List<QueuedCommand> commands) {
@Before
public void setUp() {
topicClient.preconditionTopicExists("A");
topicClient.preconditionTopicExists("command_topic");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,16 @@ public class DefaultErrorMessages implements ErrorMessages {
+ System.lineSeparator()
+ "This is most likely due to the service being rolled back to an earlier version.";

public static final String COMMAND_RUNNER_DEGRADED_BACKUP_CORRUPTED_ERROR_MESSAGE =
"The server has detected that the command topic may be corrupted. The backup of the "
+ "command topic does not match the current contents of command topic."
public static final String COMMAND_RUNNER_DEGRADED_CORRUPTED_ERROR_MESSAGE =
"The server has detected corruption in the command topic due "
+ "to modifications performed on it. "
+ System.lineSeparator()
+ "DDL statements will not be processed until either:"
+ "DDL statements will not be processed any further."
+ System.lineSeparator()
+ "1. The current command topic is deleted and the backup file is used "
+ "to restore the command topic."
+ "If a backup of the command topic is available, "
+ "restore the command topic using the backup file."
+ System.lineSeparator()
+ "2. The current backup file is deleted."
+ System.lineSeparator()
+ "The server must be restarted after performing either operation in order to resume "
+ "normal functionality";
+ "A server restart is required to restore full functionality.";


@Override
Expand Down Expand Up @@ -70,7 +67,7 @@ public String commandRunnerDegradedIncompatibleCommandsErrorMessage() {
}

@Override
public String commandRunnerDegradedBackupCorruptedErrorMessage() {
return COMMAND_RUNNER_DEGRADED_BACKUP_CORRUPTED_ERROR_MESSAGE;
public String commandRunnerDegradedCorruptedErrorMessage() {
return COMMAND_RUNNER_DEGRADED_CORRUPTED_ERROR_MESSAGE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ public interface ErrorMessages {

String commandRunnerDegradedIncompatibleCommandsErrorMessage();

String commandRunnerDegradedBackupCorruptedErrorMessage();
String commandRunnerDegradedCorruptedErrorMessage();
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ public String commandRunnerDegradedIncompatibleCommandsErrorMessage() {
return errorMessages.commandRunnerDegradedIncompatibleCommandsErrorMessage();
}

public String commandRunnerDegradedBackupCorruptedErrorMessage() {
return errorMessages.commandRunnerDegradedBackupCorruptedErrorMessage();
public String commandRunnerDegradedCorruptedErrorMessage() {
return errorMessages.commandRunnerDegradedCorruptedErrorMessage();
}

public EndpointResponse generateResponse(
Expand Down

0 comments on commit c5d6b56

Please sign in to comment.