Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3633 from lsfe87/sweeper_timeout_config
Browse files Browse the repository at this point in the history
Add property for the timeout of sweeper polling for workflows
  • Loading branch information
v1r3n authored Jun 8, 2023
2 parents 4fb9806 + c5343ac commit 04b8747
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ Changes to configurations:
| workflow.executor.service.max.threads | conductor.app.executorServiceMaxThreadCount | 50 |
| decider.sweep.frequency.seconds | conductor.app.sweepFrequency | 30s |
| workflow.sweeper.thread.count | conductor.app.sweeperThreadCount | 5 |
| - | conductor.app.sweeperWorkflowPollTimeout | 2000ms |
| workflow.event.processor.thread.count | conductor.app.eventProcessorThreadCount | 2 |
| workflow.event.message.indexing.enabled | conductor.app.eventMessageIndexingEnabled | true |
| workflow.event.execution.indexing.enabled | conductor.app.eventExecutionIndexingEnabled | true |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public class ConductorProperties {
/** The number of threads to use to do background sweep on active workflows. */
private int sweeperThreadCount = Runtime.getRuntime().availableProcessors() * 2;

/** The timeout (in milliseconds) for the polling of workflows to be swept. */
private Duration sweeperWorkflowPollTimeout = Duration.ofMillis(2000);

/** The number of threads to configure the threadpool in the event processor. */
private int eventProcessorThreadCount = 2;

Expand Down Expand Up @@ -250,6 +253,14 @@ public void setSweeperThreadCount(int sweeperThreadCount) {
this.sweeperThreadCount = sweeperThreadCount;
}

public Duration getSweeperWorkflowPollTimeout() {
return sweeperWorkflowPollTimeout;
}

public void setSweeperWorkflowPollTimeout(Duration sweeperWorkflowPollTimeout) {
this.sweeperWorkflowPollTimeout = sweeperWorkflowPollTimeout;
}

public int getEventProcessorThreadCount() {
return eventProcessorThreadCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class WorkflowReconciler extends LifecycleAwareComponent {
private final WorkflowSweeper workflowSweeper;
private final QueueDAO queueDAO;
private final int sweeperThreadCount;
private final int sweeperWorkflowPollTimeout;

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowReconciler.class);

Expand All @@ -50,6 +51,8 @@ public WorkflowReconciler(
this.workflowSweeper = workflowSweeper;
this.queueDAO = queueDAO;
this.sweeperThreadCount = properties.getSweeperThreadCount();
this.sweeperWorkflowPollTimeout =
(int) properties.getSweeperWorkflowPollTimeout().toMillis();
LOGGER.info(
"WorkflowReconciler initialized with {} sweeper threads",
properties.getSweeperThreadCount());
Expand All @@ -63,7 +66,8 @@ public void pollAndSweep() {
if (!isRunning()) {
LOGGER.debug("Component stopped, skip workflow sweep");
} else {
List<String> workflowIds = queueDAO.pop(DECIDER_QUEUE, sweeperThreadCount, 2000);
List<String> workflowIds =
queueDAO.pop(DECIDER_QUEUE, sweeperThreadCount, sweeperWorkflowPollTimeout);
if (workflowIds != null) {
// wait for all workflow ids to be "swept"
CompletableFuture.allOf(
Expand Down

0 comments on commit 04b8747

Please sign in to comment.