Skip to content

Commit

Permalink
feat(core): handle timeout on Pause task (#1040)
Browse files Browse the repository at this point in the history
close #1039
  • Loading branch information
tchiotludo authored Mar 3, 2023
1 parent 10c5d65 commit c8d32f0
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 20 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/io/kestra/core/runners/ExecutionDelay.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.runners;

import io.kestra.core.models.flows.State;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
Expand All @@ -19,4 +20,6 @@ public class ExecutionDelay {

@NotNull
Instant date;

@NotNull State.Type state;
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,12 @@ private Executor handlePausedDelay(Executor executor, List<WorkerTaskResult> wor
if (task instanceof Pause) {
Pause pauseTask = (Pause) task;

if (pauseTask.getDelay() != null) {
if (pauseTask.getDelay() != null || pauseTask.getTimeout() != null) {
return ExecutionDelay.builder()
.taskRunId(workerTaskResult.getTaskRun().getId())
.executionId(executor.getExecution().getId())
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(pauseTask.getDelay()))
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(pauseTask.getDelay() != null ? pauseTask.getDelay() : pauseTask.getTimeout()))
.state(pauseTask.getDelay() != null ? State.Type.RUNNING : State.Type.FAILED)
.build();
}
}
Expand Down
9 changes: 8 additions & 1 deletion core/src/main/java/io/kestra/core/tasks/flows/Pause.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,18 @@
public class Pause extends Sequential implements FlowableTask<VoidOutput> {
@Schema(
title = "Duration of the pause.",
description = "If null, a manual approval is need, if not, the delay before automatically continue the execution"
description = "If null and no timeout, a manual approval is needed, if not, the delay before continuing the execution"
)
@PluginProperty
private Duration delay;

@Schema(
title = "Timeout of the pause.",
description = "If null and no delay, a manual approval is needed, else a manual approval is needed before the timeout or the task will fail"
)
@PluginProperty
private Duration timeout;

@Override
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
if (this.needPause(parentTaskRun) || parentTaskRun.getState().getCurrent() == State.Type.PAUSED) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/io/kestra/core/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.function.Consumer;

public class Helpers {
public static long FLOWS_COUNT = 52;
public static long FLOWS_COUNT = 53;

public static ApplicationContext applicationContext() throws URISyntaxException {
return applicationContext(
Expand Down
28 changes: 26 additions & 2 deletions core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@ void run() throws Exception {
}

@Test
void failed() throws Exception {
void delay() throws Exception {
suite.runDelay(runnerUtils);
}

@Test
void timeout() throws Exception {
suite.runTimeout(runnerUtils);
}

@Singleton
public static class Suite {
@Inject
Expand Down Expand Up @@ -87,6 +92,25 @@ public void runDelay(RunnerUtils runnerUtils) throws Exception {
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(2L));
assertThat(execution.getTaskRunList(), hasSize(3));
}
}


public void runTimeout(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOne("io.kestra.tests", "pause-timeout");

assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList(), hasSize(1));

execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.FAILED,
() -> {},
Duration.ofSeconds(30)
);

assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.FAILED).count(), is(1L));
assertThat(execution.getTaskRunList(), hasSize(1));
}
}
}
15 changes: 15 additions & 0 deletions core/src/test/resources/flows/valids/pause-timeout.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
id: pause-timeout
namespace: io.kestra.tests

tasks:
- id: pause
type: io.kestra.core.tasks.flows.Pause
timeout: PT1S
tasks:
- id: ko
type: io.kestra.core.tasks.scripts.Bash
commands:
- echo "trigger 1 seconds pause"
- id: last
type: io.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"
15 changes: 9 additions & 6 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,16 @@ private void executionDelaySend() {
Executor executor = new Executor(pair.getLeft(), null);

try {
Execution markAsExecution = executionService.markAs(
pair.getKey(),
executionDelay.getTaskRunId(),
State.Type.RUNNING
);
if (executor.getExecution().findTaskRunByTaskRunId(executionDelay.getTaskRunId()).getState().getCurrent() == State.Type.PAUSED) {

Execution markAsExecution = executionService.markAs(
pair.getKey(),
executionDelay.getTaskRunId(),
executionDelay.getState()
);

executor = executor.withExecution(markAsExecution, "pausedRestart");
executor = executor.withExecution(markAsExecution, "pausedRestart");
}
} catch (Exception e) {
executor = handleFailedExecutionFromExecutor(executor, e);
}
Expand Down
5 changes: 5 additions & 0 deletions jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,9 @@ public void pauseRun() throws Exception {
public void pauseRunDelay() throws Exception {
pauseTest.runDelay(runnerUtils);
}

@Test
public void pauseRunTimeout() throws Exception {
pauseTest.runTimeout(runnerUtils);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,15 @@ private void handleExecution(ExecutionState state) {
try {
ExecutionState executionState = EXECUTIONS.get(workerTaskResultDelay.getExecutionId());

Execution markAsExecution = executionService.markAs(
executionState.execution,
workerTaskResultDelay.getTaskRunId(),
State.Type.RUNNING
);

executionQueue.emit(markAsExecution);
if (executionState.execution.findTaskRunByTaskRunId(workerTaskResultDelay.getTaskRunId()).getState().getCurrent() == State.Type.PAUSED) {
Execution markAsExecution = executionService.markAs(
executionState.execution,
workerTaskResultDelay.getTaskRunId(),
workerTaskResultDelay.getState()
);

executionQueue.emit(markAsExecution);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ void exportByQuery() throws IOException {
Files.write(file.toPath(), zip);

try (ZipFile zipFile = new ZipFile(file)) {
assertThat(zipFile.stream().count(), is(52L));
assertThat(zipFile.stream().count(), is(Helpers.FLOWS_COUNT));
}

file.delete();
Expand Down

0 comments on commit c8d32f0

Please sign in to comment.