Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3594 from manan164/join_task_fix
Browse files Browse the repository at this point in the history
Join task callBackDuration
  • Loading branch information
v1r3n authored Jun 8, 2023
2 parents 04b8747 + f29657b commit e11a3e7
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,16 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
LOGGER.debug("{} removed from queue: {}", task, queueName);
} else {
task.setCallbackAfterSeconds(systemTaskCallbackTime);
systemTask
.getEvaluationOffset(task, systemTaskCallbackTime)
.ifPresentOrElse(
task::setCallbackAfterSeconds,
() -> task.setCallbackAfterSeconds(systemTaskCallbackTime));
queueDAO.postpone(
queueName,
task.getTaskId(),
task.getWorkflowPriority(),
systemTaskCallbackTime);
task.getCallbackAfterSeconds());
LOGGER.debug("{} postponed in queue: {}", task, queueName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.core.execution.tasks;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -98,6 +99,15 @@ public boolean execute(
return false;
}

@Override
public Optional<Long> getEvaluationOffset(TaskModel taskModel, long defaultOffset) {
int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0;
if (index == 0) {
return Optional.of(0L);
}
return Optional.of(Math.min((long) Math.pow(2, index), defaultOffset));
}

public boolean isAsync() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public boolean execute(
*/
public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {}

public Optional<Long> getEvaluationOffset(TaskModel taskModel, long defaultOffset) {
return Optional.empty();
}

/**
* @return True if the task is supposed to be started asynchronously using internal queues.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ class AsyncSystemTaskExecutorTest extends Specification {
taskDefName: "taskDefName", workflowPriority: 10)
WorkflowModel workflow = new WorkflowModel(workflowId: workflowId, status: WorkflowModel.Status.RUNNING)
String queueName = QueueUtils.getQueueName(task)
workflowSystemTask.getEvaluationOffset(task, 1) >> Optional.empty();


when:
executor.execute(workflowSystemTask, taskId)
Expand Down

0 comments on commit e11a3e7

Please sign in to comment.