diff --git a/CHANGELOG.md b/CHANGELOG.md index beb97c8efc..0c4cb63117 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -131,6 +131,7 @@ Changes to configurations: | workflow.executor.service.max.threads | conductor.app.executorServiceMaxThreadCount | 50 | | decider.sweep.frequency.seconds | conductor.app.sweepFrequency | 30s | | workflow.sweeper.thread.count | conductor.app.sweeperThreadCount | 5 | +| - | conductor.app.sweeperWorkflowPollTimeout | 2000ms | | workflow.event.processor.thread.count | conductor.app.eventProcessorThreadCount | 2 | | workflow.event.message.indexing.enabled | conductor.app.eventMessageIndexingEnabled | true | | workflow.event.execution.indexing.enabled | conductor.app.eventExecutionIndexingEnabled | true | diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index fca20a88e3..c54b7a9de6 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -45,6 +45,9 @@ public class ConductorProperties { /** The number of threads to use to do background sweep on active workflows. */ private int sweeperThreadCount = Runtime.getRuntime().availableProcessors() * 2; + /** The timeout (in milliseconds) for the polling of workflows to be swept. */ + private Duration sweeperWorkflowPollTimeout = Duration.ofMillis(2000); + /** The number of threads to configure the threadpool in the event processor. */ private int eventProcessorThreadCount = 2; @@ -250,6 +253,14 @@ public void setSweeperThreadCount(int sweeperThreadCount) { this.sweeperThreadCount = sweeperThreadCount; } + public Duration getSweeperWorkflowPollTimeout() { + return sweeperWorkflowPollTimeout; + } + + public void setSweeperWorkflowPollTimeout(Duration sweeperWorkflowPollTimeout) { + this.sweeperWorkflowPollTimeout = sweeperWorkflowPollTimeout; + } + public int getEventProcessorThreadCount() { return eventProcessorThreadCount; } diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowReconciler.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowReconciler.java index 05086f341e..d5e8fe5db2 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowReconciler.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowReconciler.java @@ -42,6 +42,7 @@ public class WorkflowReconciler extends LifecycleAwareComponent { private final WorkflowSweeper workflowSweeper; private final QueueDAO queueDAO; private final int sweeperThreadCount; + private final int sweeperWorkflowPollTimeout; private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowReconciler.class); @@ -50,6 +51,8 @@ public WorkflowReconciler( this.workflowSweeper = workflowSweeper; this.queueDAO = queueDAO; this.sweeperThreadCount = properties.getSweeperThreadCount(); + this.sweeperWorkflowPollTimeout = + (int) properties.getSweeperWorkflowPollTimeout().toMillis(); LOGGER.info( "WorkflowReconciler initialized with {} sweeper threads", properties.getSweeperThreadCount()); @@ -63,7 +66,8 @@ public void pollAndSweep() { if (!isRunning()) { LOGGER.debug("Component stopped, skip workflow sweep"); } else { - List workflowIds = queueDAO.pop(DECIDER_QUEUE, sweeperThreadCount, 2000); + List workflowIds = + queueDAO.pop(DECIDER_QUEUE, sweeperThreadCount, sweeperWorkflowPollTimeout); if (workflowIds != null) { // wait for all workflow ids to be "swept" CompletableFuture.allOf(