From 570374eaa93c610fae26e6395fc964c294086b80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Mon, 20 Mar 2023 17:01:31 +0100 Subject: [PATCH] feat(core): store metrics in a dedicated repository (#1047) close #969 --- cli/src/main/resources/application.yml | 3 + .../executions/AbstractMetricEntry.java | 5 +- .../core/models/executions/MetricEntry.java | 77 +++++++++ .../models/executions/TaskRunAttempt.java | 20 +-- .../core/queues/QueueFactoryInterface.java | 5 +- .../io/kestra/core/queues/QueueService.java | 3 + .../MetricRepositoryInterface.java | 17 ++ .../java/io/kestra/core/runners/Indexer.java | 11 ++ .../io/kestra/core/runners/RunContext.java | 4 - .../java/io/kestra/core/runners/Worker.java | 19 ++- .../core/services/ExecutionService.java | 9 ++ .../io/kestra/core/tasks/storages/Purge.java | 8 + .../AbstractExecutionServiceTest.java | 2 + .../AbstractMetricRepositoryTest.java | 61 +++++++ .../core/repositories/ExecutionFixture.java | 2 - .../kestra/core/runners/RunContextTest.java | 18 --- .../repository/h2/H2MetricRepository.java | 17 ++ .../io/kestra/runner/h2/H2QueueFactory.java | 8 + .../resources/migrations/h2/V8__metrics.sql | 32 ++++ .../repository/h2/H2MetricRepositoryTest.java | 6 + jdbc-h2/src/test/resources/application.yml | 3 + .../mysql/MysqlMetricRepository.java | 17 ++ .../runner/mysql/MysqlQueueFactory.java | 8 + .../migrations/mysql/V8__metrics.sql | 31 ++++ .../mysql/MysqlMetricRepositoryTest.java | 6 + jdbc-mysql/src/test/resources/application.yml | 3 + .../postgres/PostgresMetricRepository.java | 17 ++ .../runner/postgres/PostgresQueueFactory.java | 8 + .../migrations/postgres/V8__metrics.sql | 19 +++ .../PostgresMetricRepositoryTest.java | 6 + .../src/test/resources/application.yml | 3 + .../AbstractJdbcMetricRepository.java | 100 ++++++++++++ .../AbstractJdbcMetricRepositoryTest.java | 17 ++ .../memory/MemoryMetricRepository.java | 45 ++++++ .../runner/memory/MemoryQueueFactory.java | 8 + .../components/executions/ExecutionMetric.vue | 151 ++++++++++++++++++ .../components/executions/ExecutionRoot.vue | 6 + ui/src/components/executions/Metrics.vue | 30 +++- ui/src/components/logs/LogList.vue | 2 +- ui/src/stores/executions.js | 30 +++- ui/src/translations.json | 2 + .../controllers/MetricController.java | 55 +++++++ .../controllers/MetricControllerTest.java | 53 ++++++ 43 files changed, 888 insertions(+), 59 deletions(-) create mode 100644 core/src/main/java/io/kestra/core/models/executions/MetricEntry.java create mode 100644 core/src/main/java/io/kestra/core/repositories/MetricRepositoryInterface.java create mode 100644 core/src/test/java/io/kestra/core/repositories/AbstractMetricRepositoryTest.java create mode 100644 jdbc-h2/src/main/java/io/kestra/repository/h2/H2MetricRepository.java create mode 100644 jdbc-h2/src/main/resources/migrations/h2/V8__metrics.sql create mode 100644 jdbc-h2/src/test/java/io/kestra/repository/h2/H2MetricRepositoryTest.java create mode 100644 jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlMetricRepository.java create mode 100644 jdbc-mysql/src/main/resources/migrations/mysql/V8__metrics.sql create mode 100644 jdbc-mysql/src/test/java/io/kestra/repository/mysql/MysqlMetricRepositoryTest.java create mode 100644 jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresMetricRepository.java create mode 100644 jdbc-postgres/src/main/resources/migrations/postgres/V8__metrics.sql create mode 100644 jdbc-postgres/src/test/java/io/kestra/repository/postgres/PostgresMetricRepositoryTest.java create mode 100644 jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcMetricRepository.java create mode 100644 jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcMetricRepositoryTest.java create mode 100644 repository-memory/src/main/java/io/kestra/repository/memory/MemoryMetricRepository.java create mode 100644 ui/src/components/executions/ExecutionMetric.vue create mode 100644 webserver/src/main/java/io/kestra/webserver/controllers/MetricController.java create mode 100644 webserver/src/test/java/io/kestra/webserver/controllers/MetricControllerTest.java diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index 9025df54e4f..32f35670cdd 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -87,6 +87,9 @@ kestra: logs: table: "logs" cls: io.kestra.core.models.executions.LogEntry + metrics: + table: "metrics" + cls: io.kestra.core.models.executions.MetricEntry multipleconditions: table: "multipleconditions" cls: io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow diff --git a/core/src/main/java/io/kestra/core/models/executions/AbstractMetricEntry.java b/core/src/main/java/io/kestra/core/models/executions/AbstractMetricEntry.java index 6e0c2f6350c..c7647246b9d 100644 --- a/core/src/main/java/io/kestra/core/models/executions/AbstractMetricEntry.java +++ b/core/src/main/java/io/kestra/core/models/executions/AbstractMetricEntry.java @@ -12,6 +12,7 @@ import lombok.NoArgsConstructor; import lombok.ToString; +import java.time.Instant; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -23,7 +24,7 @@ @JsonSubTypes.Type(value = Timer.class, name = "timer"), }) @ToString -@EqualsAndHashCode +@EqualsAndHashCode(exclude="timestamp") @Getter @NoArgsConstructor @Introspected @@ -35,6 +36,8 @@ abstract public class AbstractMetricEntry { protected Map tags; + protected Instant timestamp = Instant.now(); + protected AbstractMetricEntry(@NotNull String name, String[] tags) { this.name = name; this.tags = tagsAsMap(tags); diff --git a/core/src/main/java/io/kestra/core/models/executions/MetricEntry.java b/core/src/main/java/io/kestra/core/models/executions/MetricEntry.java new file mode 100644 index 00000000000..74cd45dbdfc --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/executions/MetricEntry.java @@ -0,0 +1,77 @@ +package io.kestra.core.models.executions; + +import io.kestra.core.models.DeletedInterface; +import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.executions.metrics.Timer; +import io.micronaut.core.annotation.Nullable; +import lombok.Builder; +import lombok.Value; + +import java.time.Instant; +import java.util.Map; +import javax.validation.constraints.NotNull; + +@Value +@Builder(toBuilder = true) +public class MetricEntry implements DeletedInterface { + @NotNull + String namespace; + + @NotNull + String flowId; + + @Nullable + String taskId; + + @Nullable + String executionId; + + @Nullable + String taskRunId; + + @NotNull + String type; + + @NotNull + String name; + + @NotNull + Double value; + + @NotNull + Instant timestamp; + + @Nullable + Map tags; + + @NotNull + @Builder.Default + boolean deleted = false; + + public static MetricEntry of(TaskRun taskRun, AbstractMetricEntry metricEntry) { + return MetricEntry.builder() + .namespace(taskRun.getNamespace()) + .flowId(taskRun.getFlowId()) + .executionId(taskRun.getExecutionId()) + .taskId(taskRun.getTaskId()) + .taskRunId(taskRun.getId()) + .type(metricEntry.getType()) + .name(metricEntry.name) + .tags(metricEntry.getTags()) + .value(computeValue(metricEntry)) + .timestamp(metricEntry.getTimestamp()) + .build(); + } + + private static Double computeValue(AbstractMetricEntry metricEntry) { + if (metricEntry instanceof Counter) { + return ((Counter) metricEntry).getValue(); + } + + if (metricEntry instanceof Timer) { + return (double) ((Timer) metricEntry).getValue().toMillis(); + } + + throw new IllegalArgumentException("Unknown metric type: " + metricEntry.getClass()); + } +} diff --git a/core/src/main/java/io/kestra/core/models/executions/TaskRunAttempt.java b/core/src/main/java/io/kestra/core/models/executions/TaskRunAttempt.java index 6e345ad62c9..a01352eede8 100644 --- a/core/src/main/java/io/kestra/core/models/executions/TaskRunAttempt.java +++ b/core/src/main/java/io/kestra/core/models/executions/TaskRunAttempt.java @@ -2,7 +2,6 @@ import lombok.Builder; import lombok.Value; -import lombok.With; import io.kestra.core.models.flows.State; import java.util.List; @@ -12,23 +11,20 @@ @Value @Builder public class TaskRunAttempt { - @With - List> metrics; + /** + * @deprecated Should always be null, we need to keep it for backward compatibility or the deserialization of old attempt will no longer work. + */ + @Deprecated + public void setMetrics(List> metrics) { + + } @NotNull State state; public TaskRunAttempt withState(State.Type state) { return new TaskRunAttempt( - this.metrics, this.state.withState(state) ); } - - public Optional> findMetrics(String name) { - return this.metrics - .stream() - .filter(metricEntry -> metricEntry.getName().equals(name)) - .findFirst(); - } -} +} \ No newline at end of file diff --git a/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java b/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java index 56bb59505d8..c095b58cf48 100644 --- a/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java +++ b/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java @@ -3,6 +3,7 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.ExecutionKilled; import io.kestra.core.models.executions.LogEntry; +import io.kestra.core.models.executions.MetricEntry; import io.kestra.core.models.triggers.Trigger; import io.kestra.core.runners.*; import io.kestra.core.models.flows.Flow; @@ -16,11 +17,11 @@ public interface QueueFactoryInterface { String FLOW_NAMED = "flowQueue"; String TEMPLATE_NAMED = "templateQueue"; String WORKERTASKLOG_NAMED = "workerTaskLogQueue"; + String METRIC_QUEUE = "workerTaskMetricQueue"; String KILL_NAMED = "executionKilledQueue"; String WORKERINSTANCE_NAMED = "workerInstanceQueue"; String WORKERTASKRUNNING_NAMED = "workerTaskRuninngQueue"; String TRIGGER_NAMED = "triggerQueue"; - String LOG_NAMED = "logQueue"; QueueInterface execution(); @@ -32,6 +33,8 @@ public interface QueueFactoryInterface { QueueInterface logEntry(); + QueueInterface metricEntry(); + QueueInterface flow(); QueueInterface kill(); diff --git a/core/src/main/java/io/kestra/core/queues/QueueService.java b/core/src/main/java/io/kestra/core/queues/QueueService.java index 33dbf122b86..7396ae5aa45 100644 --- a/core/src/main/java/io/kestra/core/queues/QueueService.java +++ b/core/src/main/java/io/kestra/core/queues/QueueService.java @@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.ExecutionKilled; import io.kestra.core.models.executions.LogEntry; +import io.kestra.core.models.executions.MetricEntry; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.templates.Template; import io.kestra.core.models.topologies.FlowTopology; @@ -49,6 +50,8 @@ public String key(Object object) { return ((Executor) object).getExecution().getId(); } else if (object.getClass() == FlowTopology.class) { return ((FlowTopology) object).uid(); + } else if (object.getClass() == MetricEntry.class) { + return null; } else { throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'"); } diff --git a/core/src/main/java/io/kestra/core/repositories/MetricRepositoryInterface.java b/core/src/main/java/io/kestra/core/repositories/MetricRepositoryInterface.java new file mode 100644 index 00000000000..1030ae144b1 --- /dev/null +++ b/core/src/main/java/io/kestra/core/repositories/MetricRepositoryInterface.java @@ -0,0 +1,17 @@ +package io.kestra.core.repositories; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.MetricEntry; +import io.micronaut.data.model.Pageable; + +import java.util.List; + +public interface MetricRepositoryInterface extends SaveRepositoryInterface { + ArrayListTotal findByExecutionId(String id, Pageable pageable); + + ArrayListTotal findByExecutionIdAndTaskId(String executionId, String taskId, Pageable pageable); + + ArrayListTotal findByExecutionIdAndTaskRunId(String executionId, String taskRunId, Pageable pageable); + + Integer purge(Execution execution); +} diff --git a/core/src/main/java/io/kestra/core/runners/Indexer.java b/core/src/main/java/io/kestra/core/runners/Indexer.java index e010fbbb25b..829ffb50f3e 100644 --- a/core/src/main/java/io/kestra/core/runners/Indexer.java +++ b/core/src/main/java/io/kestra/core/runners/Indexer.java @@ -3,10 +3,12 @@ import io.kestra.core.metrics.MetricRegistry; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.LogEntry; +import io.kestra.core.models.executions.MetricEntry; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.repositories.ExecutionRepositoryInterface; import io.kestra.core.repositories.LogRepositoryInterface; +import io.kestra.core.repositories.MetricRepositoryInterface; import io.kestra.core.repositories.SaveRepositoryInterface; import io.kestra.core.repositories.TriggerRepositoryInterface; import io.micronaut.context.annotation.Requires; @@ -23,6 +25,9 @@ public class Indexer implements IndexerInterface { private final QueueInterface executionQueue; private final LogRepositoryInterface logRepository; private final QueueInterface logQueue; + + private final MetricRepositoryInterface metricRepository; + private final QueueInterface metricQueue; private final MetricRegistry metricRegistry; @Inject @@ -31,12 +36,16 @@ public Indexer( @Named(QueueFactoryInterface.EXECUTION_NAMED) QueueInterface executionQueue, LogRepositoryInterface logRepository, @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface logQueue, + MetricRepositoryInterface metricRepositor, + @Named(QueueFactoryInterface.METRIC_QUEUE) QueueInterface metricQueue, MetricRegistry metricRegistry ) { this.executionRepository = executionRepository; this.executionQueue = executionQueue; this.logRepository = logRepository; this.logQueue = logQueue; + this.metricRepository = metricRepositor; + this.metricQueue = metricQueue; this.metricRegistry = metricRegistry; } @@ -44,6 +53,7 @@ public Indexer( public void run() { this.send(executionQueue, executionRepository); this.send(logQueue, logRepository); + this.send(metricQueue, metricRepository); } protected void send(QueueInterface queueInterface, SaveRepositoryInterface saveRepositoryInterface) { @@ -62,5 +72,6 @@ protected void send(QueueInterface queueInterface, SaveRepositoryInterfac public void close() throws IOException { this.executionQueue.close(); this.logQueue.close(); + this.metricQueue.close(); } } diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index a0a4e4fdf52..bf93486822e 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -1,7 +1,6 @@ package io.kestra.core.runners; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CaseFormat; import com.google.common.collect.ImmutableMap; @@ -17,7 +16,6 @@ import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; -import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.storages.StorageInterface; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.Slugify; @@ -36,8 +34,6 @@ @NoArgsConstructor public class RunContext { - private final static ObjectMapper MAPPER = JacksonMapper.ofJson(); - // Injected private ApplicationContext applicationContext; private VariableRenderer variableRenderer; diff --git a/core/src/main/java/io/kestra/core/runners/Worker.java b/core/src/main/java/io/kestra/core/runners/Worker.java index 2e2411c17a2..c93711144bd 100644 --- a/core/src/main/java/io/kestra/core/runners/Worker.java +++ b/core/src/main/java/io/kestra/core/runners/Worker.java @@ -6,6 +6,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.hash.Hashing; +import io.kestra.core.models.executions.MetricEntry; import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.tasks.Task; import io.micronaut.context.ApplicationContext; @@ -52,6 +53,7 @@ public class Worker implements Runnable, Closeable { private final WorkerTaskQueueInterface workerTaskQueue; private final QueueInterface workerTaskResultQueue; private final QueueInterface executionKilledQueue; + private final QueueInterface metricEntryQueue; private final MetricRegistry metricRegistry; private final Set killedExecution = ConcurrentHashMap.newKeySet(); @@ -75,6 +77,10 @@ public Worker(ApplicationContext applicationContext, int thread) { QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED) ); + this.metricEntryQueue = (QueueInterface) applicationContext.getBean( + QueueInterface.class, + Qualifiers.byName(QueueFactoryInterface.METRIC_QUEUE) + ); this.metricRegistry = applicationContext.getBean(MetricRegistry.class); ExecutorsUtils executorsUtils = applicationContext.getBean(ExecutorsUtils.class); @@ -87,9 +93,7 @@ public void run() { if (executionKilled != null) { // @FIXME: the hashset will never expire killed execution killedExecution.add(executionKilled.getExecutionId()); - } - if (executionKilled != null) { synchronized (this) { workerThreadReferences .stream() @@ -189,6 +193,8 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu this.logTerminated(workerTask); + //FIXME should we remove it from the killedExecution set ? + return workerTaskResult; } @@ -350,19 +356,21 @@ private WorkerTask runAttempt(WorkerTask workerTask) { // attempt TaskRunAttempt taskRunAttempt = builder - .metrics(runContext.metrics()) .build() .withState(state); // logs - if (workerThread.getTaskOutput() != null) { + if (workerThread.getTaskOutput() != null && log.isDebugEnabled()) { log.debug("Outputs\n{}", JacksonMapper.log(workerThread.getTaskOutput())); } - if (runContext.metrics().size() > 0) { + if (runContext.metrics().size() > 0 && log.isTraceEnabled()) { log.trace("Metrics\n{}", JacksonMapper.log(runContext.metrics())); } + // metrics + runContext.metrics().forEach(metric -> this.metricEntryQueue.emit(MetricEntry.of(workerTask.getTaskRun(), metric))); + // save outputs List attempts = this.addAttempt(workerTask, taskRunAttempt); @@ -448,6 +456,7 @@ public void close() throws IOException { workerTaskQueue.close(); executionKilledQueue.close(); workerTaskResultQueue.close(); + metricEntryQueue.close(); } @Getter diff --git a/core/src/main/java/io/kestra/core/services/ExecutionService.java b/core/src/main/java/io/kestra/core/services/ExecutionService.java index 2d8fa041f48..93752f5c9eb 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionService.java @@ -12,6 +12,7 @@ import io.kestra.core.repositories.ExecutionRepositoryInterface; import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.repositories.LogRepositoryInterface; +import io.kestra.core.repositories.MetricRepositoryInterface; import io.kestra.core.storages.StorageInterface; import io.kestra.core.tasks.flows.Worker; import io.kestra.core.utils.IdUtils; @@ -51,6 +52,9 @@ public class ExecutionService { @Inject private LogRepositoryInterface logRepository; + @Inject + private MetricRepositoryInterface metricRepository; + public Execution restart(final Execution execution, @Nullable Integer revision) throws Exception { if (!execution.getState().isTerminated()) { throw new IllegalStateException("Execution must be terminated to be restarted, " + @@ -221,6 +225,7 @@ public Execution markAs(final Execution execution, String taskRunId, State.Type public PurgeResult purge( Boolean purgeExecution, Boolean purgeLog, + Boolean purgeMetric, Boolean purgeStorage, @Nullable String namespace, @Nullable String flowId, @@ -247,6 +252,10 @@ public PurgeResult purge( builder.logsCount(this.logRepository.purge(execution)); } + if(purgeMetric) { + this.metricRepository.purge(execution); + } + if (purgeStorage) { builder.storagesCount(storageInterface.deleteByPrefix(URI.create("/" + storageInterface.executionPrefix( execution))).size()); diff --git a/core/src/main/java/io/kestra/core/tasks/storages/Purge.java b/core/src/main/java/io/kestra/core/tasks/storages/Purge.java index 5700413101f..e70cbf98616 100644 --- a/core/src/main/java/io/kestra/core/tasks/storages/Purge.java +++ b/core/src/main/java/io/kestra/core/tasks/storages/Purge.java @@ -79,6 +79,13 @@ public class Purge extends Task implements RunnableTask { @Builder.Default private boolean purgeLog = true; + @Schema( + title = "Purge metric from repository" + ) + @PluginProperty + @Builder.Default + private boolean purgeMetric = true; + @Schema( title = "Purge file from internal storage" ) @@ -93,6 +100,7 @@ public Purge.Output run(RunContext runContext) throws Exception { ExecutionService.PurgeResult purgeResult = executionService.purge( purgeExecution, purgeLog, + purgeMetric, purgeStorage, namespace, flowId, diff --git a/core/src/test/java/io/kestra/core/repositories/AbstractExecutionServiceTest.java b/core/src/test/java/io/kestra/core/repositories/AbstractExecutionServiceTest.java index 9c2c12aaf6e..4f2eec2b379 100644 --- a/core/src/test/java/io/kestra/core/repositories/AbstractExecutionServiceTest.java +++ b/core/src/test/java/io/kestra/core/repositories/AbstractExecutionServiceTest.java @@ -97,6 +97,7 @@ void purge() throws Exception { true, true, true, + true, flow.getNamespace(), flow.getId(), ZonedDateTime.now(), @@ -112,6 +113,7 @@ void purge() throws Exception { true, true, true, + true, flow.getNamespace(), flow.getId(), ZonedDateTime.now(), diff --git a/core/src/test/java/io/kestra/core/repositories/AbstractMetricRepositoryTest.java b/core/src/test/java/io/kestra/core/repositories/AbstractMetricRepositoryTest.java new file mode 100644 index 00000000000..4a901be18d1 --- /dev/null +++ b/core/src/test/java/io/kestra/core/repositories/AbstractMetricRepositoryTest.java @@ -0,0 +1,61 @@ +package io.kestra.core.repositories; + +import com.devskiller.friendly_id.FriendlyId; +import io.kestra.core.models.executions.MetricEntry; +import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.executions.metrics.Timer; +import io.micronaut.data.model.Pageable; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +@MicronautTest(transactional = false) +public abstract class AbstractMetricRepositoryTest { + @Inject + protected MetricRepositoryInterface metricRepository; + + @Test + void all() { + String executionId = FriendlyId.createFriendlyId(); + TaskRun taskRun1 = taskRun(executionId); + MetricEntry counter = MetricEntry.of(taskRun1, counter()); + TaskRun taskRun2 = taskRun(executionId); + MetricEntry timer = MetricEntry.of(taskRun2, timer()); + metricRepository.save(counter); + metricRepository.save(timer); + + List results = metricRepository.findByExecutionId(executionId, Pageable.from(1, 10)); + assertThat(results.size(), is(2)); + + results = metricRepository.findByExecutionIdAndTaskId(executionId, taskRun1.getTaskId(), Pageable.from(1, 10)); + assertThat(results.size(), is(2)); + + results = metricRepository.findByExecutionIdAndTaskRunId(executionId, taskRun1.getId(), Pageable.from(1, 10)); + assertThat(results.size(), is(1)); + } + + private Counter counter() { + return Counter.of("counter", 1); + } + + private Timer timer() { + return Timer.of("counter", Duration.ofSeconds(5)); + } + + private TaskRun taskRun(String executionId) { + return TaskRun.builder() + .flowId("flow") + .namespace("namespace") + .executionId(executionId) + .taskId("task") + .id(FriendlyId.createFriendlyId()) + .build(); + } +} diff --git a/core/src/test/java/io/kestra/core/repositories/ExecutionFixture.java b/core/src/test/java/io/kestra/core/repositories/ExecutionFixture.java index d1796988c94..0a0d046d24c 100644 --- a/core/src/test/java/io/kestra/core/repositories/ExecutionFixture.java +++ b/core/src/test/java/io/kestra/core/repositories/ExecutionFixture.java @@ -28,7 +28,6 @@ class ExecutionFixture { .state(new State()) .attempts(Collections.singletonList( TaskRunAttempt.builder() - .metrics(Collections.singletonList(Counter.of("counter", 1))) .build() )) .outputs(ImmutableMap.of( @@ -53,7 +52,6 @@ class ExecutionFixture { .state(new State()) .attempts(Collections.singletonList( TaskRunAttempt.builder() - .metrics(Collections.singletonList(Timer.of("test", Duration.ofMillis(150)))) .build() )) .outputs(ImmutableMap.of( diff --git a/core/src/test/java/io/kestra/core/runners/RunContextTest.java b/core/src/test/java/io/kestra/core/runners/RunContextTest.java index 05608fa27d8..06ee5f74c0e 100644 --- a/core/src/test/java/io/kestra/core/runners/RunContextTest.java +++ b/core/src/test/java/io/kestra/core/runners/RunContextTest.java @@ -124,24 +124,6 @@ void variables() throws TimeoutException { assertThat(execution.getTaskRunList().get(2).getOutputs().get("value"), is("return")); } - @SuppressWarnings("OptionalGetWithoutIsPresent") - @Test - void metrics() throws TimeoutException { - Execution execution = runnerUtils.runOne("io.kestra.tests", "return"); - - TaskRunAttempt taskRunAttempt = execution.getTaskRunList() - .get(1) - .getAttempts() - .get(0); - Counter length = (Counter) taskRunAttempt.findMetrics("length").get(); - Timer duration = (Timer) taskRunAttempt.findMetrics("duration").get(); - - assertThat(execution.getTaskRunList(), hasSize(3)); - assertThat(length.getValue(), is(7.0D)); - assertThat(duration.getValue().getNano(), is(greaterThan(0))); - assertThat(duration.getTags().get("format"), is("{{task.id}}")); - } - @Test void taskDefaults() throws TimeoutException, IOException, URISyntaxException { repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/task-defaults.yaml"))); diff --git a/jdbc-h2/src/main/java/io/kestra/repository/h2/H2MetricRepository.java b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2MetricRepository.java new file mode 100644 index 00000000000..665994d3e01 --- /dev/null +++ b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2MetricRepository.java @@ -0,0 +1,17 @@ +package io.kestra.repository.h2; + +import io.kestra.core.models.executions.MetricEntry; +import io.kestra.jdbc.repository.AbstractJdbcMetricRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +@Singleton +@H2RepositoryEnabled +public class H2MetricRepository extends AbstractJdbcMetricRepository { + @Inject + public H2MetricRepository(ApplicationContext applicationContext) { + super(new H2Repository<>(MetricEntry.class, applicationContext)); + } +} + diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java index 7767a66c010..f168eb6bf6a 100644 --- a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java @@ -3,6 +3,7 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.ExecutionKilled; import io.kestra.core.models.executions.LogEntry; +import io.kestra.core.models.executions.MetricEntry; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.templates.Template; import io.kestra.core.models.triggers.Trigger; @@ -58,6 +59,13 @@ public QueueInterface logEntry() { return new H2Queue<>(LogEntry.class, applicationContext); } + @Override + @Singleton + @Named(QueueFactoryInterface.METRIC_QUEUE) + public QueueInterface metricEntry() { + return new H2Queue<>(MetricEntry.class, applicationContext); + } + @Override @Singleton @Named(QueueFactoryInterface.FLOW_NAMED) diff --git a/jdbc-h2/src/main/resources/migrations/h2/V8__metrics.sql b/jdbc-h2/src/main/resources/migrations/h2/V8__metrics.sql new file mode 100644 index 00000000000..f9bf50572c2 --- /dev/null +++ b/jdbc-h2/src/main/resources/migrations/h2/V8__metrics.sql @@ -0,0 +1,32 @@ +ALTER TABLE queues +ALTER COLUMN "type" ENUM( + 'io.kestra.core.models.executions.Execution', + 'io.kestra.core.models.flows.Flow', + 'io.kestra.core.models.templates.Template', + 'io.kestra.core.models.executions.ExecutionKilled', + 'io.kestra.core.runners.WorkerTask', + 'io.kestra.core.runners.WorkerTaskResult', + 'io.kestra.core.runners.WorkerInstance', + 'io.kestra.core.runners.WorkerTaskRunning', + 'io.kestra.core.models.executions.LogEntry', + 'io.kestra.core.models.triggers.Trigger', + 'io.kestra.core.models.executions.MetricEntry' + ) NOT NULL; + +/* ----------------------- metrics ----------------------- */ +CREATE TABLE metrics ( + "key" VARCHAR(30) NOT NULL PRIMARY KEY, + "value" TEXT NOT NULL, + "deleted" BOOL NOT NULL GENERATED ALWAYS AS (JQ_BOOLEAN("value", '.deleted')), + "namespace" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.namespace')), + "flow_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.flowId')), + "task_id" VARCHAR(150) GENERATED ALWAYS AS (JQ_STRING("value", '.taskId')), + "execution_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.executionId')), + "taskrun_id" VARCHAR(150) GENERATED ALWAYS AS (JQ_STRING("value", '.taskRunId')), + "metric_name" VARCHAR(150) GENERATED ALWAYS AS (JQ_STRING("value", '.name')), + "timestamp" TIMESTAMP NOT NULL GENERATED ALWAYS AS (PARSEDATETIME(JQ_STRING("value", '.timestamp'), 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z''')) +); + +CREATE INDEX metrics_flow_id ON logs ("deleted", "namespace", "flow_id"); +CREATE INDEX metrics_execution_id ON logs ("deleted", "execution_id"); +CREATE INDEX metrics_timestamp ON logs ("deleted", "timestamp"); \ No newline at end of file diff --git a/jdbc-h2/src/test/java/io/kestra/repository/h2/H2MetricRepositoryTest.java b/jdbc-h2/src/test/java/io/kestra/repository/h2/H2MetricRepositoryTest.java new file mode 100644 index 00000000000..de6499d9c41 --- /dev/null +++ b/jdbc-h2/src/test/java/io/kestra/repository/h2/H2MetricRepositoryTest.java @@ -0,0 +1,6 @@ +package io.kestra.repository.h2; + +import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest; + +public class H2MetricRepositoryTest extends AbstractJdbcMetricRepositoryTest { +} diff --git a/jdbc-h2/src/test/resources/application.yml b/jdbc-h2/src/test/resources/application.yml index 11ec5d68dd8..16b2d29ffbc 100644 --- a/jdbc-h2/src/test/resources/application.yml +++ b/jdbc-h2/src/test/resources/application.yml @@ -41,6 +41,9 @@ kestra: logs: table: "logs" cls: io.kestra.core.models.executions.LogEntry + metrics: + table: "metrics" + cls: io.kestra.core.models.executions.MetricEntry multipleconditions: table: "multipleconditions" cls: io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow diff --git a/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlMetricRepository.java b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlMetricRepository.java new file mode 100644 index 00000000000..82fa6643da1 --- /dev/null +++ b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlMetricRepository.java @@ -0,0 +1,17 @@ +package io.kestra.repository.mysql; + +import io.kestra.core.models.executions.MetricEntry; +import io.kestra.jdbc.repository.AbstractJdbcMetricRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +@Singleton +@MysqlRepositoryEnabled +public class MysqlMetricRepository extends AbstractJdbcMetricRepository { + @Inject + public MysqlMetricRepository(ApplicationContext applicationContext) { + super(new MysqlRepository<>(MetricEntry.class, applicationContext)); + } +} + diff --git a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueueFactory.java b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueueFactory.java index 47b6059d61e..829f76778a3 100644 --- a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueueFactory.java +++ b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueueFactory.java @@ -3,6 +3,7 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.ExecutionKilled; import io.kestra.core.models.executions.LogEntry; +import io.kestra.core.models.executions.MetricEntry; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.templates.Template; import io.kestra.core.models.triggers.Trigger; @@ -58,6 +59,13 @@ public QueueInterface logEntry() { return new MysqlQueue<>(LogEntry.class, applicationContext); } + @Override + @Singleton + @Named(QueueFactoryInterface.METRIC_QUEUE) + public QueueInterface metricEntry() { + return new MysqlQueue<>(MetricEntry.class, applicationContext); + } + @Override @Singleton @Named(QueueFactoryInterface.FLOW_NAMED) diff --git a/jdbc-mysql/src/main/resources/migrations/mysql/V8__metrics.sql b/jdbc-mysql/src/main/resources/migrations/mysql/V8__metrics.sql new file mode 100644 index 00000000000..c2a82eb0526 --- /dev/null +++ b/jdbc-mysql/src/main/resources/migrations/mysql/V8__metrics.sql @@ -0,0 +1,31 @@ +ALTER TABLE queues MODIFY COLUMN `type` + ENUM( + 'io.kestra.core.models.executions.Execution', + 'io.kestra.core.models.flows.Flow', + 'io.kestra.core.models.templates.Template', + 'io.kestra.core.models.executions.ExecutionKilled', + 'io.kestra.core.runners.WorkerTask', + 'io.kestra.core.runners.WorkerTaskResult', + 'io.kestra.core.runners.WorkerInstance', + 'io.kestra.core.runners.WorkerTaskRunning', + 'io.kestra.core.models.executions.LogEntry', + 'io.kestra.core.models.triggers.Trigger', + 'io.kestra.core.models.executions.MetricEntry' + ) NOT NULL; + +/* ----------------------- metrics ----------------------- */ +CREATE TABLE metrics ( + `key` VARCHAR(30) NOT NULL PRIMARY KEY, + `value` JSON NOT NULL, + `deleted` BOOL GENERATED ALWAYS AS (value ->> '$.deleted' = 'true') STORED NOT NULL, + `namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL, + `flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL, + `task_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.taskId') STORED, + `execution_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.executionId') STORED NOT NULL, + `taskrun_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.taskRunId') STORED, + `metric_name` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.name') STORED, + `timestamp` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.timestamp' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL, + INDEX ix_metrics_flow_id (deleted, namespace, flow_id), + INDEX ix_metrics_execution_id (deleted, execution_id), + INDEX ix_metrics_timestamp (deleted, timestamp) +) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; diff --git a/jdbc-mysql/src/test/java/io/kestra/repository/mysql/MysqlMetricRepositoryTest.java b/jdbc-mysql/src/test/java/io/kestra/repository/mysql/MysqlMetricRepositoryTest.java new file mode 100644 index 00000000000..d3471bdfbcf --- /dev/null +++ b/jdbc-mysql/src/test/java/io/kestra/repository/mysql/MysqlMetricRepositoryTest.java @@ -0,0 +1,6 @@ +package io.kestra.repository.mysql; + +import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest; + +public class MysqlMetricRepositoryTest extends AbstractJdbcMetricRepositoryTest { +} diff --git a/jdbc-mysql/src/test/resources/application.yml b/jdbc-mysql/src/test/resources/application.yml index 18b1171e2da..00f49ceeaad 100644 --- a/jdbc-mysql/src/test/resources/application.yml +++ b/jdbc-mysql/src/test/resources/application.yml @@ -42,6 +42,9 @@ kestra: logs: table: "logs" cls: io.kestra.core.models.executions.LogEntry + metrics: + table: "metrics" + cls: io.kestra.core.models.executions.MetricEntry multipleconditions: table: "multipleconditions" cls: io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow diff --git a/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresMetricRepository.java b/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresMetricRepository.java new file mode 100644 index 00000000000..c7a3a8fd85a --- /dev/null +++ b/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresMetricRepository.java @@ -0,0 +1,17 @@ +package io.kestra.repository.postgres; + +import io.kestra.core.models.executions.MetricEntry; +import io.kestra.jdbc.repository.AbstractJdbcMetricRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +@Singleton +@PostgresRepositoryEnabled +public class PostgresMetricRepository extends AbstractJdbcMetricRepository { + @Inject + public PostgresMetricRepository(ApplicationContext applicationContext) { + super(new PostgresRepository<>(MetricEntry.class, applicationContext)); + } +} + diff --git a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueueFactory.java b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueueFactory.java index 6a1e2fed165..7dcd87636d3 100644 --- a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueueFactory.java +++ b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueueFactory.java @@ -3,6 +3,7 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.ExecutionKilled; import io.kestra.core.models.executions.LogEntry; +import io.kestra.core.models.executions.MetricEntry; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.templates.Template; import io.kestra.core.models.triggers.Trigger; @@ -58,6 +59,13 @@ public QueueInterface logEntry() { return new PostgresQueue<>(LogEntry.class, applicationContext); } + @Override + @Singleton + @Named(QueueFactoryInterface.METRIC_QUEUE) + public QueueInterface metricEntry() { + return new PostgresQueue<>(MetricEntry.class, applicationContext); + } + @Override @Singleton @Named(QueueFactoryInterface.FLOW_NAMED) diff --git a/jdbc-postgres/src/main/resources/migrations/postgres/V8__metrics.sql b/jdbc-postgres/src/main/resources/migrations/postgres/V8__metrics.sql new file mode 100644 index 00000000000..bb479c09c41 --- /dev/null +++ b/jdbc-postgres/src/main/resources/migrations/postgres/V8__metrics.sql @@ -0,0 +1,19 @@ +ALTER TYPE queue_type ADD VALUE 'io.kestra.core.models.executions.MetricEntry'; + +/* ----------------------- metrics ----------------------- */ +CREATE TABLE metrics ( + key VARCHAR(30) NOT NULL PRIMARY KEY, + value JSONB NOT NULL, + deleted BOOL NOT NULL GENERATED ALWAYS AS (CAST(value ->> 'deleted' AS bool)) STORED, + namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED, + flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED, + task_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'taskId') STORED, + execution_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'executionId') STORED, + taskrun_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'taskRunId') STORED, + metric_name VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'name') STORED, + timestamp TIMESTAMPTZ NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'timestamp')) STORED +); + +CREATE INDEX metrics_flow_id ON logs (deleted, namespace, flow_id); +CREATE INDEX metrics_execution_id ON logs (deleted, execution_id); +CREATE INDEX metrics_timestamp ON logs (deleted, timestamp); \ No newline at end of file diff --git a/jdbc-postgres/src/test/java/io/kestra/repository/postgres/PostgresMetricRepositoryTest.java b/jdbc-postgres/src/test/java/io/kestra/repository/postgres/PostgresMetricRepositoryTest.java new file mode 100644 index 00000000000..34b229625c4 --- /dev/null +++ b/jdbc-postgres/src/test/java/io/kestra/repository/postgres/PostgresMetricRepositoryTest.java @@ -0,0 +1,6 @@ +package io.kestra.repository.postgres; + +import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest; + +public class PostgresMetricRepositoryTest extends AbstractJdbcMetricRepositoryTest { +} diff --git a/jdbc-postgres/src/test/resources/application.yml b/jdbc-postgres/src/test/resources/application.yml index cf5b1fa375a..0bc8c053d3d 100644 --- a/jdbc-postgres/src/test/resources/application.yml +++ b/jdbc-postgres/src/test/resources/application.yml @@ -42,6 +42,9 @@ kestra: logs: table: "logs" cls: io.kestra.core.models.executions.LogEntry + metrics: + table: "metrics" + cls: io.kestra.core.models.executions.MetricEntry multipleconditions: table: "multipleconditions" cls: io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcMetricRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcMetricRepository.java new file mode 100644 index 00000000000..28fd4e309e3 --- /dev/null +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcMetricRepository.java @@ -0,0 +1,100 @@ +package io.kestra.jdbc.repository; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.MetricEntry; +import io.kestra.core.repositories.ArrayListTotal; +import io.kestra.core.repositories.MetricRepositoryInterface; +import io.kestra.jdbc.runner.JdbcIndexerInterface; +import io.micronaut.data.model.Pageable; +import jakarta.inject.Singleton; +import org.jooq.Condition; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.Record1; +import org.jooq.SelectConditionStep; +import org.jooq.SortOrder; +import org.jooq.impl.DSL; + +import java.util.List; +import java.util.Map; + +@Singleton +public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface, JdbcIndexerInterface { + protected io.kestra.jdbc.AbstractJdbcRepository jdbcRepository; + + public AbstractJdbcMetricRepository(io.kestra.jdbc.AbstractJdbcRepository jdbcRepository) { + this.jdbcRepository = jdbcRepository; + } + + @Override + public ArrayListTotal findByExecutionId(String id, Pageable pageable) { + return this.query( + field("execution_id").eq(id) + , pageable + ); + } + + @Override + public ArrayListTotal findByExecutionIdAndTaskId(String executionId, String taskId, Pageable pageable) { + return this.query( + field("execution_id").eq(executionId) + .and(field("task_id").eq(taskId)), + pageable + ); + } + + @Override + public ArrayListTotal findByExecutionIdAndTaskRunId(String executionId, String taskRunId, Pageable pageable) { + return this.query( + field("execution_id").eq(executionId) + .and(field("taskrun_id").eq(taskRunId)), + pageable + ); + } + + @Override + public MetricEntry save(MetricEntry metric) { + Map, Object> fields = this.jdbcRepository.persistFields(metric); + this.jdbcRepository.persist(metric, fields); + + return metric; + } + + @Override + public Integer purge(Execution execution) { + return this.jdbcRepository + .getDslContextWrapper() + .transactionResult(configuration -> { + DSLContext context = DSL.using(configuration); + + return context.delete(this.jdbcRepository.getTable()) + .where(field("execution_id", String.class).eq(execution.getId())) + .execute(); + }); + } + + @Override + public MetricEntry save(DSLContext dslContext, MetricEntry metric) { + Map, Object> fields = this.jdbcRepository.persistFields(metric); + this.jdbcRepository.persist(metric, dslContext, fields); + + return metric; + } + + private ArrayListTotal query(Condition condition, Pageable pageable) { + return this.jdbcRepository + .getDslContextWrapper() + .transactionResult(configuration -> { + DSLContext context = DSL.using(configuration); + SelectConditionStep> select = DSL + .using(configuration) + .select(field("value")) + .from(this.jdbcRepository.getTable()) + .where(this.defaultFilter()); + + select = select.and(condition); + + return this.jdbcRepository.fetchPage(context, select, pageable); + }); + } +} diff --git a/jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcMetricRepositoryTest.java b/jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcMetricRepositoryTest.java new file mode 100644 index 00000000000..688fb33d339 --- /dev/null +++ b/jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcMetricRepositoryTest.java @@ -0,0 +1,17 @@ +package io.kestra.jdbc.repository; + +import io.kestra.core.repositories.AbstractMetricRepositoryTest; +import io.kestra.jdbc.JdbcTestUtils; +import jakarta.inject.Inject; +import org.junit.jupiter.api.BeforeEach; + +public abstract class AbstractJdbcMetricRepositoryTest extends AbstractMetricRepositoryTest { + @Inject + JdbcTestUtils jdbcTestUtils; + + @BeforeEach + protected void init() { + jdbcTestUtils.drop(); + jdbcTestUtils.migrate(); + } +} diff --git a/repository-memory/src/main/java/io/kestra/repository/memory/MemoryMetricRepository.java b/repository-memory/src/main/java/io/kestra/repository/memory/MemoryMetricRepository.java new file mode 100644 index 00000000000..a6663964da9 --- /dev/null +++ b/repository-memory/src/main/java/io/kestra/repository/memory/MemoryMetricRepository.java @@ -0,0 +1,45 @@ +package io.kestra.repository.memory; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.MetricEntry; +import io.kestra.core.repositories.ArrayListTotal; +import io.kestra.core.repositories.MetricRepositoryInterface; +import io.micronaut.data.model.Pageable; +import jakarta.inject.Singleton; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +@Singleton +@MemoryRepositoryEnabled +public class MemoryMetricRepository implements MetricRepositoryInterface { + private final List metrics = new ArrayList<>(); + + @Override + public ArrayListTotal findByExecutionId(String id, Pageable pageable) { + var results = metrics.stream().filter(metrics -> metrics.getExecutionId().equals(id)).collect(Collectors.toList()); + return new ArrayListTotal<>(results, results.size()); + } + + @Override + public ArrayListTotal findByExecutionIdAndTaskId(String executionId, String taskId, Pageable pageable) { + throw new UnsupportedOperationException(); + } + + @Override + public ArrayListTotal findByExecutionIdAndTaskRunId(String executionId, String taskRunId, Pageable pageable) { + throw new UnsupportedOperationException(); + } + + @Override + public Integer purge(Execution execution) { + throw new UnsupportedOperationException(); + } + + @Override + public MetricEntry save(MetricEntry metricEntry) { + metrics.add(metricEntry); + return metricEntry; + } +} diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryQueueFactory.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryQueueFactory.java index fcb3464436d..b170fc41db8 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryQueueFactory.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryQueueFactory.java @@ -1,5 +1,6 @@ package io.kestra.runner.memory; +import io.kestra.core.models.executions.MetricEntry; import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.Factory; import org.apache.commons.lang3.NotImplementedException; @@ -59,6 +60,13 @@ public QueueInterface logEntry() { return new MemoryQueue<>(LogEntry.class, applicationContext); } + @Override + @Singleton + @Named(QueueFactoryInterface.METRIC_QUEUE) + public QueueInterface metricEntry() { + return new MemoryQueue<>(MetricEntry.class, applicationContext); + } + @Override @Singleton @Named(QueueFactoryInterface.FLOW_NAMED) diff --git a/ui/src/components/executions/ExecutionMetric.vue b/ui/src/components/executions/ExecutionMetric.vue new file mode 100644 index 00000000000..b0bda7667c0 --- /dev/null +++ b/ui/src/components/executions/ExecutionMetric.vue @@ -0,0 +1,151 @@ + + diff --git a/ui/src/components/executions/ExecutionRoot.vue b/ui/src/components/executions/ExecutionRoot.vue index 6bdabab5a64..75b381473a3 100644 --- a/ui/src/components/executions/ExecutionRoot.vue +++ b/ui/src/components/executions/ExecutionRoot.vue @@ -47,6 +47,7 @@ import Tabs from "../../components/Tabs.vue"; import State from "../../utils/state"; + import ExecutionMetric from "./ExecutionMetric.vue"; export default { mixins: [RouteContext], @@ -145,6 +146,11 @@ name: "outputs", component: ExecutionOutput, title: title("outputs") + }, + { + name: "metrics", + component: ExecutionMetric, + title: title("metrics") } ]; }, diff --git a/ui/src/components/executions/Metrics.vue b/ui/src/components/executions/Metrics.vue index 5dcdaf54788..70666f6ed6e 100644 --- a/ui/src/components/executions/Metrics.vue +++ b/ui/src/components/executions/Metrics.vue @@ -1,7 +1,6 @@