Skip to content

Commit

Permalink
feat(core): add a delay on Pause task in order to restart automatical…
Browse files Browse the repository at this point in the history
…ly after some delay (#549)

close #542
  • Loading branch information
tchiotludo committed Apr 11, 2022
1 parent a481bdd commit b2b0bbf
Show file tree
Hide file tree
Showing 14 changed files with 358 additions and 46 deletions.
8 changes: 8 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ public String humanDuration() {
}
}

public Instant maxDate() {
if (this.histories.size() == 0) {
return Instant.now();
}

return this.histories.get(this.histories.size() - 1).getDate();
}

@JsonIgnore
public boolean isTerninated() {
return this.current.isTerninated();
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/java/io/kestra/core/runners/ExecutionDelay.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.kestra.core.runners;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;

import java.time.Instant;
import javax.validation.constraints.NotNull;

@Value
@AllArgsConstructor
@Builder
public class ExecutionDelay {
@NotNull
String taskRunId;

@NotNull
String executionId;

@NotNull
Instant date;
}
8 changes: 8 additions & 0 deletions core/src/main/java/io/kestra/core/runners/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class Executor {
private final List<TaskRun> nexts = new ArrayList<>();
private final List<WorkerTask> workerTasks = new ArrayList<>();
private final List<WorkerTaskResult> workerTaskResults = new ArrayList<>();
private final List<ExecutionDelay> executionDelays = new ArrayList<>();
private WorkerTaskResult joined;
private final List<WorkerTaskExecution> workerTaskExecutions = new ArrayList<>();

Expand Down Expand Up @@ -78,6 +79,13 @@ public Executor withWorkerTaskResults(List<WorkerTaskResult> workerTaskResults,
return this;
}

public Executor withWorkerTaskDelays(List<ExecutionDelay> executionDelays, String from) {
this.executionDelays.addAll(executionDelays);
this.from.add(from);

return this;
}

public Executor withWorkerTaskExecutions(List<WorkerTaskExecution> newExecutions, String from) {
this.workerTaskExecutions.addAll(newExecutions);
this.from.add(from);
Expand Down
47 changes: 47 additions & 0 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.services.ConditionService;
import io.kestra.core.tasks.flows.Pause;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand All @@ -20,6 +21,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -368,9 +370,42 @@ private Executor handleChildWorkerTaskResult(Executor executor) throws InternalE
return executor;
}

executor = this.handlePausedDelay(executor, list);

return executor.withWorkerTaskResults(list, "handleChildWorkerTaskResult");
}

private Executor handlePausedDelay(Executor executor, List<WorkerTaskResult> workerTaskResults) throws InternalException {
List<ExecutionDelay> list = workerTaskResults
.stream()
.filter(workerTaskResult -> workerTaskResult.getTaskRun().getState().getCurrent() == State.Type.PAUSED)
.map(throwFunction(workerTaskResult -> {
Task task = executor.getFlow().findTaskByTaskId(workerTaskResult.getTaskRun().getTaskId());

if (task instanceof Pause) {
Pause pauseTask = (Pause) task;

if (pauseTask.getDelay() != null) {
return ExecutionDelay.builder()
.taskRunId(workerTaskResult.getTaskRun().getId())
.executionId(executor.getExecution().getId())
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(pauseTask.getDelay()))
.build();
}
}

return null;
}))
.filter(Objects::nonNull)
.collect(Collectors.toList());

if (list.size() == 0) {
return executor;
}

return executor.withWorkerTaskDelays(list, "handlePausedDelay");
}

private Executor handleChildWorkerCreatedKilling(Executor executor) throws InternalException {
if (executor.getExecution().getTaskRunList() == null || executor.getExecution().getState().getCurrent() != State.Type.KILLING) {
return executor;
Expand Down Expand Up @@ -566,6 +601,18 @@ private Executor handleFlowTask(final Executor executor) {
return resultExecutor;
}

public boolean canBePurged(final Executor executor) {
return conditionService.isTerminatedWithListeners(executor.getFlow(), executor.getExecution())
// we don't purge pause execution in order to be able to restart automatically in case of delay
&& executor.getExecution().getState().getCurrent() != State.Type.PAUSED
// we don't purge killed execution in order to have feedback about child running tasks
// this can be killed lately (after the executor kill the execution), but we want to keep
// feedback about the actual state (killed or not)
// @TODO: this can lead to infinite state store for most executor topic
&& executor.getExecution().getState().getCurrent() != State.Type.KILLED;

}

public void log(Logger log, Boolean in, WorkerTask value) {
log.debug(
"{} {} : {}",
Expand Down
9 changes: 8 additions & 1 deletion core/src/main/java/io/kestra/core/tasks/flows/Pause.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand All @@ -28,7 +29,7 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Pause current execution and wait for a manual approval"
title = "Pause current execution and wait for a manual approval or a delay"
)
@Plugin(
examples = {
Expand All @@ -54,6 +55,12 @@
}
)
public class Pause extends Sequential implements FlowableTask<VoidOutput> {
@Schema(
title = "Duration of the pause.",
description = "If null, a manual approval is need, if not, the delay before automatically continue the execution"
)
private Duration delay;

@Override
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
if (this.needPause(parentTaskRun) || parentTaskRun.getState().getCurrent() == State.Type.PAUSED) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/io/kestra/core/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.function.Consumer;

public class Helpers {
public static long FLOWS_COUNT = 48;
public static long FLOWS_COUNT = 49;

public static ApplicationContext applicationContext() throws URISyntaxException {
return applicationContext(
Expand Down
27 changes: 21 additions & 6 deletions core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.kestra.core.tasks.flows;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
Expand All @@ -10,14 +8,12 @@
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import io.kestra.core.services.ExecutionService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;

import jakarta.inject.Inject;
import jakarta.inject.Named;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -57,4 +53,23 @@ void run() throws Exception {

assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
}

@Test
void runDelay() throws Exception {
Execution execution = runnerUtils.runOne("io.kestra.tests", "pause-delay");

assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList(), hasSize(1));

execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
() -> {},
Duration.ofSeconds(30)
);

assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(2L));
assertThat(execution.getTaskRunList(), hasSize(3));
}
}
15 changes: 15 additions & 0 deletions core/src/test/resources/flows/valids/pause-delay.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
id: pause-delay
namespace: io.kestra.tests

tasks:
- id: pause
type: io.kestra.core.tasks.flows.Pause
delay: PT1S
tasks:
- id: ko
type: io.kestra.core.tasks.scripts.Bash
commands:
- echo "trigger 1 seconds pause"
- id: last
type: io.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"
Loading

0 comments on commit b2b0bbf

Please sign in to comment.