diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/PollTaskExecutor.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/PollTaskExecutor.java index 79a9cf8c0..f7d4f27c8 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/PollTaskExecutor.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/PollTaskExecutor.java @@ -52,7 +52,7 @@ public interface TaskHandler { @Nonnull String identity, @Nonnull TaskHandler handler, @Nonnull PollerOptions pollerOptions, - int workerTaskSlots, + int threadPoolMax, boolean synchronousQueue, boolean useVirtualThreads) { this.namespace = Objects.requireNonNull(namespace); @@ -75,14 +75,16 @@ public interface TaskHandler { t.setUncaughtExceptionHandler(pollerOptions.getUncaughtExceptionHandler()); }); } else { + // Because the thread pool executor will keep allocating threads (even if other threads are + // idle) until it reaches the Core number, we want to limit this to something not completely + // bananas. + int coreMax = Math.min(threadPoolMax, Runtime.getRuntime().availableProcessors() * 2); ThreadPoolExecutor threadPoolTaskExecutor = new ThreadPoolExecutor( - // for SynchronousQueue we can afford to set it to 0, because the queue is always full - // or empty for LinkedBlockingQueue we have to set slots to workerTaskSlots to avoid - // situation when the queue grows, but the amount of threads is not, because the queue - // is not (and never) full - synchronousQueue ? 0 : workerTaskSlots, - workerTaskSlots, + // For SynchronousQueue we can afford to set it to 0, because the queue is always full + // or empty + synchronousQueue ? 0 : coreMax, + threadPoolMax, 10, TimeUnit.SECONDS, synchronousQueue ? new SynchronousQueue<>() : new LinkedBlockingQueue<>()); diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotSupplier.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotSupplier.java index 37c1f0ab7..fa584d443 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotSupplier.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotSupplier.java @@ -30,7 +30,8 @@ * at once. * * @param The type of information that will be used to reserve a slot. The three info types are - * {@link WorkflowSlotInfo}, {@link ActivitySlotInfo}, and {@link LocalActivitySlotInfo}. + * {@link WorkflowSlotInfo}, {@link ActivitySlotInfo}, {@link LocalActivitySlotInfo}, and {@link + * NexusSlotInfo}. */ @Experimental public interface SlotSupplier { @@ -77,11 +78,11 @@ public interface SlotSupplier { void releaseSlot(SlotReleaseContext ctx); /** - * Because we currently use thread pools to execute tasks, there must be *some* defined - * upper-limit on the size of the thread pool for each kind of task. You must not hand out more - * permits than this number. If unspecified, the default is {@link Integer#MAX_VALUE}. Be aware - * that if your implementation hands out unreasonable numbers of permits, you could easily - * oversubscribe the worker, and cause it to run out of resources. + * Because we use thread pools to execute tasks when virtual threads are not enabled, there must + * be *some* defined upper-limit on the size of the thread pool for each kind of task. You must + * not hand out more permits than this number. If unspecified, the default is {@link + * Integer#MAX_VALUE}. Be aware that if your implementation hands out unreasonable numbers of + * permits, you could easily oversubscribe the worker, and cause it to run out of resources. * *

If a non-empty value is returned, it is assumed to be meaningful, and the worker will emit * {@link io.temporal.worker.MetricsType#WORKER_TASK_SLOTS_AVAILABLE} metrics based on this value.