Skip to content

Commit

Permalink
Merge pull request #43268 from ozangunalp/worker_pool_disallow_shutdown
Browse files Browse the repository at this point in the history
Wrap the managed worker thread pool to disallow shutdown on prod mode
  • Loading branch information
ozangunalp authored Sep 24, 2024
2 parents ec83c29 + c37ec43 commit 5c3ad82
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.wildfly.common.cpu.ProcessorInfo;

import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.runtime.util.NoopShutdownScheduledExecutorService;

/**
*
Expand Down Expand Up @@ -57,8 +58,19 @@ public void run() {
if (threadPoolConfig.prefill) {
underlying.prestartAllCoreThreads();
}
current = underlying;
return underlying;
ScheduledExecutorService managed = underlying;
// In prod and test mode, we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated
// This is to prevent the application and other extensions from shutting down the executor service
// The problem was described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// and https://github.com/quarkusio/quarkus/issues/43228
// For example, the Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used
// And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well
// As a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored
if (launchMode != LaunchMode.DEVELOPMENT) {
managed = new NoopShutdownScheduledExecutorService(underlying);
}
current = managed;
return managed;
}

private static Runnable createShutdownTask(ThreadPoolConfig threadPoolConfig, EnhancedQueueExecutor executor) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.runtime.util;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Forwards all method calls to the scheduled executor service returned from the {@link #delegate()} method. Only non-default
* methods
* declared on the {@link ScheduledExecutorService} interface are forwarded.
*/
public abstract class ForwardingScheduledExecutorService extends ForwardingExecutorService implements ScheduledExecutorService {

protected abstract ScheduledExecutorService delegate();

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate().schedule(command, delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate().schedule(callable, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return delegate().scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return delegate().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.quarkus.runtime.util;

import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

import org.jboss.logging.Logger;

/**
* Forwards all method calls to the scheduled executor service returned from the {@link #delegate()} method.
* Does not allow shutdown
*/
public class NoopShutdownScheduledExecutorService extends ForwardingScheduledExecutorService {

private static final Logger LOG = Logger.getLogger(NoopShutdownScheduledExecutorService.class);

private final ScheduledExecutorService delegate;

public NoopShutdownScheduledExecutorService(final ScheduledExecutorService delegate) {
this.delegate = delegate;
}

@Override
protected ScheduledExecutorService delegate() {
return delegate;
}

@Override
public boolean isShutdown() {
// managed executors are never shut down from the application's perspective
return false;
}

@Override
public boolean isTerminated() {
// managed executors are never shut down from the application's perspective
return false;
}

@Override
public void shutdown() {
LOG.debug("shutdown() not allowed on managed executor service");
}

@Override
public List<Runnable> shutdownNow() {
LOG.debug("shutdownNow() not allowed on managed executor service");
return List.of();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.quarkus.vertx.deployment;

import java.util.concurrent.ExecutorService;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.arc.Arc;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.test.QuarkusUnitTest;
import io.vertx.core.Future;
import io.vertx.core.Vertx;

public class VertxWorkerPoolShutdownTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(MyBean.class));

@Test
public void test() {
MyBean bean = Arc.container().instance(MyBean.class).get();
Assertions.assertTrue(bean.isOk());
}

@ApplicationScoped
public static class MyBean {

@Inject
Vertx vertx;

@Inject
ExecutorService executorService;

boolean ok;

public boolean isOk() {
return ok;
}

public void init(@Observes StartupEvent ev) {
executorService.shutdownNow();
((io.vertx.core.impl.ContextInternal) vertx.getOrCreateContext()).workerPool().executor().shutdownNow();
Future<Boolean> ok1 = vertx.executeBlocking(() -> true);
ok = ok1.toCompletionStage().toCompletableFuture().join();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,8 @@ public class VertxCoreRecorder {
public Supplier<Vertx> configureVertx(VertxConfiguration config, ThreadPoolConfig threadPoolConfig,
LaunchMode launchMode, ShutdownContext shutdown, List<Consumer<VertxOptions>> customizers,
ExecutorService executorProxy) {
if (launchMode == LaunchMode.NORMAL) {
// In prod mode, we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated
// This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// The Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used
// And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well
// As a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored
QuarkusExecutorFactory.sharedExecutor = new NoopShutdownExecutorService(executorProxy);
} else {
QuarkusExecutorFactory.sharedExecutor = executorProxy;
}
// The wrapper previously here to prevent the executor to be shutdown prematurely is moved to higher level to the io.quarkus.runtime.ExecutorRecorder
QuarkusExecutorFactory.sharedExecutor = executorProxy;
if (launchMode != LaunchMode.DEVELOPMENT) {
vertx = new VertxSupplier(launchMode, config, customizers, threadPoolConfig, shutdown);
// we need this to be part of the last shutdown tasks because closing it early (basically before Arc)
Expand Down

0 comments on commit 5c3ad82

Please sign in to comment.