Skip to content

Commit

Permalink
feat: DistributingExecutor fails DDL statement if CommandRunner DEGRA…
Browse files Browse the repository at this point in the history
…DED (#6031)

* feat: move deserialization to CommandRunner and introduce DEGRADED to CommandRunnerStatus

* rohan comment

* fix test

* more feedback

* pass deserializer to queuedcommand for command

* chore: refactor DistributingExecutor to use CommandRunner instead of CommandQueue

* rohan comment
  • Loading branch information
stevenpyzhang authored Aug 20, 2020
1 parent ab8cec2 commit 62b6d9a
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -719,16 +719,6 @@ static KsqlRestApplication buildApplication(
denyListPropertyValidator
);

final KsqlResource ksqlResource = new KsqlResource(
ksqlEngine,
commandStore,
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
authorizationValidator,
errorHandler,
denyListPropertyValidator
);

final List<String> managedTopics = new LinkedList<>();
managedTopics.add(commandTopicName);
if (processingLogConfig.getBoolean(ProcessingLogConfig.TOPIC_AUTO_CREATE)) {
Expand All @@ -747,6 +737,16 @@ static KsqlRestApplication buildApplication(
metricsPrefix,
InternalTopicSerdes.deserializer(Command.class)
);

final KsqlResource ksqlResource = new KsqlResource(
ksqlEngine,
commandRunner,
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
authorizationValidator,
errorHandler,
denyListPropertyValidator
);

final QueryMonitor queryMonitor = new QueryMonitor(ksqlConfig, ksqlEngine);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private void terminateCluster(final Command command) {
LOG.info("The KSQL server was terminated.");
}

CommandRunnerStatus checkCommandRunnerStatus() {
public CommandRunnerStatus checkCommandRunnerStatus() {
if (deserializationErrorThrown) {
return CommandRunnerStatus.DEGRADED;
}
Expand Down Expand Up @@ -341,6 +341,10 @@ private List<QueuedCommand> checkForIncompatibleCommands(final List<QueuedComman
return compatibleCommands;
}

public CommandQueue getCommandQueue() {
return commandStore;
}

private class Runner implements Runnable {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Supplier;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
Expand All @@ -58,6 +60,7 @@ public class DistributingExecutor {
private final CommandIdAssigner commandIdAssigner;
private final ReservedInternalTopics internalTopics;
private final Errors errorHandler;
private final Supplier<Boolean> commandRunnerDegraded;

public DistributingExecutor(
final KsqlConfig ksqlConfig,
Expand All @@ -66,9 +69,10 @@ public DistributingExecutor(
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final ValidatedCommandFactory validatedCommandFactory,
final Errors errorHandler
final Errors errorHandler,
final Supplier<Boolean> commandRunnerDegraded
) {
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
this.commandQueue = commandQueue;
this.distributedCmdResponseTimeout =
Objects.requireNonNull(distributedCmdResponseTimeout, "distributedCmdResponseTimeout");
this.injectorFactory = Objects.requireNonNull(injectorFactory, "injectorFactory");
Expand All @@ -82,6 +86,8 @@ public DistributingExecutor(
this.internalTopics =
new ReservedInternalTopics(Objects.requireNonNull(ksqlConfig, "ksqlConfig"));
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");
this.commandRunnerDegraded =
Objects.requireNonNull(commandRunnerDegraded, "commandRunnerDegraded");
}

/**
Expand All @@ -99,6 +105,11 @@ public Optional<KsqlEntity> execute(
final KsqlExecutionContext executionContext,
final KsqlSecurityContext securityContext
) {
if (commandRunnerDegraded.get()) {
throw new KsqlServerException("Failed to handle Ksql Statement."
+ System.lineSeparator()
+ errorHandler.commandRunnerDegradedErrorMessage());
}
final ConfiguredStatement<?> injected = injectorFactory
.apply(executionContext, securityContext.getServiceContext())
.inject(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.server.ServerUtil;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.computation.DistributingExecutor;
import io.confluent.ksql.rest.server.computation.ValidatedCommandFactory;
import io.confluent.ksql.rest.server.execution.CustomExecutors;
Expand Down Expand Up @@ -93,7 +93,7 @@ public class KsqlResource implements KsqlConfigurable {
.build();

private final KsqlEngine ksqlEngine;
private final CommandQueue commandQueue;
private final CommandRunner commandRunner;
private final Duration distributedCmdResponseTimeout;
private final ActivenessRegistrar activenessRegistrar;
private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
Expand All @@ -107,7 +107,7 @@ public class KsqlResource implements KsqlConfigurable {

public KsqlResource(
final KsqlEngine ksqlEngine,
final CommandQueue commandQueue,
final CommandRunner commandRunner,
final Duration distributedCmdResponseTimeout,
final ActivenessRegistrar activenessRegistrar,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
Expand All @@ -116,7 +116,7 @@ public KsqlResource(
) {
this(
ksqlEngine,
commandQueue,
commandRunner,
distributedCmdResponseTimeout,
activenessRegistrar,
Injectors.DEFAULT,
Expand All @@ -128,7 +128,7 @@ public KsqlResource(

KsqlResource(
final KsqlEngine ksqlEngine,
final CommandQueue commandQueue,
final CommandRunner commandRunner,
final Duration distributedCmdResponseTimeout,
final ActivenessRegistrar activenessRegistrar,
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
Expand All @@ -137,7 +137,7 @@ public KsqlResource(
final DenyListPropertyValidator denyListPropertyValidator
) {
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner");
this.distributedCmdResponseTimeout =
Objects.requireNonNull(distributedCmdResponseTimeout, "distributedCmdResponseTimeout");
this.activenessRegistrar =
Expand Down Expand Up @@ -180,17 +180,19 @@ public void configure(final KsqlConfig config) {
CustomExecutors.EXECUTOR_MAP,
new DistributingExecutor(
config,
commandQueue,
commandRunner.getCommandQueue(),
distributedCmdResponseTimeout,
injectorFactory,
authorizationValidator,
new ValidatedCommandFactory(),
errorHandler
errorHandler,
() -> commandRunner.checkCommandRunnerStatus()
== CommandRunner.CommandRunnerStatus.DEGRADED
),
ksqlEngine,
config,
new DefaultCommandQueueSync(
commandQueue,
commandRunner.getCommandQueue(),
KsqlResource::shouldSynchronize,
distributedCmdResponseTimeout
)
Expand Down Expand Up @@ -239,7 +241,7 @@ public EndpointResponse handleKsqlStatements(

try {
CommandStoreUtil.httpWaitForCommandSequenceNumber(
commandQueue,
commandRunner.getCommandQueue(),
request,
distributedCmdResponseTimeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Before;
Expand Down Expand Up @@ -130,6 +132,8 @@ CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("json")
private Command command;
@Mock
private Errors errorHandler;
@Mock
private Supplier<Boolean> commandRunnerDegraded;

private DistributingExecutor distributor;
private AtomicLong scnCounter;
Expand All @@ -146,6 +150,7 @@ public void setUp() throws InterruptedException {
when(status.getCommandSequenceNumber()).thenAnswer(inv -> scnCounter.incrementAndGet());
when(executionContext.getMetaStore()).thenReturn(metaStore);
when(executionContext.createSandbox(any())).thenReturn(sandboxContext);
when(commandRunnerDegraded.get()).thenReturn(false);
serviceContext = SandboxedServiceContext.create(TestServiceContext.create());
when(executionContext.getServiceContext()).thenReturn(serviceContext);
when(validatedCommandFactory.create(any(), any())).thenReturn(command);
Expand All @@ -160,7 +165,8 @@ public void setUp() throws InterruptedException {
(ec, sc) -> InjectorChain.of(schemaInjector, topicInjector),
Optional.of(authorizationValidator),
validatedCommandFactory,
errorHandler
errorHandler,
commandRunnerDegraded
);
}

Expand Down Expand Up @@ -225,6 +231,18 @@ public void shouldReturnCommandStatus() {
equalTo(new CommandStatusEntity("", CS_COMMAND, SUCCESS_STATUS, 1L)));
}

@Test
public void shouldNotInitTransactionWhenCommandRunnerDegraded() {
// When:
when(commandRunnerDegraded.get()).thenReturn(true);

// Then:
assertThrows(
KsqlServerException.class,
() -> distributor.execute(CONFIGURED_STATEMENT, executionContext, securityContext)
);
}

@Test
public void shouldThrowExceptionOnFailureToEnqueue() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private class KsqlServer {

this.ksqlResource = new KsqlResource(
ksqlEngine,
fakeCommandQueue,
commandRunner,
Duration.ofMillis(0),
()->{},
Optional.of((sc, metastore, statement) -> { }),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import io.confluent.ksql.rest.entity.TablesList;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.computation.CommandStatusFuture;
import io.confluent.ksql.rest.server.computation.CommandStore;
import io.confluent.ksql.rest.server.computation.QueuedCommandStatus;
Expand Down Expand Up @@ -277,6 +278,8 @@ public class KsqlResourceTest {
@Mock
private CommandStore commandStore;
@Mock
private CommandRunner commandRunner;
@Mock
private ActivenessRegistrar activenessRegistrar;
@Mock
private Function<ServiceContext, Injector> schemaInjectorFactory;
Expand Down Expand Up @@ -339,6 +342,7 @@ public void setUp() throws IOException, RestClientException {

securityContext = new KsqlSecurityContext(Optional.empty(), serviceContext);

when(commandRunner.getCommandQueue()).thenReturn(commandStore);
when(commandStore.createTransactionalProducer())
.thenReturn(transactionalProducer);

Expand Down Expand Up @@ -399,7 +403,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() {
// Given:
ksqlResource = new KsqlResource(
ksqlEngine,
commandStore,
commandRunner,
DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT,
activenessRegistrar,
(ec, sc) -> InjectorChain.of(
Expand Down Expand Up @@ -430,7 +434,7 @@ public void shouldThrowOnHandleTerminateIfNotConfigured() {
// Given:
ksqlResource = new KsqlResource(
ksqlEngine,
commandStore,
commandRunner,
DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT,
activenessRegistrar,
(ec, sc) -> InjectorChain.of(
Expand Down Expand Up @@ -2180,7 +2184,7 @@ private static void validateQueryDescription(
private void setUpKsqlResource() {
ksqlResource = new KsqlResource(
ksqlEngine,
commandStore,
commandRunner,
DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT,
activenessRegistrar,
(ec, sc) -> InjectorChain.of(
Expand Down Expand Up @@ -2224,7 +2228,7 @@ public void shouldThrowOnDenyListValidatorWhenHandleKsqlStatement() {
// Given:
ksqlResource = new KsqlResource(
ksqlEngine,
commandStore,
commandRunner,
DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT,
activenessRegistrar,
(ec, sc) -> InjectorChain.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

public class DefaultErrorMessages implements ErrorMessages {

static String COMMAND_RUNNER_DEGRADED_ERROR_MESSAGE =
"The server has encountered an incompatible entry in its log "
+ "and cannot process further DDL statements."
+ System.lineSeparator()
+ "This is most likely due to the service being rolled back to an earlier version.";

@Override
public String kafkaAuthorizationErrorMessage(final Exception e) {
return ErrorMessageUtil.buildErrorMessage(e);
Expand All @@ -42,4 +48,9 @@ public String transactionInitTimeoutErrorMessage(final Exception e) {
public String schemaRegistryUnconfiguredErrorMessage(final Exception e) {
return ErrorMessageUtil.buildErrorMessage(e);
}

@Override
public String commandRunnerDegradedErrorMessage() {
return COMMAND_RUNNER_DEGRADED_ERROR_MESSAGE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ public interface ErrorMessages {
String transactionInitTimeoutErrorMessage(Exception e);

String schemaRegistryUnconfiguredErrorMessage(Exception e);

String commandRunnerDegradedErrorMessage();
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ public String transactionInitTimeoutErrorMessage(final Exception e) {
return errorMessages.transactionInitTimeoutErrorMessage(e);
}

public String commandRunnerDegradedErrorMessage() {
return errorMessages.commandRunnerDegradedErrorMessage();
}

public EndpointResponse generateResponse(
final Exception e,
final EndpointResponse defaultResponse
Expand Down

0 comments on commit 62b6d9a

Please sign in to comment.