diff --git a/core/src/main/java/io/kestra/core/models/flows/State.java b/core/src/main/java/io/kestra/core/models/flows/State.java index 54ebc04ad83..4f2f002b605 100644 --- a/core/src/main/java/io/kestra/core/models/flows/State.java +++ b/core/src/main/java/io/kestra/core/models/flows/State.java @@ -91,6 +91,14 @@ public String humanDuration() { } } + public Instant maxDate() { + if (this.histories.size() == 0) { + return Instant.now(); + } + + return this.histories.get(this.histories.size() - 1).getDate(); + } + @JsonIgnore public boolean isTerninated() { return this.current.isTerninated(); diff --git a/core/src/main/java/io/kestra/core/runners/ExecutionDelay.java b/core/src/main/java/io/kestra/core/runners/ExecutionDelay.java new file mode 100644 index 00000000000..6b43d82e807 --- /dev/null +++ b/core/src/main/java/io/kestra/core/runners/ExecutionDelay.java @@ -0,0 +1,22 @@ +package io.kestra.core.runners; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Value; + +import java.time.Instant; +import javax.validation.constraints.NotNull; + +@Value +@AllArgsConstructor +@Builder +public class ExecutionDelay { + @NotNull + String taskRunId; + + @NotNull + String executionId; + + @NotNull + Instant date; +} diff --git a/core/src/main/java/io/kestra/core/runners/Executor.java b/core/src/main/java/io/kestra/core/runners/Executor.java index 8c82415aa55..f95f4de8468 100644 --- a/core/src/main/java/io/kestra/core/runners/Executor.java +++ b/core/src/main/java/io/kestra/core/runners/Executor.java @@ -23,6 +23,7 @@ public class Executor { private final List nexts = new ArrayList<>(); private final List workerTasks = new ArrayList<>(); private final List workerTaskResults = new ArrayList<>(); + private final List executionDelays = new ArrayList<>(); private WorkerTaskResult joined; private final List workerTaskExecutions = new ArrayList<>(); @@ -78,6 +79,13 @@ public Executor withWorkerTaskResults(List workerTaskResults, return this; } + public Executor withWorkerTaskDelays(List executionDelays, String from) { + this.executionDelays.addAll(executionDelays); + this.from.add(from); + + return this; + } + public Executor withWorkerTaskExecutions(List newExecutions, String from) { this.workerTaskExecutions.addAll(newExecutions); this.from.add(from); diff --git a/core/src/main/java/io/kestra/core/runners/ExecutorService.java b/core/src/main/java/io/kestra/core/runners/ExecutorService.java index fc7d28f4ad6..bc6e34800d4 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutorService.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutorService.java @@ -12,6 +12,7 @@ import io.kestra.core.models.tasks.ResolvedTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.services.ConditionService; +import io.kestra.core.tasks.flows.Pause; import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import jakarta.inject.Singleton; @@ -20,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @@ -368,9 +370,42 @@ private Executor handleChildWorkerTaskResult(Executor executor) throws InternalE return executor; } + executor = this.handlePausedDelay(executor, list); + return executor.withWorkerTaskResults(list, "handleChildWorkerTaskResult"); } + private Executor handlePausedDelay(Executor executor, List workerTaskResults) throws InternalException { + List list = workerTaskResults + .stream() + .filter(workerTaskResult -> workerTaskResult.getTaskRun().getState().getCurrent() == State.Type.PAUSED) + .map(throwFunction(workerTaskResult -> { + Task task = executor.getFlow().findTaskByTaskId(workerTaskResult.getTaskRun().getTaskId()); + + if (task instanceof Pause) { + Pause pauseTask = (Pause) task; + + if (pauseTask.getDelay() != null) { + return ExecutionDelay.builder() + .taskRunId(workerTaskResult.getTaskRun().getId()) + .executionId(executor.getExecution().getId()) + .date(workerTaskResult.getTaskRun().getState().maxDate().plus(pauseTask.getDelay())) + .build(); + } + } + + return null; + })) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + if (list.size() == 0) { + return executor; + } + + return executor.withWorkerTaskDelays(list, "handlePausedDelay"); + } + private Executor handleChildWorkerCreatedKilling(Executor executor) throws InternalException { if (executor.getExecution().getTaskRunList() == null || executor.getExecution().getState().getCurrent() != State.Type.KILLING) { return executor; @@ -566,6 +601,18 @@ private Executor handleFlowTask(final Executor executor) { return resultExecutor; } + public boolean canBePurged(final Executor executor) { + return conditionService.isTerminatedWithListeners(executor.getFlow(), executor.getExecution()) + // we don't purge pause execution in order to be able to restart automatically in case of delay + && executor.getExecution().getState().getCurrent() != State.Type.PAUSED + // we don't purge killed execution in order to have feedback about child running tasks + // this can be killed lately (after the executor kill the execution), but we want to keep + // feedback about the actual state (killed or not) + // @TODO: this can lead to infinite state store for most executor topic + && executor.getExecution().getState().getCurrent() != State.Type.KILLED; + + } + public void log(Logger log, Boolean in, WorkerTask value) { log.debug( "{} {} : {}", diff --git a/core/src/main/java/io/kestra/core/tasks/flows/Pause.java b/core/src/main/java/io/kestra/core/tasks/flows/Pause.java index cd66394abfb..7d25be234cd 100644 --- a/core/src/main/java/io/kestra/core/tasks/flows/Pause.java +++ b/core/src/main/java/io/kestra/core/tasks/flows/Pause.java @@ -18,6 +18,7 @@ import lombok.ToString; import lombok.experimental.SuperBuilder; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -28,7 +29,7 @@ @Getter @NoArgsConstructor @Schema( - title = "Pause current execution and wait for a manual approval" + title = "Pause current execution and wait for a manual approval or a delay" ) @Plugin( examples = { @@ -54,6 +55,12 @@ } ) public class Pause extends Sequential implements FlowableTask { + @Schema( + title = "Duration of the pause.", + description = "If null, a manual approval is need, if not, the delay before automatically continue the execution" + ) + private Duration delay; + @Override public List resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException { if (this.needPause(parentTaskRun) || parentTaskRun.getState().getCurrent() == State.Type.PAUSED) { diff --git a/core/src/test/java/io/kestra/core/Helpers.java b/core/src/test/java/io/kestra/core/Helpers.java index ae3d6c475bd..b9aef3a5a5b 100644 --- a/core/src/test/java/io/kestra/core/Helpers.java +++ b/core/src/test/java/io/kestra/core/Helpers.java @@ -20,7 +20,7 @@ import java.util.function.Consumer; public class Helpers { - public static long FLOWS_COUNT = 48; + public static long FLOWS_COUNT = 49; public static ApplicationContext applicationContext() throws URISyntaxException { return applicationContext( diff --git a/core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java b/core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java index 207bd1305b4..2d00412e9a7 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java @@ -1,7 +1,5 @@ package io.kestra.core.tasks.flows; -import com.google.common.collect.ImmutableMap; -import io.kestra.core.exceptions.InternalException; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; @@ -10,14 +8,12 @@ import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.runners.AbstractMemoryRunnerTest; import io.kestra.core.services.ExecutionService; +import jakarta.inject.Inject; +import jakarta.inject.Named; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.List; -import java.util.concurrent.TimeoutException; - -import jakarta.inject.Inject; -import jakarta.inject.Named; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; @@ -57,4 +53,23 @@ void run() throws Exception { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); } + + @Test + void runDelay() throws Exception { + Execution execution = runnerUtils.runOne("io.kestra.tests", "pause-delay"); + + assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED)); + assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED)); + assertThat(execution.getTaskRunList(), hasSize(1)); + + execution = runnerUtils.awaitExecution( + e -> e.getState().getCurrent() == State.Type.SUCCESS, + () -> {}, + Duration.ofSeconds(30) + ); + + assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L)); + assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(2L)); + assertThat(execution.getTaskRunList(), hasSize(3)); + } } \ No newline at end of file diff --git a/core/src/test/resources/flows/valids/pause-delay.yaml b/core/src/test/resources/flows/valids/pause-delay.yaml new file mode 100644 index 00000000000..ace4f4a6aaf --- /dev/null +++ b/core/src/test/resources/flows/valids/pause-delay.yaml @@ -0,0 +1,15 @@ +id: pause-delay +namespace: io.kestra.tests + +tasks: + - id: pause + type: io.kestra.core.tasks.flows.Pause + delay: PT1S + tasks: + - id: ko + type: io.kestra.core.tasks.scripts.Bash + commands: + - echo "trigger 1 seconds pause" + - id: last + type: io.kestra.core.tasks.debugs.Return + format: "{{task.id}} > {{taskrun.startDate}}" diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorMain.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorMain.java index 6b5108f1a19..4f24cdc26ac 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorMain.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorMain.java @@ -5,23 +5,19 @@ import io.kestra.core.models.executions.ExecutionKilled; import io.kestra.core.models.executions.LogEntry; import io.kestra.core.models.executions.TaskRun; -import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; -import io.kestra.core.models.templates.Template; import io.kestra.core.queues.QueueService; import io.kestra.core.runners.*; import io.kestra.core.services.ConditionService; +import io.kestra.core.services.ExecutionService; import io.kestra.core.services.FlowService; import io.kestra.runner.kafka.KafkaFlowExecutor; import io.kestra.runner.kafka.KafkaQueueEnabled; -import io.kestra.runner.kafka.KafkaTemplateExecutor; import io.kestra.runner.kafka.serializers.JsonSerde; import io.kestra.runner.kafka.services.KafkaAdminService; -import io.kestra.runner.kafka.services.KafkaStreamService; import io.kestra.runner.kafka.services.KafkaStreamSourceService; import io.kestra.runner.kafka.services.KafkaStreamsBuilder; import io.kestra.runner.kafka.streams.*; -import io.micronaut.context.ApplicationContext; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; @@ -32,10 +28,10 @@ import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.Stores; import org.slf4j.event.Level; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -50,6 +46,7 @@ public class ExecutorMain implements KafkaExecutorInterface { private static final String WORKERTASK_DEDUPLICATION_STATE_STORE_NAME = "workertask_deduplication"; private static final String TRIGGER_DEDUPLICATION_STATE_STORE_NAME = "trigger_deduplication"; private static final String NEXTS_DEDUPLICATION_STATE_STORE_NAME = "next_deduplication"; + private static final String EXECUTION_DELAY_STATE_STORE_NAME = "execution_delay"; @Inject private KafkaAdminService kafkaAdminService; @@ -78,6 +75,9 @@ public class ExecutorMain implements KafkaExecutorInterface { @Inject private KafkaFlowExecutor kafkaFlowExecutor; + @Inject + private ExecutionService executionService; + public StreamsBuilder topology() { StreamsBuilder builder = new KafkaStreamsBuilder(); @@ -109,6 +109,13 @@ public StreamsBuilder topology() { Serdes.String() )); + // Execution delay + builder.addStateStore(Stores.windowStoreBuilder( + Stores.persistentWindowStore(EXECUTION_DELAY_STATE_STORE_NAME, Duration.ofDays(7), Duration.ofSeconds(1), false), + Serdes.String(), + JsonSerde.of(ExecutionDelay.class) + )); + // declare common stream KStream workerTaskResultKStream = this.workerTaskResultKStream(builder); KStream executorKStream = this.executorKStream(builder); @@ -126,8 +133,9 @@ public StreamsBuilder topology() { stream = this.handleExecutor(stream); // save execution - this.toExecution(stream); + this.toExecution(stream, "Main"); this.toWorkerTask(stream); + this.handleExecutionDelay(stream); this.toWorkerTaskResult(stream); this.toExecutorFlowTriggerTopic(stream); @@ -248,21 +256,12 @@ private KStream handleExecutor(KStream strea } private void purgeExecutor(KStream stream) { - KStream terminatedWithKilled = stream + KStream terminated = stream .filter( - (key, value) -> conditionService.isTerminatedWithListeners(value.getFlow(), value.getExecution()), + (key, value) -> executorService.canBePurged(value), Named.as("PurgeExecutor.filterTerminated") ); - // we don't purge killed execution in order to have feedback about child running tasks - // this can be killed lately (after the executor kill the execution), but we want to keep - // feedback about the actual state (killed or not) - // @TODO: this can lead to infinite state store for most executor topic - KStream terminated = terminatedWithKilled.filter( - (key, value) -> value.getExecution().getState().getCurrent() != State.Type.KILLED, - Named.as("PurgeExecutor.filterKilledExecution") - ); - // clean up executor terminated .mapValues( @@ -332,7 +331,11 @@ private void purgeExecutor(KStream stream) { ); // clean up killed - terminatedWithKilled + terminated + .filter( + (key, value) -> value.getExecution().getState().getCurrent() == State.Type.KILLED, + Named.as("PurgeExecutor.filterKilledToNull") + ) .mapValues( (readOnlyKey, value) -> (ExecutionKilled) null, Named.as("PurgeExecutor.executionKilledToNull") @@ -553,11 +556,28 @@ private void handleWorkerTaskExecution(KTable worke toWorkerTaskResultSend(joinKStream, "HandleWorkerTaskExecution"); } + private void handleExecutionDelay(KStream stream) { + KStream executionDelayStream = stream + .flatMapValues( + (readOnlyKey, value) -> value.getExecutionDelays(), + Named.as("HandleExecutionDelay.flapMap") + ) + .transform( + () -> new ExecutorPausedTransformer(EXECUTION_DELAY_STATE_STORE_NAME, EXECUTOR_STATE_STORE_NAME, executionService), + Named.as("HandleExecutionDelay.transform"), + EXECUTION_DELAY_STATE_STORE_NAME, + EXECUTOR_STATE_STORE_NAME + ) + .filter((key, value) -> value != null, Named.as("HandleExecutionDelay.notNullFilter")); + + toExecution(executionDelayStream, "Delay"); + } + private void toWorkerTaskResult(KStream stream) { KStream workerTaskResultKStream = stream .flatMapValues( (readOnlyKey, value) -> value.getWorkerTaskResults(), - Named.as("HandleWorkerTaskResult.flapMap") + Named.as("ToWorkerTaskResult.flapMap") ); toWorkerTaskResultSend(workerTaskResultKStream, "HandleWorkerTaskResult"); @@ -603,40 +623,43 @@ private void purgeWorkerRunning(KStream workerTaskResu ); } - private void toExecution(KStream stream) { + private void toExecution(KStream stream, String name) { KStream streamFrom = stream - .filter((key, value) -> value.isExecutionUpdated(), Named.as("ToExecution.haveFrom")) + .filter((key, value) -> value.isExecutionUpdated(), Named.as(name + "ToExecution.haveFrom")) .transformValues( ExecutorAddHeaderTransformer::new, - Named.as("ToExecution.addHeaders") + Named.as(name + "ToExecution.addHeaders") ); // send execution KStream executionKStream = streamFrom - .filter((key, value) -> value.getException() == null, Named.as("ToExecutionExecution.notException")); + .filter((key, value) -> value.getException() == null, Named.as(name + "ToExecutionExecution.notException")); - toExecutionSend(executionKStream, "ToExecutionExecution"); + toExecutionSend(executionKStream, name + "ToExecutionExecution"); // send exception KStream> failedStream = streamFrom - .filter((key, value) -> value.getException() != null, Named.as("ToExecutionException.isException")) + .filter((key, value) -> value.getException() != null, Named.as(name + "ToExecutionException.isException")) .mapValues( e -> Pair.of(e, e.getExecution().failedExecutionFromExecutor(e.getException())), - Named.as("ToExecutionException.mapToFailedExecutionWithLog") + Named.as(name + "ToExecutionException.mapToFailedExecutionWithLog") ); failedStream - .flatMapValues(e -> e.getRight().getLogs(), Named.as("ToExecutionException.flatmapLogs")) - .selectKey((key, value) -> (String)null, Named.as("ToExecutionException.removeKey")) + .flatMapValues(e -> e.getRight().getLogs(), Named.as(name + "ToExecutionException.flatmapLogs")) + .selectKey((key, value) -> (String)null, Named.as(name + "ToExecutionException.removeKey")) .to( kafkaAdminService.getTopicName(LogEntry.class), - Produced.with(Serdes.String(), JsonSerde.of(LogEntry.class)).withName("ToExecutionException.toLogEntry") + Produced.with(Serdes.String(), JsonSerde.of(LogEntry.class)).withName(name + "ToExecutionException.toLogEntry") ); KStream executorFailedKStream = failedStream - .mapValues(e -> e.getLeft().withExecution(e.getRight().getExecution(), "failedExecutionFromExecutor"), Named.as("ToExecutionException.mapToExecutor")); + .mapValues( + e -> e.getLeft().withExecution(e.getRight().getExecution(), "failedExecutionFromExecutor"), + Named.as(name + "ToExecutionException.mapToExecutor") + ); - toExecutionSend(executorFailedKStream, "ToExecutionException"); + toExecutionSend(executorFailedKStream, name + "ToExecutionException"); } private void toExecutionSend(KStream stream, String from) { diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorPausedTransformer.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorPausedTransformer.java new file mode 100644 index 00000000000..62020957cf7 --- /dev/null +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorPausedTransformer.java @@ -0,0 +1,93 @@ +package io.kestra.runner.kafka.streams; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.State; +import io.kestra.core.runners.ExecutionDelay; +import io.kestra.core.runners.Executor; +import io.kestra.core.services.ExecutionService; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.WindowStore; + +import java.time.Duration; +import java.time.Instant; + +@Slf4j +public class ExecutorPausedTransformer implements Transformer> { + private final String storeName; + private final String storeExecutorName; + private final ExecutionService executionService; + + private WindowStore store; + + private KeyValueStore storeExecutor; + + public ExecutorPausedTransformer(String storeName, String storeExecutorName, ExecutionService executionService) { + this.storeName = storeName; + this.storeExecutorName = storeExecutorName; + this.executionService = executionService; + } + + @Override + public void init(ProcessorContext context) { + this.store = context.getStateStore(this.storeName); + this.storeExecutor = context.getStateStore(this.storeExecutorName); + + context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> { + try (final KeyValueIterator, ExecutionDelay> iter = store.fetchAll(Instant.EPOCH.toEpochMilli(), timestamp)) { + while (iter.hasNext()) { + final KeyValue, ExecutionDelay> entry = iter.next(); + + Executor executor = this.storeExecutor.get(entry.key.key()); + + if (executor == null) { + log.warn("Unable to find execution '" + entry.key.key() + "', cannot restart pause!"); + } else if (executor.getExecution().getState().getCurrent() != State.Type.PAUSED) { + log.debug("Execution '" + entry.key.key() + "' is not paused (" + executor.getExecution().getState().getCurrent() + "), skipping!"); + } else { + try { + Execution markAsExecution = executionService.markAs( + executor.getExecution(), + entry.value.getTaskRunId(), + State.Type.RUNNING + ); + + context.forward( + entry.key.key(), + new Executor(markAsExecution, null) + .withExecution(markAsExecution, "pausedRestart") + ); + + } catch (Exception e) { + context.forward( + entry.key.key(), + new Executor(executor.getExecution(), null) + .withException(e, "pausedRestart") + ); + } + } + + store.put(entry.key.key(), null, entry.value.getDate().toEpochMilli()); + } + } + }); + } + + @Override + public KeyValue transform(String key, ExecutionDelay value) { + store.put(key, value, value.getDate().toEpochMilli()); + + return null; + } + + @Override + public void close() { + + } +} diff --git a/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaExecutorTest.java b/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaExecutorTest.java index de2a6ceb323..8be37e34f3b 100644 --- a/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaExecutorTest.java +++ b/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaExecutorTest.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.net.URISyntaxException; +import java.time.Duration; import java.util.*; import static org.hamcrest.MatcherAssert.assertThat; @@ -551,6 +552,33 @@ void multipleTrigger() { assertThat(triggerExecution.getFlowId(), is("trigger-multiplecondition-listener")); } + @Test + void paused() { + startStream(this.executorStore); + + Flow flow = flowRepository.findById("io.kestra.tests", "pause-delay").orElseThrow(); + this.flowInput().pipeInput(flow.uid(), flow); + + + startStream(this.executorMain); + + Execution execution = createExecution(flow); + assertThat(execution.getState().getCurrent(), is(State.Type.CREATED)); + + execution = executionOutput().readRecord().getValue(); + execution = executionOutput().readRecord().getValue(); + execution = executionOutput().readRecord().getValue(); + + assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED)); + this.testTopology.advanceWallClockTime(Duration.ofSeconds(10)); + + execution = executionOutput().readRecord().getValue(); + assertThat(execution.getState().getCurrent(), is(State.Type.RESTARTED)); + + execution = executionOutput().readRecord().getValue(); + assertThat(execution.getState().getCurrent(), is(State.Type.RUNNING)); + } + @Test void workerRebalanced() { startStream(this.executorStore); diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java index 00d85514ceb..9f52548d20e 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java @@ -12,14 +12,20 @@ import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.runners.*; import io.kestra.core.services.ConditionService; +import io.kestra.core.services.ExecutionService; import io.kestra.core.services.FlowService; import io.kestra.core.services.TaskDefaultService; import io.kestra.core.tasks.flows.Template; import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import io.micronaut.context.ApplicationContext; @@ -36,6 +42,7 @@ public class MemoryExecutor implements ExecutorInterface { private static final ConcurrentHashMap EXECUTIONS = new ConcurrentHashMap<>(); private static final ConcurrentHashMap WORKERTASKEXECUTIONS_WATCHER = new ConcurrentHashMap<>(); private List allFlows; + private final ScheduledExecutorService schedulerDelay = Executors.newSingleThreadScheduledExecutor(); @Inject private ApplicationContext applicationContext; @@ -80,6 +87,9 @@ public class MemoryExecutor implements ExecutorInterface { @Inject private MetricRegistry metricRegistry; + @Inject + private ExecutionService executionService; + @Override public void run() { applicationContext.registerSingleton(new MemoryFlowExecutor(this.flowRepository)); @@ -111,8 +121,7 @@ private Flow transform(Flow flow, Execution execution) { private void handleExecution(ExecutionState state) { synchronized (this) { - Flow flow = this.flowRepository.findByExecution(state.execution); - flow = transform(flow, state.execution); + final Flow flow = transform(this.flowRepository.findByExecution(state.execution), state.execution); Execution execution = state.execution; Executor executor = new Executor(execution, null).withFlow(flow); @@ -160,6 +169,38 @@ private void handleExecution(ExecutionState state) { .forEach(workerTaskResultQueue::emit); } + if (executor.getExecutionDelays().size() > 0) { + executor.getExecutionDelays() + .forEach(workerTaskResultDelay -> { + long between = ChronoUnit.MICROS.between(Instant.now(), workerTaskResultDelay.getDate()); + + if (between <= 0) { + between = 1; + } + + schedulerDelay.schedule( + () -> { + try { + ExecutionState executionState = EXECUTIONS.get(workerTaskResultDelay.getExecutionId()); + + Execution markAsExecution = executionService.markAs( + executionState.execution, + workerTaskResultDelay.getTaskRunId(), + State.Type.RUNNING + ); + + executionQueue.emit(markAsExecution); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, + between, + TimeUnit.MICROSECONDS + ); + }); + } + + if (executor.getWorkerTaskExecutions().size() > 0) { executor.getWorkerTaskExecutions() .forEach(workerTaskExecution -> { @@ -245,7 +286,7 @@ private void toExecution(Executor executor) { this.handleExecution(saveExecution(executor.getExecution())); // delete if ended - if (conditionService.isTerminatedWithListeners(executor.getFlow(), executor.getExecution())) { + if (executorService.canBePurged(executor)) { EXECUTIONS.remove(executor.getExecution().getId()); } } diff --git a/ui/src/utils/state.js b/ui/src/utils/state.js index b0c089ed2a9..b564dcfc313 100644 --- a/ui/src/utils/state.js +++ b/ui/src/utils/state.js @@ -79,7 +79,7 @@ const STATE = Object.freeze({ colorClass: "purple", color: "#6d81f5", icon: "pause-circle", - isRunning: false, + isRunning: true, isKillable: false, isFailed: false, } diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java index d78c4c74626..680e37a5b05 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java @@ -511,6 +511,11 @@ public HttpResponse kill(String executionId) { return HttpResponse.noContent(); } + private boolean isStopFollow(Flow flow, Execution execution) { + return conditionService.isTerminatedWithListeners(flow, execution) && + execution.getState().getCurrent() != State.Type.PAUSED; + } + /** * Trigger a new execution for current flow and follow execution * @@ -531,7 +536,7 @@ public Flowable> follow(String executionId) { ); Flow flow = flowRepository.findByExecution(execution); - if (conditionService.isTerminatedWithListeners(flow, execution)) { + if (this.isStopFollow(flow, execution)) { emitter.onNext(Event.of(execution).id("end")); emitter.onComplete(); return; @@ -546,7 +551,7 @@ public Flowable> follow(String executionId) { emitter.onNext(Event.of(current).id("progress")); - if (conditionService.isTerminatedWithListeners(flow, current)) { + if (this.isStopFollow(flow, execution)) { emitter.onNext(Event.of(current).id("end")); emitter.onComplete(); }