From 52bbdb1d85cf9b5c47dc9f29e8452466f9dcbb35 Mon Sep 17 00:00:00 2001
From: Steven Zhang
Date: Mon, 2 Dec 2019 09:54:50 -0800
Subject: [PATCH] feat: add metric for commandRunner status

Registers a "liveness-indicator" metric for the CommandRunner. The metric
reports 1 while no command is executing, or while the current command has
been executing for less than COMMAND_RUNNER_HEALTH_TIMEOUT (15s), and 0
otherwise. The sensor is removed again when the CommandRunner is closed.

The metric is evaluated on a different thread than the one executing
commands, so checkCommandRunnerStatus() reads the volatile currentCommand
field exactly once before using it.

---
 .../ksql/rest/server/KsqlRestApplication.java |   3 +-
 .../server/computation/CommandRunner.java     | 121 ++++++++++++++++--
 .../server/computation/CommandRunnerTest.java |   7 +-
 .../rest/server/computation/RecoveryTest.java |   3 +-
 4 files changed, 121 insertions(+), 13 deletions(-)

diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
index 658821aa422d..9318eead1ba2 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
@@ -519,7 +519,8 @@ static KsqlRestApplication buildApplication(
         commandStore,
         maxStatementRetries,
         new ClusterTerminator(ksqlEngine, serviceContext, managedTopics),
-        serverState
+        serverState,
+        ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
     );
 
     final KsqlResource ksqlResource = new KsqlResource(
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java
index 046a3e994d78..b32c365f3855 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java
@@ -17,22 +17,35 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import io.confluent.ksql.engine.KsqlEngine;
+import io.confluent.ksql.internal.KsqlMetric;
+import io.confluent.ksql.metrics.MetricCollectors;
 import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
 import io.confluent.ksql.rest.server.state.ServerState;
 import io.confluent.ksql.rest.util.ClusterTerminator;
 import io.confluent.ksql.rest.util.TerminateCluster;
+import io.confluent.ksql.util.KsqlConstants;
+import io.confluent.ksql.util.Pair;
 import io.confluent.ksql.util.PersistentQueryMetadata;
 import io.confluent.ksql.util.RetryUtil;
 import java.io.Closeable;
 import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,12 +57,17 @@
  */
 public class CommandRunner implements Closeable {
 
+  private static final String DEFAULT_METRIC_GROUP_PREFIX = "ksql-rest-app";
+  private static final String METRIC_GROUP_POST_FIX = "-command-runner-status";
+  private static final String metricGroupName = DEFAULT_METRIC_GROUP_PREFIX + METRIC_GROUP_POST_FIX;
+
   private static final Logger log = LoggerFactory.getLogger(CommandRunner.class);
 
   private static final int STATEMENT_RETRY_MS = 100;
   private static final int MAX_STATEMENT_RETRY_MS = 5 * 1000;
   private static final Duration NEW_CMDS_TIMEOUT = Duration.ofMillis(MAX_STATEMENT_RETRY_MS);
   private static final int SHUTDOWN_TIMEOUT_MS = 3 * MAX_STATEMENT_RETRY_MS;
+  private static final Duration COMMAND_RUNNER_HEALTH_TIMEOUT = Duration.ofMillis(15000);
 
   private final InteractiveStatementExecutor statementExecutor;
   private final CommandQueue commandStore;
@@ -59,12 +77,20 @@ public class CommandRunner implements Closeable {
   private final ClusterTerminator clusterTerminator;
   private final ServerState serverState;
 
+  private final List<Sensor> sensors;
+  private final Map<String, String> customMetricsTags;
+  private final Metrics metrics;
+  private final String ksqlServiceId;
+
+  private volatile Pair<QueuedCommand, Instant> currentCommand;
+
   public CommandRunner(
       final InteractiveStatementExecutor statementExecutor,
       final CommandQueue commandStore,
       final int maxRetries,
       final ClusterTerminator clusterTerminator,
-      final ServerState serverState
+      final ServerState serverState,
+      final String ksqlServiceId
   ) {
     this(
         statementExecutor,
@@ -72,7 +98,9 @@ public CommandRunner(
         maxRetries,
         clusterTerminator,
         Executors.newSingleThreadExecutor(r -> new Thread(r, "CommandRunner")),
-        serverState
+        serverState,
+        MetricCollectors.getMetrics(),
+        ksqlServiceId
     );
   }
 
@@ -83,7 +111,9 @@ public CommandRunner(
       final int maxRetries,
       final ClusterTerminator clusterTerminator,
       final ExecutorService executor,
-      final ServerState serverState
+      final ServerState serverState,
+      final Metrics metrics,
+      final String ksqlServiceId
   ) {
     this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor");
     this.commandStore = Objects.requireNonNull(commandStore, "commandStore");
@@ -91,6 +121,12 @@ public CommandRunner(
     this.clusterTerminator = Objects.requireNonNull(clusterTerminator, "clusterTerminator");
     this.executor = Objects.requireNonNull(executor, "executor");
     this.serverState = Objects.requireNonNull(serverState, "serverState");
+    this.customMetricsTags = Collections.emptyMap();
+    this.sensors = new ArrayList<>();
+    this.metrics = Objects.requireNonNull(metrics, "metrics");
+    Objects.requireNonNull(ksqlServiceId, "ksqlServiceId");
+    this.ksqlServiceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlServiceId;
+    configureCommandRunnerStatusMetric();
   }
 
   /**
@@ -114,6 +150,7 @@ public void close() {
     } catch (final InterruptedException e) {
       Thread.currentThread().interrupt();
     }
+    sensors.forEach(sensor -> metrics.removeSensor(sensor.name()));
     commandStore.close();
   }
 
@@ -128,13 +165,17 @@ public void processPriorCommands() {
       return;
     }
     restoreCommands.forEach(
-        command -> RetryUtil.retryWithBackoff(
-            maxRetries,
-            STATEMENT_RETRY_MS,
-            MAX_STATEMENT_RETRY_MS,
-            () -> statementExecutor.handleRestore(command),
-            WakeupException.class
-        )
+        command -> {
+          currentCommand = new Pair<>(command, Instant.now());
+          RetryUtil.retryWithBackoff(
+              maxRetries,
+              STATEMENT_RETRY_MS,
+              MAX_STATEMENT_RETRY_MS,
+              () -> statementExecutor.handleRestore(command),
+              WakeupException.class
+          );
+          currentCommand = null;
+        }
     );
     final KsqlEngine ksqlEngine = statementExecutor.getKsqlEngine();
     ksqlEngine.getPersistentQueries().forEach(PersistentQueryMetadata::start);
@@ -169,7 +210,9 @@ private void executeStatement(final QueuedCommand queuedCommand) {
       if (closed) {
         log.info("Execution aborted as system is closing down");
       } else {
+        currentCommand = new Pair<>(queuedCommand, Instant.now());
         statementExecutor.handleStatement(queuedCommand);
+        currentCommand = null;
         log.info("Executed statement: " + queuedCommand.getCommand().getStatement());
       }
     };
@@ -204,6 +247,64 @@ private void terminateCluster(final Command command) {
     log.info("The KSQL server was terminated.");
   }
 
+  /**
+   * Reports whether the command runner is making progress.
+   *
+   * @return 1 if no command is executing or the current command started less than
+   *     {@code COMMAND_RUNNER_HEALTH_TIMEOUT} ago, otherwise 0.
+   */
+  public double checkCommandRunnerStatus() {
+    // Read the volatile field once: the runner thread may reset it to null at any time.
+    final Pair<QueuedCommand, Instant> current = currentCommand;
+    if (current == null) {
+      return 1;
+    }
+
+    return Duration.between(current.right, Instant.now()).toMillis()
+        < COMMAND_RUNNER_HEALTH_TIMEOUT.toMillis() ? 1 : 0;
+  }
+
+  private void configureCommandRunnerStatusMetric() {
+    final String metricName = "liveness-indicator";
+    final String description =
+        "A metric indicating the status of the commandRunner. "
+        + "If value 1, the commandRunner is processing commands normally. "
+        + "If value 0, the commandRunner is stuck processing a command";
+    final Supplier<MeasurableStat> statSupplier =
+        () -> new MeasurableStat() {
+          @Override
+          public double measure(final MetricConfig metricConfig, final long l) {
+            return checkCommandRunnerStatus();
+          }
+
+          @Override
+          public void record(final MetricConfig metricConfig, final double v, final long l) {
+          }
+        };
+    createSensor(KsqlMetric.of(metricName, description, statSupplier));
+  }
+
+  private void createSensor(final KsqlMetric metric) {
+    final Sensor sensor = metrics.sensor(metricGroupName + "-" + metric.name());
+    configureMetric(sensor, metric);
+    sensors.add(sensor);
+  }
+
+  private void configureMetric(
+      final Sensor sensor,
+      final KsqlMetric metric
+  ) {
+    sensor.add(
+        metrics.metricName(
+            metric.name(),
+            ksqlServiceId + metricGroupName,
+            metric.description(),
+            customMetricsTags
+        ),
+        metric.statSupplier().get()
+    );
+  }
+
   private class Runner implements Runnable {
 
     @Override
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java
index 5aa9515cf176..3cd4737912e1 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java
@@ -29,6 +29,7 @@
 import static org.mockito.hamcrest.MockitoHamcrest.argThat;
 
 import io.confluent.ksql.engine.KsqlEngine;
+import io.confluent.ksql.metrics.MetricCollectors;
 import io.confluent.ksql.rest.server.state.ServerState;
 import io.confluent.ksql.rest.util.ClusterTerminator;
 import io.confluent.ksql.rest.util.TerminateCluster;
@@ -72,6 +73,7 @@ public class CommandRunnerTest {
 
   @Before
   public void setup() {
+    MetricCollectors.initialize();
     when(statementExecutor.getKsqlEngine()).thenReturn(ksqlEngine);
 
     when(command.getStatement()).thenReturn("something that is not terminate");
@@ -90,7 +92,10 @@ public void setup() {
         1,
         clusterTerminator,
         executor,
-        serverState);
+        serverState,
+        MetricCollectors.getMetrics(),
+        "ksql-service-id"
+    );
   }
 
   @Test
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java
index 61c93043d59a..5cc9755bfc7c 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java
@@ -207,7 +207,8 @@ private class KsqlServer {
           fakeCommandQueue,
           1,
           mock(ClusterTerminator.class),
-          serverState
+          serverState,
+          "ksql-service-id"
       );
 
       this.ksqlResource = new KsqlResource(