From 44992e3e5762be3cb855c79a2f4597ef70f51ad8 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 20 Aug 2024 11:20:02 -0700 Subject: [PATCH] Fix queue shutdown --- .../LocalActivitySlotSupplierQueue.java | 57 +++++++++++++++---- .../internal/worker/LocalActivityWorker.java | 16 ++++-- .../LocalActivityWorkerNotStartedTest.java | 10 +++- 3 files changed, 65 insertions(+), 18 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java index 4cf18e090c..e2da9d832c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java @@ -23,14 +23,12 @@ import io.temporal.worker.tuning.LocalActivitySlotInfo; import io.temporal.worker.tuning.SlotPermit; import io.temporal.workflow.Functions; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class LocalActivitySlotSupplierQueue { +class LocalActivitySlotSupplierQueue implements Shutdownable { static final class QueuedLARequest { final boolean isRetry; final SlotReservationData data; @@ -47,10 +45,11 @@ static final class QueuedLARequest { private final Semaphore newExecutionsBackpressureSemaphore; private final TrackingSlotSupplier slotSupplier; private final Functions.Proc1 afterReservedCallback; - private final Thread queueThread; + private final ExecutorService queueThreadService; private static final Logger log = LoggerFactory.getLogger(LocalActivitySlotSupplierQueue.class.getName()); private volatile boolean running = true; + private volatile boolean wasEverStarted = false; LocalActivitySlotSupplierQueue( TrackingSlotSupplier slotSupplier, @@ -73,13 +72,13 @@ static final class QueuedLARequest { return 0; }); this.slotSupplier = slotSupplier; - this.queueThread = new Thread(this::processQueue, "LocalActivitySlotSupplierQueue"); - this.queueThread.start(); + this.queueThreadService = + Executors.newSingleThreadExecutor(r -> new Thread(r, "LocalActivitySlotSupplierQueue")); } private void processQueue() { try { - while (running) { + while (running || !requestQueue.isEmpty()) { QueuedLARequest request = requestQueue.take(); SlotPermit slotPermit; try { @@ -102,9 +101,9 @@ private void processQueue() { } } - void shutdown() { - running = false; - queueThread.interrupt(); + void start() { + wasEverStarted = true; + this.queueThreadService.submit(this::processQueue); } boolean waitOnBackpressure(@Nullable Long acceptanceTimeoutMs) throws InterruptedException { @@ -134,4 +133,40 @@ void submitAttempt(SlotReservationData data, boolean isRetry, LocalActivityAttem newExecutionsBackpressureSemaphore.release(); } } + + @Override + public boolean isShutdown() { + return queueThreadService.isShutdown(); + } + + @Override + public boolean isTerminated() { + return queueThreadService.isTerminated(); + } + + @Override + public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interruptTasks) { + running = false; + if (requestQueue.isEmpty()) { + // Just interrupt the thread, so that if we're waiting on blocking take the thread will + // be interrupted and exit. Otherwise the loop will exit once the queue is empty. + queueThreadService.shutdownNow(); + } + + return interruptTasks + ? shutdownManager.shutdownExecutorNowUntimed( + queueThreadService, "LocalActivitySlotSupplierQueue") + : shutdownManager.shutdownExecutorUntimed( + queueThreadService, "LocalActivitySlotSupplierQueue"); + } + + @Override + public void awaitTermination(long timeout, TimeUnit unit) { + if (!wasEverStarted) { + // Not entirely clear why this is necessary, but await termination will hang the whole + // timeout duration if no task was ever submitted. + return; + } + ShutdownManager.awaitTermination(queueThreadService, unit.toMillis(timeout)); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java index a15e0c8769..106be05bee 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java @@ -689,6 +689,7 @@ public boolean start() { false); this.workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1); + this.slotQueue.start(); return true; } else { return false; @@ -698,9 +699,9 @@ public boolean start() { @Override public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interruptTasks) { if (activityAttemptTaskExecutor != null && !activityAttemptTaskExecutor.isShutdown()) { - slotQueue.shutdown(); - return activityAttemptTaskExecutor + return slotQueue .shutdown(shutdownManager, interruptTasks) + .thenCompose(r -> activityAttemptTaskExecutor.shutdown(shutdownManager, interruptTasks)) .thenCompose( r -> shutdownManager.shutdownExecutor( @@ -717,21 +718,24 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean @Override public void awaitTermination(long timeout, TimeUnit unit) { - slotQueue.shutdown(); long timeoutMillis = unit.toMillis(timeout); - ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis); + long remainingTimeout = ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis); + ShutdownManager.awaitTermination(slotQueue, remainingTimeout); } @Override public boolean isShutdown() { - return activityAttemptTaskExecutor != null && activityAttemptTaskExecutor.isShutdown(); + return activityAttemptTaskExecutor != null + && activityAttemptTaskExecutor.isShutdown() + && slotQueue.isShutdown(); } @Override public boolean isTerminated() { return activityAttemptTaskExecutor != null && activityAttemptTaskExecutor.isTerminated() - && scheduledExecutor.isTerminated(); + && scheduledExecutor.isTerminated() + && slotQueue.isTerminated(); } @Override diff --git a/temporal-sdk/src/test/java/io/temporal/worker/LocalActivityWorkerNotStartedTest.java b/temporal-sdk/src/test/java/io/temporal/worker/LocalActivityWorkerNotStartedTest.java index 2638c7b3f6..17c3709d3d 100644 --- a/temporal-sdk/src/test/java/io/temporal/worker/LocalActivityWorkerNotStartedTest.java +++ b/temporal-sdk/src/test/java/io/temporal/worker/LocalActivityWorkerNotStartedTest.java @@ -20,10 +20,14 @@ package io.temporal.worker; +import static org.junit.Assert.assertTrue; + import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.Workflow; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.time.Instant; import java.util.Set; import org.junit.Rule; import org.junit.Test; @@ -42,14 +46,18 @@ public class LocalActivityWorkerNotStartedTest { @Test public void canShutDownProperlyWhenNotStarted() { // Shut down the (never started) worker + Instant shutdownTime = Instant.now(); testWorkflowRule.getTestEnvironment().getWorkerFactory().shutdown(); - testWorkflowRule.getWorker().awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS); + testWorkflowRule.getWorker().awaitTermination(2, java.util.concurrent.TimeUnit.SECONDS); Set threadSet = Thread.getAllStackTraces().keySet(); for (Thread thread : threadSet) { if (thread.getName().contains("LocalActivitySlotSupplierQueue")) { throw new RuntimeException("Thread should be terminated"); } } + Duration elapsed = Duration.between(shutdownTime, Instant.now()); + // Shutdown should not have taken long + assertTrue(elapsed.getSeconds() < 2); } @WorkflowInterface