Skip to content

Commit

Permalink
Fix queue shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Aug 20, 2024
1 parent caef9bf commit 44992e3
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import io.temporal.worker.tuning.LocalActivitySlotInfo;
import io.temporal.worker.tuning.SlotPermit;
import io.temporal.workflow.Functions;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LocalActivitySlotSupplierQueue {
class LocalActivitySlotSupplierQueue implements Shutdownable {
static final class QueuedLARequest {
final boolean isRetry;
final SlotReservationData data;
Expand All @@ -47,10 +45,11 @@ static final class QueuedLARequest {
private final Semaphore newExecutionsBackpressureSemaphore;
private final TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier;
private final Functions.Proc1<LocalActivityAttemptTask> afterReservedCallback;
private final Thread queueThread;
private final ExecutorService queueThreadService;
private static final Logger log =
LoggerFactory.getLogger(LocalActivitySlotSupplierQueue.class.getName());
private volatile boolean running = true;
private volatile boolean wasEverStarted = false;

LocalActivitySlotSupplierQueue(
TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier,
Expand All @@ -73,13 +72,13 @@ static final class QueuedLARequest {
return 0;
});
this.slotSupplier = slotSupplier;
this.queueThread = new Thread(this::processQueue, "LocalActivitySlotSupplierQueue");
this.queueThread.start();
this.queueThreadService =
Executors.newSingleThreadExecutor(r -> new Thread(r, "LocalActivitySlotSupplierQueue"));
}

private void processQueue() {
try {
while (running) {
while (running || !requestQueue.isEmpty()) {
QueuedLARequest request = requestQueue.take();
SlotPermit slotPermit;
try {
Expand All @@ -102,9 +101,9 @@ private void processQueue() {
}
}

void shutdown() {
running = false;
queueThread.interrupt();
void start() {
wasEverStarted = true;
this.queueThreadService.submit(this::processQueue);
}

boolean waitOnBackpressure(@Nullable Long acceptanceTimeoutMs) throws InterruptedException {
Expand Down Expand Up @@ -134,4 +133,40 @@ void submitAttempt(SlotReservationData data, boolean isRetry, LocalActivityAttem
newExecutionsBackpressureSemaphore.release();
}
}

@Override
public boolean isShutdown() {
return queueThreadService.isShutdown();
}

@Override
public boolean isTerminated() {
return queueThreadService.isTerminated();
}

@Override
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
running = false;
if (requestQueue.isEmpty()) {
// Just interrupt the thread, so that if we're waiting on blocking take the thread will
// be interrupted and exit. Otherwise the loop will exit once the queue is empty.
queueThreadService.shutdownNow();
}

return interruptTasks
? shutdownManager.shutdownExecutorNowUntimed(
queueThreadService, "LocalActivitySlotSupplierQueue")
: shutdownManager.shutdownExecutorUntimed(
queueThreadService, "LocalActivitySlotSupplierQueue");
}

@Override
public void awaitTermination(long timeout, TimeUnit unit) {
if (!wasEverStarted) {
// Not entirely clear why this is necessary, but await termination will hang the whole
// timeout duration if no task was ever submitted.
return;
}
ShutdownManager.awaitTermination(queueThreadService, unit.toMillis(timeout));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ public boolean start() {
false);

this.workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
this.slotQueue.start();
return true;
} else {
return false;
Expand All @@ -698,9 +699,9 @@ public boolean start() {
@Override
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
if (activityAttemptTaskExecutor != null && !activityAttemptTaskExecutor.isShutdown()) {
slotQueue.shutdown();
return activityAttemptTaskExecutor
return slotQueue
.shutdown(shutdownManager, interruptTasks)
.thenCompose(r -> activityAttemptTaskExecutor.shutdown(shutdownManager, interruptTasks))
.thenCompose(
r ->
shutdownManager.shutdownExecutor(
Expand All @@ -717,21 +718,24 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean

@Override
public void awaitTermination(long timeout, TimeUnit unit) {
slotQueue.shutdown();
long timeoutMillis = unit.toMillis(timeout);
ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis);
long remainingTimeout = ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis);
ShutdownManager.awaitTermination(slotQueue, remainingTimeout);
}

@Override
public boolean isShutdown() {
return activityAttemptTaskExecutor != null && activityAttemptTaskExecutor.isShutdown();
return activityAttemptTaskExecutor != null
&& activityAttemptTaskExecutor.isShutdown()
&& slotQueue.isShutdown();
}

@Override
public boolean isTerminated() {
return activityAttemptTaskExecutor != null
&& activityAttemptTaskExecutor.isTerminated()
&& scheduledExecutor.isTerminated();
&& scheduledExecutor.isTerminated()
&& slotQueue.isTerminated();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@

package io.temporal.worker;

import static org.junit.Assert.assertTrue;

import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -42,14 +46,18 @@ public class LocalActivityWorkerNotStartedTest {
@Test
public void canShutDownProperlyWhenNotStarted() {
// Shut down the (never started) worker
Instant shutdownTime = Instant.now();
testWorkflowRule.getTestEnvironment().getWorkerFactory().shutdown();
testWorkflowRule.getWorker().awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS);
testWorkflowRule.getWorker().awaitTermination(2, java.util.concurrent.TimeUnit.SECONDS);
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
for (Thread thread : threadSet) {
if (thread.getName().contains("LocalActivitySlotSupplierQueue")) {
throw new RuntimeException("Thread should be terminated");
}
}
Duration elapsed = Duration.between(shutdownTime, Instant.now());
// Shutdown should not have taken long
assertTrue(elapsed.getSeconds() < 2);
}

@WorkflowInterface
Expand Down

0 comments on commit 44992e3

Please sign in to comment.