Skip to content

Commit

Permalink
Avoid super-huge thread pool core sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jan 16, 2025
1 parent b0a78f1 commit 6836ffa
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public interface TaskHandler<TT> {
@Nonnull String identity,
@Nonnull TaskHandler<T> handler,
@Nonnull PollerOptions pollerOptions,
int workerTaskSlots,
int threadPoolMax,
boolean synchronousQueue,
boolean useVirtualThreads) {
this.namespace = Objects.requireNonNull(namespace);
Expand All @@ -75,14 +75,16 @@ public interface TaskHandler<TT> {
t.setUncaughtExceptionHandler(pollerOptions.getUncaughtExceptionHandler());
});
} else {
// Because the thread pool executor will keep allocating threads (even if other threads are
// idle) until it reaches the Core number, we want to limit this to something not completely
// bananas.
int coreMax = Math.min(threadPoolMax, Runtime.getRuntime().availableProcessors() * 2);
ThreadPoolExecutor threadPoolTaskExecutor =
new ThreadPoolExecutor(
// for SynchronousQueue we can afford to set it to 0, because the queue is always full
// or empty for LinkedBlockingQueue we have to set slots to workerTaskSlots to avoid
// situation when the queue grows, but the amount of threads is not, because the queue
// is not (and never) full
synchronousQueue ? 0 : workerTaskSlots,
workerTaskSlots,
// For SynchronousQueue we can afford to set it to 0, because the queue is always full
// or empty
synchronousQueue ? 0 : coreMax,
threadPoolMax,
10,
TimeUnit.SECONDS,
synchronousQueue ? new SynchronousQueue<>() : new LinkedBlockingQueue<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
* at once.
*
* @param <SI> The type of information that will be used to reserve a slot. The three info types are
* {@link WorkflowSlotInfo}, {@link ActivitySlotInfo}, and {@link LocalActivitySlotInfo}.
* {@link WorkflowSlotInfo}, {@link ActivitySlotInfo}, {@link LocalActivitySlotInfo}, and {@link
* NexusSlotInfo}.
*/
@Experimental
public interface SlotSupplier<SI extends SlotInfo> {
Expand Down Expand Up @@ -77,11 +78,11 @@ public interface SlotSupplier<SI extends SlotInfo> {
void releaseSlot(SlotReleaseContext<SI> ctx);

/**
* Because we currently use thread pools to execute tasks, there must be *some* defined
* upper-limit on the size of the thread pool for each kind of task. You must not hand out more
* permits than this number. If unspecified, the default is {@link Integer#MAX_VALUE}. Be aware
* that if your implementation hands out unreasonable numbers of permits, you could easily
* oversubscribe the worker, and cause it to run out of resources.
* Because we use thread pools to execute tasks when virtual threads are not enabled, there must
* be *some* defined upper-limit on the size of the thread pool for each kind of task. You must
* not hand out more permits than this number. If unspecified, the default is {@link
* Integer#MAX_VALUE}. Be aware that if your implementation hands out unreasonable numbers of
* permits, you could easily oversubscribe the worker, and cause it to run out of resources.
*
* <p>If a non-empty value is returned, it is assumed to be meaningful, and the worker will emit
* {@link io.temporal.worker.MetricsType#WORKER_TASK_SLOTS_AVAILABLE} metrics based on this value.
Expand Down

0 comments on commit 6836ffa

Please sign in to comment.