Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: store metrics in a dedicated repository #1047

Merged
merged 11 commits into from
Mar 20, 2023
3 changes: 3 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ kestra:
logs:
table: "logs"
cls: io.kestra.core.models.executions.LogEntry
metrics:
table: "metrics"
cls: io.kestra.core.models.executions.MetricEntry
multipleconditions:
table: "multipleconditions"
cls: io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.time.Instant;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -23,7 +24,7 @@
@JsonSubTypes.Type(value = Timer.class, name = "timer"),
})
@ToString
@EqualsAndHashCode
@EqualsAndHashCode(exclude="timestamp")
@Getter
@NoArgsConstructor
@Introspected
Expand All @@ -35,6 +36,8 @@ abstract public class AbstractMetricEntry<T> {

protected Map<String, String> tags;

protected Instant timestamp = Instant.now();

protected AbstractMetricEntry(@NotNull String name, String[] tags) {
this.name = name;
this.tags = tagsAsMap(tags);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.kestra.core.models.executions;

import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.micronaut.core.annotation.Nullable;
import lombok.Builder;
import lombok.Value;

import java.time.Instant;
import java.util.Map;
import javax.validation.constraints.NotNull;

@Value
@Builder(toBuilder = true)
public class MetricEntry implements DeletedInterface {
@NotNull
String namespace;

@NotNull
String flowId;

@Nullable
String taskId;

@Nullable
String executionId;

@Nullable
String taskRunId;

@NotNull
String type;

@NotNull
String name;

@NotNull
Double value;

@NotNull
Instant timestamp;

@Nullable
Map<String, String> tags;

@NotNull
@Builder.Default
boolean deleted = false;

public static MetricEntry of(TaskRun taskRun, AbstractMetricEntry<?> metricEntry) {
return MetricEntry.builder()
.namespace(taskRun.getNamespace())
.flowId(taskRun.getFlowId())
.executionId(taskRun.getExecutionId())
.taskId(taskRun.getTaskId())
.taskRunId(taskRun.getId())
.type(metricEntry.getType())
.name(metricEntry.name)
.tags(metricEntry.getTags())
.value(computeValue(metricEntry))
.timestamp(metricEntry.getTimestamp())
.build();
}

private static Double computeValue(AbstractMetricEntry<?> metricEntry) {
if (metricEntry instanceof Counter) {
return ((Counter) metricEntry).getValue();
}

if (metricEntry instanceof Timer) {
return (double) ((Timer) metricEntry).getValue().toMillis();
}

throw new IllegalArgumentException("Unknown metric type: " + metricEntry.getClass());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import lombok.Builder;
import lombok.Value;
import lombok.With;
import io.kestra.core.models.flows.State;

import java.util.List;
Expand All @@ -12,23 +11,20 @@
@Value
@Builder
public class TaskRunAttempt {
@With
List<AbstractMetricEntry<?>> metrics;
/**
* @deprecated Should always be null, we need to keep it for backward compatibility or the deserialization of old attempt will no longer work.
*/
@Deprecated
public void setMetrics(List<AbstractMetricEntry<?>> metrics) {

}

@NotNull
State state;

public TaskRunAttempt withState(State.Type state) {
return new TaskRunAttempt(
this.metrics,
this.state.withState(state)
);
}

public Optional<AbstractMetricEntry<?>> findMetrics(String name) {
return this.metrics
.stream()
.filter(metricEntry -> metricEntry.getName().equals(name))
.findFirst();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.*;
import io.kestra.core.models.flows.Flow;
Expand All @@ -16,11 +17,11 @@ public interface QueueFactoryInterface {
String FLOW_NAMED = "flowQueue";
String TEMPLATE_NAMED = "templateQueue";
String WORKERTASKLOG_NAMED = "workerTaskLogQueue";
String METRIC_QUEUE = "workerTaskMetricQueue";
String KILL_NAMED = "executionKilledQueue";
String WORKERINSTANCE_NAMED = "workerInstanceQueue";
String WORKERTASKRUNNING_NAMED = "workerTaskRuninngQueue";
String TRIGGER_NAMED = "triggerQueue";
String LOG_NAMED = "logQueue";

QueueInterface<Execution> execution();

Expand All @@ -32,6 +33,8 @@ public interface QueueFactoryInterface {

QueueInterface<LogEntry> logEntry();

QueueInterface<MetricEntry> metricEntry();

QueueInterface<Flow> flow();

QueueInterface<ExecutionKilled> kill();
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/io/kestra/core/queues/QueueService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.topologies.FlowTopology;
Expand Down Expand Up @@ -49,6 +50,8 @@ public String key(Object object) {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == FlowTopology.class) {
return ((FlowTopology) object).uid();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else {
throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.kestra.core.repositories;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.MetricEntry;
import io.micronaut.data.model.Pageable;

import java.util.List;

public interface MetricRepositoryInterface extends SaveRepositoryInterface<MetricEntry> {
ArrayListTotal<MetricEntry> findByExecutionId(String id, Pageable pageable);

ArrayListTotal<MetricEntry> findByExecutionIdAndTaskId(String executionId, String taskId, Pageable pageable);

ArrayListTotal<MetricEntry> findByExecutionIdAndTaskRunId(String executionId, String taskRunId, Pageable pageable);

Integer purge(Execution execution);
}
11 changes: 11 additions & 0 deletions core/src/main/java/io/kestra/core/runners/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.repositories.SaveRepositoryInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.micronaut.context.annotation.Requires;
Expand All @@ -23,6 +25,9 @@ public class Indexer implements IndexerInterface {
private final QueueInterface<Execution> executionQueue;
private final LogRepositoryInterface logRepository;
private final QueueInterface<LogEntry> logQueue;

private final MetricRepositoryInterface metricRepository;
private final QueueInterface<MetricEntry> metricQueue;
private final MetricRegistry metricRegistry;

@Inject
Expand All @@ -31,19 +36,24 @@ public Indexer(
@Named(QueueFactoryInterface.EXECUTION_NAMED) QueueInterface<Execution> executionQueue,
LogRepositoryInterface logRepository,
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface<LogEntry> logQueue,
MetricRepositoryInterface metricRepositor,
@Named(QueueFactoryInterface.METRIC_QUEUE) QueueInterface<MetricEntry> metricQueue,
MetricRegistry metricRegistry
) {
this.executionRepository = executionRepository;
this.executionQueue = executionQueue;
this.logRepository = logRepository;
this.logQueue = logQueue;
this.metricRepository = metricRepositor;
this.metricQueue = metricQueue;
this.metricRegistry = metricRegistry;
}

@Override
public void run() {
this.send(executionQueue, executionRepository);
this.send(logQueue, logRepository);
this.send(metricQueue, metricRepository);
}

protected <T> void send(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
Expand All @@ -62,5 +72,6 @@ protected <T> void send(QueueInterface<T> queueInterface, SaveRepositoryInterfac
public void close() throws IOException {
this.executionQueue.close();
this.logQueue.close();
this.metricQueue.close();
}
}
4 changes: 0 additions & 4 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.kestra.core.runners;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.collect.ImmutableMap;
Expand All @@ -17,7 +16,6 @@
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.Slugify;
Expand All @@ -36,8 +34,6 @@

@NoArgsConstructor
public class RunContext {
private final static ObjectMapper MAPPER = JacksonMapper.ofJson();

// Injected
private ApplicationContext applicationContext;
private VariableRenderer variableRenderer;
Expand Down
19 changes: 14 additions & 5 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.hash.Hashing;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.tasks.Task;
import io.micronaut.context.ApplicationContext;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class Worker implements Runnable, Closeable {
private final WorkerTaskQueueInterface workerTaskQueue;
private final QueueInterface<WorkerTaskResult> workerTaskResultQueue;
private final QueueInterface<ExecutionKilled> executionKilledQueue;
private final QueueInterface<MetricEntry> metricEntryQueue;
private final MetricRegistry metricRegistry;

private final Set<String> killedExecution = ConcurrentHashMap.newKeySet();
Expand All @@ -75,6 +77,10 @@ public Worker(ApplicationContext applicationContext, int thread) {
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.KILL_NAMED)
);
this.metricEntryQueue = (QueueInterface<MetricEntry>) applicationContext.getBean(
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.METRIC_QUEUE)
);
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);

ExecutorsUtils executorsUtils = applicationContext.getBean(ExecutorsUtils.class);
Expand All @@ -87,9 +93,7 @@ public void run() {
if (executionKilled != null) {
// @FIXME: the hashset will never expire killed execution
killedExecution.add(executionKilled.getExecutionId());
}

if (executionKilled != null) {
synchronized (this) {
workerThreadReferences
.stream()
Expand Down Expand Up @@ -189,6 +193,8 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu

this.logTerminated(workerTask);

//FIXME should we remove it from the killedExecution set ?

return workerTaskResult;
}

Expand Down Expand Up @@ -350,19 +356,21 @@ private WorkerTask runAttempt(WorkerTask workerTask) {

// attempt
TaskRunAttempt taskRunAttempt = builder
.metrics(runContext.metrics())
.build()
.withState(state);

// logs
if (workerThread.getTaskOutput() != null) {
if (workerThread.getTaskOutput() != null && log.isDebugEnabled()) {
log.debug("Outputs\n{}", JacksonMapper.log(workerThread.getTaskOutput()));
}

if (runContext.metrics().size() > 0) {
if (runContext.metrics().size() > 0 && log.isTraceEnabled()) {
log.trace("Metrics\n{}", JacksonMapper.log(runContext.metrics()));
}

// metrics
runContext.metrics().forEach(metric -> this.metricEntryQueue.emit(MetricEntry.of(workerTask.getTaskRun(), metric)));

// save outputs
List<TaskRunAttempt> attempts = this.addAttempt(workerTask, taskRunAttempt);

Expand Down Expand Up @@ -448,6 +456,7 @@ public void close() throws IOException {
workerTaskQueue.close();
executionKilledQueue.close();
workerTaskResultQueue.close();
metricEntryQueue.close();
}

@Getter
Expand Down
Loading