Skip to content

Commit

Permalink
feat(kafka-runner): add a metric on kafka stream state
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Apr 29, 2022
1 parent c5e81db commit 3111f38
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 18 deletions.
16 changes: 9 additions & 7 deletions core/src/main/java/io/kestra/core/metrics/MetricRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ public class MetricRegistry {
public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";

public final static String KESTRA_EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public final static String KESTRA_EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public final static String KESTRA_EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public final static String KESTRA_EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
public final static String KESTRA_EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public final static String KESTRA_EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public final static String METRIC_EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";
public final static String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public final static String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public final static String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public final static String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
public final static String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public final static String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public final static String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";

public final static String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count";
public final static String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration";
Expand All @@ -51,6 +51,8 @@ public class MetricRegistry {
public final static String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration";
public final static String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";

public final static String STREAMS_STATE_COUNT = "stream.state.count";

public final static String TAG_TASK_TYPE = "task_type";
public final static String TAG_FLOW_ID = "flow_id";
public final static String TAG_NAMESPACE_ID = "namespace_id";
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.DynamicTask;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
Expand Down Expand Up @@ -118,7 +117,7 @@ public Execution onNexts(Flow flow, Execution execution, List<TaskRun> nexts) {

if (execution.getState().getCurrent() == State.Type.CREATED) {
metricRegistry
.counter(MetricRegistry.KESTRA_EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(execution))
.counter(MetricRegistry.EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(execution))
.increment();

flow.logger().info(
Expand All @@ -132,7 +131,7 @@ public Execution onNexts(Flow flow, Execution execution, List<TaskRun> nexts) {
}

metricRegistry
.counter(MetricRegistry.KESTRA_EXECUTOR_TASKRUN_NEXT_COUNT, metricRegistry.tags(execution))
.counter(MetricRegistry.EXECUTOR_TASKRUN_NEXT_COUNT, metricRegistry.tags(execution))
.increment(nexts.size());

return newExecution;
Expand Down Expand Up @@ -201,7 +200,7 @@ private Optional<WorkerTaskResult> childWorkerTaskTypeToWorkerTask(
.peek(workerTaskResult -> {
metricRegistry
.counter(
MetricRegistry.KESTRA_EXECUTOR_WORKERTASKRESULT_COUNT,
MetricRegistry.EXECUTOR_WORKERTASKRESULT_COUNT,
metricRegistry.tags(workerTaskResult)
)
.increment();
Expand Down Expand Up @@ -296,11 +295,11 @@ private Executor onEnd(Executor executor) {
}

metricRegistry
.counter(MetricRegistry.KESTRA_EXECUTOR_EXECUTION_END_COUNT, metricRegistry.tags(newExecution))
.counter(MetricRegistry.EXECUTOR_EXECUTION_END_COUNT, metricRegistry.tags(newExecution))
.increment();

metricRegistry
.timer(MetricRegistry.METRIC_EXECUTOR_EXECUTION_DURATION, metricRegistry.tags(newExecution))
.timer(MetricRegistry.EXECUTOR_EXECUTION_DURATION, metricRegistry.tags(newExecution))
.record(newExecution.getState().getDuration());

return executor.withExecution(newExecution, "onEnd");
Expand Down Expand Up @@ -481,7 +480,7 @@ private Executor handleRestart(Executor executor) {
}

metricRegistry
.counter(MetricRegistry.KESTRA_EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(executor.getExecution()))
.counter(MetricRegistry.EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(executor.getExecution()))
.increment();

executor.getFlow().logger().info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand Down Expand Up @@ -115,11 +116,23 @@ public KafkaStreamService.Stream of(Class<?> clientId, Class<?> groupId, Topolog

public static class Stream extends KafkaStreams {
private final Logger logger;

private final MetricRegistry meterRegistry;

private final String[] tags;

private KafkaStreamsMetrics metrics;

private boolean hasStarted = false;

private Stream(Topology topology, Properties props, MetricRegistry meterRegistry, Logger logger) {
super(topology, props);
this.meterRegistry = meterRegistry;

tags = new String[]{
"client_class_id",
(String) props.get(CommonClientConfigs.CLIENT_ID_CONFIG)
};

if (meterRegistry != null) {
metrics = new KafkaStreamsMetrics(
Expand Down Expand Up @@ -148,6 +161,18 @@ public synchronized void start(final KafkaStreams.StateListener listener) throws
this.setGlobalStateRestoreListener(new StateRestoreLoggerListeners(logger));

this.setStateListener((newState, oldState) -> {
meterRegistry.gauge(
MetricRegistry.STREAMS_STATE_COUNT,
0,
ArrayUtils.addAll(tags, "state", oldState.name())
);

meterRegistry.gauge(
MetricRegistry.STREAMS_STATE_COUNT,
1,
ArrayUtils.addAll(tags, "state", newState.name())
);

if (newState == State.RUNNING) {
this.hasStarted = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ public Executor transform(final String key, final Executor value) {
if (workerTaskResult.getTaskRun().getState().isTerninated()) {
metricRegistry
.counter(
MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_COUNT,
MetricRegistry.EXECUTOR_TASKRUN_ENDED_COUNT,
metricRegistry.tags(workerTaskResult)
)
.increment();

metricRegistry
.timer(
MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_DURATION,
MetricRegistry.EXECUTOR_TASKRUN_ENDED_DURATION,
metricRegistry.tags(workerTaskResult)
)
.record(workerTaskResult.getTaskRun().getState().getDuration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,11 @@ private void workerTaskResultQueue(WorkerTaskResult message) {
// send metrics on terminated
if (message.getTaskRun().getState().isTerninated()) {
metricRegistry
.counter(MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_COUNT, metricRegistry.tags(message))
.counter(MetricRegistry.EXECUTOR_TASKRUN_ENDED_COUNT, metricRegistry.tags(message))
.increment();

metricRegistry
.timer(MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(message))
.timer(MetricRegistry.EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(message))
.record(message.getTaskRun().getState().getDuration());
}

Expand Down

0 comments on commit 3111f38

Please sign in to comment.