Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Join task callBackDuration #3594

Merged
merged 3 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,16 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
LOGGER.debug("{} removed from queue: {}", task, queueName);
} else {
task.setCallbackAfterSeconds(systemTaskCallbackTime);
systemTask
.getEvaluationOffset(task, systemTaskCallbackTime)
.ifPresentOrElse(
task::setCallbackAfterSeconds,
() -> task.setCallbackAfterSeconds(systemTaskCallbackTime));
queueDAO.postpone(
queueName,
task.getTaskId(),
task.getWorkflowPriority(),
systemTaskCallbackTime);
task.getCallbackAfterSeconds());
LOGGER.debug("{} postponed in queue: {}", task, queueName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.core.execution.tasks;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -98,6 +99,15 @@ public boolean execute(
return false;
}

@Override
public Optional<Long> getEvaluationOffset(TaskModel taskModel, long defaultOffset) {
int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0;
if (index == 0) {
return Optional.of(0L);
}
return Optional.of(Math.min((long) Math.pow(2, index), defaultOffset));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if someone has their systemTaskWorkerCallbackDuration to one second though? It will always still be 1 second, so this won't be an exponential backoff in that scenario. It feels like defaultOffset should be a configurable maximum offset or something like that. Maybe the default 30 seconds?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @james-deee , I think if we are setting systemTaskWorkerCallbackDuration to 1 second then ideally we don't want the exponential backoff thing. Here we are trying to exponential backoff from 1 to 30 seconds.

Copy link
Contributor

@james-deee james-deee May 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@manan164 One hand, I see what you're saying, but in practice what we are seeing is that having 1 second doesn't alleviate the problem. The issue is that we have a lot of long running workflows (thus JOINs) in the queue. If we do NOT do an exponential backoff from our setting of 1 second, then our JOIN tasks don't get processed fast enough because the async task worker is trying to process say 10ks JOINs.......... which then makes JOINs that you want to process immediately be backed up and slowed down.

With my proposal, and with what we changed locally, fixes all of our issues with processing the async JOINS.

In our situation we use a duration of 1 second, but we have a cap of 30 seconds in place of that defaultOffset and it completely solves our issue. The result of it is that JOINs on fast running workflows get processed almost instantaneously, and JOINs in the long running workflows get pushed back to the max of 30 seconds..... which is what frees up the processing of the immediate ones.

I really think these are 2 different levers/values that need to be present. Just using 1 second always won't work in heavy workfload Conductor instances where there might be a LOT of joins to process.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe what you're saying is that we should be using a setting of 30 seconds for the systemTaskWorkerCallbackDuration always, and then it sounds like no one should ever use 1 second for the systemTaskWorkerCallbackDuration value.

Then, I think that this could get confusing, because it would feel like someone would want to set the value to 1 for fast processing, but they wouldnt know that actually using 30 seconds will get the fastest/best processing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@manan164 maybe a compromise is to use a hardcoded 30 seconds for that defaultOffset. I really think it will be confusing and could cause issues for people to use 1 second for that setting then, because that doesn't actually give them the best performance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @james-deee , The expectation here is when we use systemTaskWorkerCallbackDuration to 30 seconds ideally we are fine with tasks getting checked every 30 seconds and getting completed but if tasks are getting checked at 1,2,4,8,16 seconds and get completed that should also be fine. But when we use systemTaskWorkerCallbackDuration to 1 seconds which means I want my task to get checked every second. Hope this answers the question.

}

public boolean isAsync() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public boolean execute(
*/
public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {}

public Optional<Long> getEvaluationOffset(TaskModel taskModel, long defaultOffset) {
return Optional.empty();
}

/**
* @return True if the task is supposed to be started asynchronously using internal queues.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ class AsyncSystemTaskExecutorTest extends Specification {
taskDefName: "taskDefName", workflowPriority: 10)
WorkflowModel workflow = new WorkflowModel(workflowId: workflowId, status: WorkflowModel.Status.RUNNING)
String queueName = QueueUtils.getQueueName(task)
workflowSystemTask.getEvaluationOffset(task, 1) >> Optional.empty();


when:
executor.execute(workflowSystemTask, taskId)
Expand Down