From 11e943f5fe0ac6b2b754fd9d873cedf960d867b0 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Wed, 4 Dec 2019 11:22:06 -0800 Subject: [PATCH] Separate metric into its own class and add tests --- .../ksql/rest/server/KsqlRestApplication.java | 3 +- .../ksql/rest/server/KsqlRestConfig.java | 14 +++ .../server/computation/CommandRunner.java | 30 ++--- .../CommandRunnerStatusMetricTest.java | 108 ++++++++++++++++++ .../server/computation/CommandRunnerTest.java | 81 ++++++++++++- .../rest/server/computation/RecoveryTest.java | 3 +- 6 files changed, 223 insertions(+), 16 deletions(-) create mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetricTest.java diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 9318eead1ba2..65ee167f7dac 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -520,7 +520,8 @@ static KsqlRestApplication buildApplication( maxStatementRetries, new ClusterTerminator(ksqlEngine, serviceContext, managedTopics), serverState, - ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG) + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG), + Duration.ofMillis(restConfig.getLong(KsqlRestConfig.KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS)) ); final KsqlResource ksqlResource = new KsqlResource( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index e98aad32d6e9..c49e6b5a046b 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -77,6 +77,13 @@ public class KsqlRestConfig extends RestConfig { "Minimum 
time between consecutive health check evaluations. Health check queries before " + "the interval has elapsed will receive cached responses."; + static final String KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS = + KSQL_CONFIG_PREFIX + "server.command.runner.healthcheck.ms"; + + private static final String KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS_DOC = + "How long to wait for the command runner to process a command from the command topic " + + "before reporting an error metric"; + private static final ConfigDef CONFIG_DEF; static { @@ -122,6 +129,13 @@ public class KsqlRestConfig extends RestConfig { 5000L, Importance.LOW, KSQL_HEALTHCHECK_INTERVAL_MS_DOC + ) + .define( + KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS, + Type.LONG, + 15000L, + Importance.LOW, + KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS_DOC ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 6e6df4f79d60..ee01cb945a16 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -35,7 +35,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - import org.apache.kafka.common.errors.WakeupException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,18 +46,12 @@ * Also responsible for taking care of any exceptions that occur in the process. 
*/ public class CommandRunner implements Closeable { - - private static final String DEFAULT_METRIC_GROUP_PREFIX = "ksql-rest-app"; - private static final String METRIC_GROUP_POST_FIX = "-command-runner-status"; - private static final String metricGroupName = DEFAULT_METRIC_GROUP_PREFIX + METRIC_GROUP_POST_FIX; - private static final Logger log = LoggerFactory.getLogger(CommandRunner.class); private static final int STATEMENT_RETRY_MS = 100; private static final int MAX_STATEMENT_RETRY_MS = 5 * 1000; private static final Duration NEW_CMDS_TIMEOUT = Duration.ofMillis(MAX_STATEMENT_RETRY_MS); private static final int SHUTDOWN_TIMEOUT_MS = 3 * MAX_STATEMENT_RETRY_MS; - private static final Duration COMMAND_RUNNER_HEALTH_TIMEOUT = Duration.ofMillis(15000); private final InteractiveStatementExecutor statementExecutor; private final CommandQueue commandStore; @@ -67,10 +60,12 @@ public class CommandRunner implements Closeable { private final int maxRetries; private final ClusterTerminator clusterTerminator; private final ServerState serverState; + private final CommandRunnerStatusMetric commandRunnerStatusMetric; private final AtomicReference> currentCommandRef; + private final Duration commandRunnerHealthTimeout; - protected enum CommandRunnerStatus { + public enum CommandRunnerStatus { RUNNING, ERROR } @@ -81,7 +76,8 @@ public CommandRunner( final int maxRetries, final ClusterTerminator clusterTerminator, final ServerState serverState, - final String ksqlServiceId + final String ksqlServiceId, + final Duration commandRunnerHealthTimeout ) { this( statementExecutor, @@ -90,7 +86,8 @@ public CommandRunner( clusterTerminator, Executors.newSingleThreadExecutor(r -> new Thread(r, "CommandRunner")), serverState, - ksqlServiceId + ksqlServiceId, + commandRunnerHealthTimeout ); } @@ -102,7 +99,8 @@ public CommandRunner( final ClusterTerminator clusterTerminator, final ExecutorService executor, final ServerState serverState, - final String ksqlServiceId + final String 
ksqlServiceId, + final Duration commandRunnerHealthTimeout ) { this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor"); this.commandStore = Objects.requireNonNull(commandStore, "commandStore"); @@ -110,6 +108,8 @@ public CommandRunner( this.clusterTerminator = Objects.requireNonNull(clusterTerminator, "clusterTerminator"); this.executor = Objects.requireNonNull(executor, "executor"); this.serverState = Objects.requireNonNull(serverState, "serverState"); + this.commandRunnerHealthTimeout = + Objects.requireNonNull(commandRunnerHealthTimeout, "commandRunnerHealthTimeout"); this.currentCommandRef = new AtomicReference<>(null); this.commandRunnerStatusMetric = new CommandRunnerStatusMetric(ksqlServiceId, this); } @@ -232,16 +232,20 @@ private void terminateCluster(final Command command) { log.info("The KSQL server was terminated."); } - protected CommandRunnerStatus checkCommandRunnerStatus() { + CommandRunnerStatus checkCommandRunnerStatus() { final Pair currentCommand = currentCommandRef.get(); if (currentCommand == null) { return CommandRunnerStatus.RUNNING; } return Duration.between(currentCommand.right, Instant.now()).toMillis() - < COMMAND_RUNNER_HEALTH_TIMEOUT.toMillis() + < commandRunnerHealthTimeout.toMillis() ? CommandRunnerStatus.RUNNING : CommandRunnerStatus.ERROR; } + + QueuedCommand getCurrentCommand() { + return currentCommandRef.get().left; + } private class Runner implements Runnable { diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetricTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetricTest.java new file mode 100644 index 000000000000..4f5763101633 --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetricTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2019 Confluent Inc. 
+ * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.computation; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.Metrics; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Collections; + +@RunWith(MockitoJUnitRunner.class) +public class CommandRunnerStatusMetricTest { + + private static final MetricName METRIC_NAME = + new MetricName("bob", "g1", "d1", ImmutableMap.of()); + private static final String KSQL_SERVICE_ID = "kcql-1"; + + @Mock + private Metrics metrics; + @Mock + private CommandRunner commandRunner; + @Captor + private ArgumentCaptor> gaugeCaptor; + + private CommandRunnerStatusMetric commandRunnerStatusMetric; + + @Before + public void setUp() { + when(metrics.metricName(any(), any(), any(), 
anyMap())).thenReturn(METRIC_NAME); + when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.RUNNING); + + commandRunnerStatusMetric = new CommandRunnerStatusMetric(metrics, commandRunner, KSQL_SERVICE_ID); + } + + @Test + public void shouldAddMetricOnCreation() { + // When: + // Listener created in setup + + // Then: + verify(metrics).metricName("status", "_confluent-ksql-kcql-1ksql-rest-app-command-runner", + "The status of the commandRunner thread as it processes the command topic.", + Collections.emptyMap()); + + verify(metrics).addMetric(eq(METRIC_NAME), isA(Gauge.class)); + } + + @Test + public void shouldInitiallyBeRunningState() { + // When: + // CommandRunnerStatusMetric created in setup + + // Then: + assertThat(currentGaugeValue(), is(CommandRunner.CommandRunnerStatus.RUNNING.name())); + } + + @Test + public void shouldUpdateToErrorState() { + // When: + when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.ERROR); + + // Then: + assertThat(currentGaugeValue(), is(CommandRunner.CommandRunnerStatus.ERROR.name())); + } + + @Test + public void shouldRemoveMetricOnClose() { + // When: + commandRunnerStatusMetric.close(); + + // Then: + verify(metrics).removeMetric(METRIC_NAME); + } + + private String currentGaugeValue() { + verify(metrics).addMetric(any(), gaugeCaptor.capture()); + return gaugeCaptor.getValue().value(null, 0L); + } +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 51086a2fc8dc..03d1c40184a9 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -15,7 +15,9 @@ package io.confluent.ksql.rest.server.computation; +import static org.hamcrest.Matchers.equalTo; import 
static org.hamcrest.Matchers.not; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; @@ -35,7 +37,13 @@ import io.confluent.ksql.rest.util.TerminateCluster; import java.time.Duration; import java.util.Arrays; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -46,6 +54,7 @@ @RunWith(MockitoJUnitRunner.class) public class CommandRunnerTest { + private static long COMMAND_RUNNER_HEALTH_TIMEOUT = 2000; @Mock private InteractiveStatementExecutor statementExecutor; @@ -93,7 +102,8 @@ public void setup() { clusterTerminator, executor, serverState, - "ksql-service-id" + "ksql-service-id", + Duration.ofMillis(COMMAND_RUNNER_HEALTH_TIMEOUT) ); } @@ -171,6 +181,75 @@ public void shouldEarlyOutIfNewCommandsContainsTerminate() { verify(statementExecutor, never()).handleRestore(queuedCommand3); } + @Test + public void shouldReportRunningIfNotStuckProcessingCommand() throws BrokenBarrierException, InterruptedException, ExecutionException { + try { + checkCommandRunnerStatus( + COMMAND_RUNNER_HEALTH_TIMEOUT - 500, + COMMAND_RUNNER_HEALTH_TIMEOUT - 1000, + CommandRunner.CommandRunnerStatus.RUNNING + ); + } catch (Exception e) { + // fail test if an exception happens + assertThat(true, equalTo(false)); + } + } + + @Test + public void shouldReportErrorIfStuckProcessingCommand() throws BrokenBarrierException, InterruptedException, ExecutionException { + try { + checkCommandRunnerStatus( + COMMAND_RUNNER_HEALTH_TIMEOUT + 1000, + COMMAND_RUNNER_HEALTH_TIMEOUT + 500, + CommandRunner.CommandRunnerStatus.ERROR + ); 
+ } catch (Exception e) { + // fail test if an exception happens + assertThat(true, equalTo(false)); + } + } + + private void checkCommandRunnerStatus( + long commandProcessingTimeMs, + long timeToCheckMetricMs, + CommandRunner.CommandRunnerStatus expectedStatus + ) throws BrokenBarrierException, InterruptedException, ExecutionException { + // Given: + givenQueuedCommands(queuedCommand1); + doAnswer((Answer) invocation -> { + Thread.sleep(commandProcessingTimeMs); + return null; + }).when(statementExecutor).handleStatement(queuedCommand1); + + // When: + final CyclicBarrier gate = new CyclicBarrier(3); + AtomicReference expectedException = new AtomicReference<>(null); + (new Thread(() -> { + try { + gate.await(); + commandRunner.fetchAndRunCommands(); + } catch (Exception e) { + expectedException.set(e); + } + })).start(); + + CompletableFuture statusFuture = new CompletableFuture<>(); + (new Thread(() -> { + try { + gate.await(); + Thread.sleep(timeToCheckMetricMs); + statusFuture.complete(commandRunner.checkCommandRunnerStatus()); + } catch (Exception e) { + expectedException.set(e); + } + })).start(); + + // Then: + gate.await(); + assertThat(statusFuture.get(), equalTo(expectedStatus)); + assertThat(expectedException.get(), equalTo(null)); + } + @Test public void shouldEarlyOutOnShutdown() { // Given: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 5cc9755bfc7c..6af8e76fe096 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -208,7 +208,8 @@ private class KsqlServer { 1, mock(ClusterTerminator.class), serverState, - "ksql-service-id" + "ksql-service-id", + Duration.ofMillis(2000) ); this.ksqlResource = new KsqlResource(