Skip to content

Commit

Permalink
feat: add metric for commandRunner status
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Dec 2, 2019
1 parent 9568634 commit 52bbdb1
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,8 @@ static KsqlRestApplication buildApplication(
commandStore,
maxStatementRetries,
new ClusterTerminator(ksqlEngine, serviceContext, managedTopics),
serverState
serverState,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
);

final KsqlResource ksqlResource = new KsqlResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,35 @@

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.internal.KsqlMetric;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.RetryUtil;
import java.io.Closeable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,12 +57,17 @@
*/
public class CommandRunner implements Closeable {

private static final String DEFAULT_METRIC_GROUP_PREFIX = "ksql-rest-app";
private static final String METRIC_GROUP_POST_FIX = "-command-runner-status";
private static final String metricGroupName = DEFAULT_METRIC_GROUP_PREFIX + METRIC_GROUP_POST_FIX;

private static final Logger log = LoggerFactory.getLogger(CommandRunner.class);

private static final int STATEMENT_RETRY_MS = 100;
private static final int MAX_STATEMENT_RETRY_MS = 5 * 1000;
private static final Duration NEW_CMDS_TIMEOUT = Duration.ofMillis(MAX_STATEMENT_RETRY_MS);
private static final int SHUTDOWN_TIMEOUT_MS = 3 * MAX_STATEMENT_RETRY_MS;
private static final Duration COMMAND_RUNNER_HEALTH_TIMEOUT = Duration.ofMillis(15000);

private final InteractiveStatementExecutor statementExecutor;
private final CommandQueue commandStore;
Expand All @@ -59,20 +77,30 @@ public class CommandRunner implements Closeable {
private final ClusterTerminator clusterTerminator;
private final ServerState serverState;

private final List<Sensor> sensors;
private final Map<String, String> customMetricsTags;
private final Metrics metrics;
private final String ksqlServiceId;

private volatile Pair<QueuedCommand, Instant> currentCommand;

public CommandRunner(
final InteractiveStatementExecutor statementExecutor,
final CommandQueue commandStore,
final int maxRetries,
final ClusterTerminator clusterTerminator,
final ServerState serverState
final ServerState serverState,
final String ksqlServiceId
) {
this(
statementExecutor,
commandStore,
maxRetries,
clusterTerminator,
Executors.newSingleThreadExecutor(r -> new Thread(r, "CommandRunner")),
serverState
serverState,
MetricCollectors.getMetrics(),
ksqlServiceId
);
}

Expand All @@ -83,14 +111,22 @@ public CommandRunner(
final int maxRetries,
final ClusterTerminator clusterTerminator,
final ExecutorService executor,
final ServerState serverState
final ServerState serverState,
final Metrics metrics,
final String ksqlServiceId
) {
this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor");
this.commandStore = Objects.requireNonNull(commandStore, "commandStore");
this.maxRetries = maxRetries;
this.clusterTerminator = Objects.requireNonNull(clusterTerminator, "clusterTerminator");
this.executor = Objects.requireNonNull(executor, "executor");
this.serverState = Objects.requireNonNull(serverState, "serverState");
this.customMetricsTags = Collections.emptyMap();
this.sensors = new ArrayList<>();
this.metrics = Objects.requireNonNull(metrics, "metrics");
this.currentCommand = null;
this.ksqlServiceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlServiceId;
configureCommandRunnerStatusMetric();
}

/**
Expand All @@ -114,6 +150,7 @@ public void close() {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
sensors.forEach(sensor -> metrics.removeSensor(sensor.name()));
commandStore.close();
}

Expand All @@ -128,13 +165,17 @@ public void processPriorCommands() {
return;
}
restoreCommands.forEach(
command -> RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
MAX_STATEMENT_RETRY_MS,
() -> statementExecutor.handleRestore(command),
WakeupException.class
)
command -> {
currentCommand = new Pair<>(command, Instant.now());
RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
MAX_STATEMENT_RETRY_MS,
() -> statementExecutor.handleRestore(command),
WakeupException.class
);
currentCommand = null;
}
);
final KsqlEngine ksqlEngine = statementExecutor.getKsqlEngine();
ksqlEngine.getPersistentQueries().forEach(PersistentQueryMetadata::start);
Expand Down Expand Up @@ -169,7 +210,9 @@ private void executeStatement(final QueuedCommand queuedCommand) {
if (closed) {
log.info("Execution aborted as system is closing down");
} else {
currentCommand = new Pair<>(queuedCommand, Instant.now());
statementExecutor.handleStatement(queuedCommand);
currentCommand = null;
log.info("Executed statement: " + queuedCommand.getCommand().getStatement());
}
};
Expand Down Expand Up @@ -204,6 +247,56 @@ private void terminateCluster(final Command command) {
log.info("The KSQL server was terminated.");
}

/**
 * Reports the liveness of the command runner as a numeric gauge value.
 *
 * <p>May be called from a thread other than the one executing commands (for example a metrics
 * reporter), so the volatile {@code currentCommand} field is read exactly once into a local.
 * Reading it twice would allow the executing thread to clear the field between the null check
 * and the dereference, causing a {@link NullPointerException}.
 *
 * @return {@code 1} if no command is currently being processed, or if the in-flight command
 *     started less than {@code COMMAND_RUNNER_HEALTH_TIMEOUT} ago; {@code 0} otherwise.
 */
public double checkCommandRunnerStatus() {
  // Single read of the volatile field: the snapshot cannot change underneath us.
  final Pair<QueuedCommand, Instant> inFlight = currentCommand;
  if (inFlight == null) {
    return 1;
  }

  return Duration.between(inFlight.right, Instant.now()).toMillis()
      < COMMAND_RUNNER_HEALTH_TIMEOUT.toMillis() ? 1 : 0;
}

/**
 * Registers the {@code liveness-indicator} metric for this command runner.
 *
 * <p>The metric's value is computed on demand by delegating to
 * {@link #checkCommandRunnerStatus()}; nothing is ever recorded into it, so
 * {@code record} is intentionally a no-op.
 */
private void configureCommandRunnerStatusMetric() {
  final String metricName = "liveness-indicator";
  // Each sentence ends with a separating space so the concatenated description reads cleanly.
  final String description =
      "A metric indicating the status of the commandRunner. "
          + "If value 1, the commandRunner is processing commands normally. "
          + "If value 0, the commandRunner is stuck processing a command";
  final Supplier<MeasurableStat> statSupplier =
      () -> new MeasurableStat() {
        @Override
        public double measure(final MetricConfig metricConfig, final long l) {
          // Value is derived from current state rather than from recorded samples.
          return checkCommandRunnerStatus();
        }

        @Override
        public void record(final MetricConfig metricConfig, final double v, final long l) {
          // Intentionally empty: this stat is a computed gauge, not an accumulator.
        }
      };
  createSensor(KsqlMetric.of(metricName, description, statSupplier));
}

/**
 * Creates a sensor for the given metric, attaches the metric to it, and remembers the sensor
 * in {@code sensors}.
 *
 * @param metric the metric definition to expose through a new sensor
 */
private void createSensor(final KsqlMetric metric) {
  final String sensorName = metricGroupName + "-" + metric.name();
  final Sensor created = metrics.sensor(sensorName);
  configureMetric(created, metric);
  sensors.add(created);
}

/**
 * Adds the given metric to the sensor.
 *
 * <p>The metric's group is the concatenation of {@code ksqlServiceId} and
 * {@code metricGroupName}, and it is tagged with {@code customMetricsTags}.
 *
 * @param sensor the sensor that will carry the metric
 * @param metric the metric definition supplying name, description and stat
 */
private void configureMetric(final Sensor sensor, final KsqlMetric metric) {
  final String name = metric.name();
  final String group = ksqlServiceId + metricGroupName;
  final String description = metric.description();
  sensor.add(
      metrics.metricName(name, group, description, customMetricsTags),
      metric.statSupplier().get());
}

private class Runner implements Runnable {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.mockito.hamcrest.MockitoHamcrest.argThat;

import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class CommandRunnerTest {

@Before
public void setup() {
MetricCollectors.initialize();
when(statementExecutor.getKsqlEngine()).thenReturn(ksqlEngine);

when(command.getStatement()).thenReturn("something that is not terminate");
Expand All @@ -90,7 +92,10 @@ public void setup() {
1,
clusterTerminator,
executor,
serverState);
serverState,
MetricCollectors.getMetrics(),
"ksql-service-id"
);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ private class KsqlServer {
fakeCommandQueue,
1,
mock(ClusterTerminator.class),
serverState
serverState,
"ksql-service-id"
);

this.ksqlResource = new KsqlResource(
Expand Down

0 comments on commit 52bbdb1

Please sign in to comment.