diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index e8ae5a9d55ea..4504750791e7 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -750,7 +750,9 @@ static KsqlRestApplication buildApplication( KsqlRestConfig.KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS)), metricsPrefix, InternalTopicSerdes.deserializer(Command.class), - errorHandler + errorHandler, + serviceContext.getTopicClient(), + commandTopicName ); final KsqlResource ksqlResource = new KsqlResource( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index c3cf82c1d662..0bb68edced0a 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -22,6 +22,7 @@ import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.TerminateCluster; +import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.RetryUtil; @@ -40,7 +41,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.Deserializer; @@ -81,6 +84,8 @@ public class CommandRunner implements Closeable { private final Consumer incompatibleCommandChecker; private final Errors errorHandler; private boolean incompatibleCommandDetected; + private final Supplier commandTopicExists; + private boolean commandTopicDeleted; private Status state = new Status(CommandRunnerStatus.RUNNING, CommandRunnerDegradedReason.NONE); public enum CommandRunnerStatus { @@ -91,8 +96,8 @@ public enum CommandRunnerStatus { public enum CommandRunnerDegradedReason { NONE(errors -> ""), - CORRUPTED(Errors:: commandRunnerDegradedBackupCorruptedErrorMessage), - INCOMPATIBLE_COMMAND(Errors:: commandRunnerDegradedIncompatibleCommandsErrorMessage); + CORRUPTED(Errors::commandRunnerDegradedCorruptedErrorMessage), + INCOMPATIBLE_COMMAND(Errors::commandRunnerDegradedIncompatibleCommandsErrorMessage); private final Function msgFactory; @@ -110,8 +115,8 @@ public static class Status { private final CommandRunnerDegradedReason degradedReason; public Status( - final CommandRunnerStatus status, - final CommandRunnerDegradedReason degradedReason + final CommandRunnerStatus status, + final CommandRunnerDegradedReason degradedReason ) { this.status = status; this.degradedReason = degradedReason; @@ -137,7 +142,9 @@ public CommandRunner( final Duration commandRunnerHealthTimeout, final String metricsGroupPrefix, final Deserializer commandDeserializer, - final Errors errorHandler + final Errors errorHandler, + final KafkaTopicClient kafkaTopicClient, + final String commandTopicName ) { this( statementExecutor, @@ -156,7 +163,8 @@ public CommandRunner( queuedCommand.getAndDeserializeCommand(commandDeserializer); }, commandDeserializer, - errorHandler + errorHandler, + () -> kafkaTopicClient.isTopicExists(commandTopicName) ); } @@ -176,7 +184,8 @@ public CommandRunner( final Function, List> compactor, final Consumer incompatibleCommandChecker, final Deserializer commandDeserializer, - final Errors errorHandler + final Errors errorHandler, + final Supplier commandTopicExists ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor"); @@ -199,7 +208,10 @@ public CommandRunner( Objects.requireNonNull(commandDeserializer, "commandDeserializer"); this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler"); + this.commandTopicExists = + Objects.requireNonNull(commandTopicExists, "commandTopicExists"); this.incompatibleCommandDetected = false; + this.commandTopicDeleted = false; } /** @@ -289,6 +301,9 @@ void fetchAndRunCommands() { lastPollTime.set(clock.instant()); final List commands = commandStore.getNewCommands(NEW_CMDS_TIMEOUT); if (commands.isEmpty()) { + if (!commandTopicExists.get()) { + commandTopicDeleted = true; + } return; } @@ -428,6 +443,13 @@ public void run() { CommandRunnerDegradedReason.CORRUPTED ); closeEarly(); + } else if (commandTopicDeleted) { + LOG.warn("CommandRunner entering degraded state due to command topic deletion."); + state = new Status( + CommandRunnerStatus.DEGRADED, + CommandRunnerDegradedReason.CORRUPTED + ); + closeEarly(); } else { LOG.trace("Polling for new writes to command topic"); fetchAndRunCommands(); @@ -437,6 +459,13 @@ public void run() { if (!closed) { throw wue; } + } catch (final OffsetOutOfRangeException e) { + LOG.warn("The command topic offset was reset. CommandRunner thread exiting."); + state = new Status( + CommandRunnerStatus.DEGRADED, + CommandRunnerDegradedReason.CORRUPTED + ); + closeEarly(); } finally { commandStore.close(); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java index 43116db9ccb2..c8dbbd57cd83 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java @@ -18,7 +18,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.when; diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 8ec5c7b6ab3e..7d3dc09f1851 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -55,6 +55,8 @@ import java.util.function.Function; import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; import org.junit.Before; @@ -70,7 +72,7 @@ @RunWith(MockitoJUnitRunner.class) public class CommandRunnerTest { private static final long COMMAND_RUNNER_HEALTH_TIMEOUT = 1000; - private static final String BACKUP_CORRUPTED_ERROR_MESSAGE = "corrupted"; + private static final String CORRUPTED_ERROR_MESSAGE = "corrupted"; private static final String INCOMPATIBLE_COMMANDS_ERROR_MESSAGE = "incompatible"; @Mock @@ -104,6 +106,8 @@ public class CommandRunnerTest { @Mock private Deserializer commandDeserializer; @Mock + private Supplier commandTopicExists; + @Mock private Errors errorHandler; @Captor private ArgumentCaptor threadTaskCaptor; @@ -126,10 +130,11 @@ public void setup() { doNothing().when(incompatibleCommandChecker).accept(queuedCommand3); when(commandStore.corruptionDetected()).thenReturn(false); + when(commandTopicExists.get()).thenReturn(true); when(compactor.apply(any())).thenAnswer(inv -> inv.getArgument(0)); when(errorHandler.commandRunnerDegradedIncompatibleCommandsErrorMessage()).thenReturn(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE); - when(errorHandler.commandRunnerDegradedBackupCorruptedErrorMessage()).thenReturn(BACKUP_CORRUPTED_ERROR_MESSAGE); - + when(errorHandler.commandRunnerDegradedCorruptedErrorMessage()).thenReturn(CORRUPTED_ERROR_MESSAGE); + givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3); commandRunner = new CommandRunner( @@ -146,7 +151,8 @@ public void setup() { compactor, incompatibleCommandChecker, commandDeserializer, - errorHandler + errorHandler, + commandTopicExists ); } @@ -329,10 +335,29 @@ public void shouldNotProcessCommandTopicIfBackupCorrupted() throws InterruptedEx inOrder.verify(executor).awaitTermination(anyLong(), any()); inOrder.verify(commandStore).close(); assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED)); - assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(BACKUP_CORRUPTED_ERROR_MESSAGE)); + assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(CORRUPTED_ERROR_MESSAGE)); assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.CORRUPTED)); } + @Test + public void shouldEnterDegradedStateIfCommandTopicMissing() { + // Given: + givenQueuedCommands(); + when(commandTopicExists.get()).thenReturn(false); + + // When: + commandRunner.start(); + + final Runnable threadTask = getThreadTask(); + threadTask.run(); + + assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED)); + assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(CORRUPTED_ERROR_MESSAGE)); + assertThat( + commandRunner.getCommandRunnerDegradedReason(), + is(CommandRunner.CommandRunnerDegradedReason.CORRUPTED)); + } + @Test public void shouldPullAndRunStatements() { // Given: @@ -524,6 +549,26 @@ public void shouldCloseEarlyWhenSerializationExceptionInFetch() throws Exception inOrder.verify(commandStore).close(); } + @Test + public void shouldCloseEarlyWhenOffsetOutOfRangeException() throws Exception { + // Given: + when(commandStore.getNewCommands(any())) + .thenReturn(Collections.singletonList(queuedCommand1)) + .thenThrow(new OffsetOutOfRangeException(Collections.singletonMap(new TopicPartition("command_topic", 0), 0L))); + + // When: + commandRunner.start(); + verify(commandStore, never()).close(); + final Runnable threadTask = getThreadTask(); + threadTask.run(); + + // Then: + final InOrder inOrder = inOrder(executor, commandStore); + inOrder.verify(commandStore).wakeup(); + inOrder.verify(executor).awaitTermination(anyLong(), any()); + inOrder.verify(commandStore).close(); + } + @Test public void shouldCloseTheCommandRunnerCorrectly() throws Exception { // Given: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 420a4f8c92ce..6c7eb843b17e 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -46,7 +46,6 @@ import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.server.CommandTopicBackup; -import io.confluent.ksql.rest.server.CommandTopicBackupNoOp; import io.confluent.ksql.rest.server.resources.KsqlResource; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; @@ -68,6 +67,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.streams.StreamsConfig; @@ -231,7 +231,9 @@ private class KsqlServer { Duration.ofMillis(2000), "", InternalTopicSerdes.deserializer(Command.class), - errorHandler + errorHandler, + topicClient, + "command_topic" ); this.ksqlResource = new KsqlResource( @@ -567,6 +569,7 @@ private void shouldRecover(final List commands) { @Before public void setUp() { topicClient.preconditionTopicExists("A"); + topicClient.preconditionTopicExists("command_topic"); } @Test diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java index e91b24805f8d..6037391037cb 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java @@ -25,19 +25,16 @@ public class DefaultErrorMessages implements ErrorMessages { + System.lineSeparator() + "This is most likely due to the service being rolled back to an earlier version."; - public static final String COMMAND_RUNNER_DEGRADED_BACKUP_CORRUPTED_ERROR_MESSAGE = - "The server has detected that the command topic may be corrupted. The backup of the " - + "command topic does not match the current contents of command topic." + public static final String COMMAND_RUNNER_DEGRADED_CORRUPTED_ERROR_MESSAGE = + "The server has detected corruption in the command topic due " + + "to modifications performed on it. " + System.lineSeparator() - + "DDL statements will not be processed until either:" + + "DDL statements will not be processed any further." + System.lineSeparator() - + "1. The current command topic is deleted and the backup file is used " - + "to restore the command topic." + + "If a backup of the command topic is available, " + + "restore the command topic using the backup file." + System.lineSeparator() - + "2. The current backup file is deleted." - + System.lineSeparator() - + "The server must be restarted after performing either operation in order to resume " - + "normal functionality"; + + "A server restart is required to restore full functionality."; @Override @@ -70,7 +67,7 @@ public String commandRunnerDegradedIncompatibleCommandsErrorMessage() { } @Override - public String commandRunnerDegradedBackupCorruptedErrorMessage() { - return COMMAND_RUNNER_DEGRADED_BACKUP_CORRUPTED_ERROR_MESSAGE; + public String commandRunnerDegradedCorruptedErrorMessage() { + return COMMAND_RUNNER_DEGRADED_CORRUPTED_ERROR_MESSAGE; } } diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/ErrorMessages.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/ErrorMessages.java index 56d912e45b33..269e0e9f1487 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/ErrorMessages.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/ErrorMessages.java @@ -25,5 +25,5 @@ public interface ErrorMessages { String commandRunnerDegradedIncompatibleCommandsErrorMessage(); - String commandRunnerDegradedBackupCorruptedErrorMessage(); + String commandRunnerDegradedCorruptedErrorMessage(); } diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java index 85f5407adff2..e929982cc83e 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java @@ -230,8 +230,8 @@ public String commandRunnerDegradedIncompatibleCommandsErrorMessage() { return errorMessages.commandRunnerDegradedIncompatibleCommandsErrorMessage(); } - public String commandRunnerDegradedBackupCorruptedErrorMessage() { - return errorMessages.commandRunnerDegradedBackupCorruptedErrorMessage(); + public String commandRunnerDegradedCorruptedErrorMessage() { + return errorMessages.commandRunnerDegradedCorruptedErrorMessage(); } public EndpointResponse generateResponse(