From c7ad32e52afb8c357fa7f0693dd19dc5d360801f Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Fri, 13 Sep 2024 10:41:04 +0200 Subject: [PATCH] Wrap the managed worker thread pool to disallow shutdown by application/other extensions Related to #16833 #43228 --- .../io/quarkus/runtime/ExecutorRecorder.java | 16 +++++- .../ForwardingScheduledExecutorService.java | 37 +++++++++++++ .../NoopShutdownScheduledExecutorService.java | 50 +++++++++++++++++ .../VertxWorkerPoolShutdownTest.java | 53 +++++++++++++++++++ .../runtime/NoopShutdownExecutorService.java | 39 -------------- .../vertx/core/runtime/VertxCoreRecorder.java | 12 +---- 6 files changed, 156 insertions(+), 51 deletions(-) create mode 100644 core/runtime/src/main/java/io/quarkus/runtime/util/ForwardingScheduledExecutorService.java create mode 100644 core/runtime/src/main/java/io/quarkus/runtime/util/NoopShutdownScheduledExecutorService.java create mode 100644 extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/VertxWorkerPoolShutdownTest.java delete mode 100644 extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/NoopShutdownExecutorService.java diff --git a/core/runtime/src/main/java/io/quarkus/runtime/ExecutorRecorder.java b/core/runtime/src/main/java/io/quarkus/runtime/ExecutorRecorder.java index 8cb7ba3acbfcf9..47bc631081a2bb 100644 --- a/core/runtime/src/main/java/io/quarkus/runtime/ExecutorRecorder.java +++ b/core/runtime/src/main/java/io/quarkus/runtime/ExecutorRecorder.java @@ -17,6 +17,7 @@ import org.wildfly.common.cpu.ProcessorInfo; import io.quarkus.runtime.annotations.Recorder; +import io.quarkus.runtime.util.NoopShutdownScheduledExecutorService; /** * @@ -57,8 +58,19 @@ public void run() { if (threadPoolConfig.prefill) { underlying.prestartAllCoreThreads(); } - current = underlying; - return underlying; + ScheduledExecutorService managed = underlying; + // In prod and test mode, we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated + // This is to prevent the application and other extensions from shutting down the executor service + // The problem was described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589 + // and https://github.com/quarkusio/quarkus/issues/43228 + // For example, the Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used + // And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well + // As a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored + if (launchMode != LaunchMode.DEVELOPMENT) { + managed = new NoopShutdownScheduledExecutorService(underlying); + } + current = managed; + return managed; } private static Runnable createShutdownTask(ThreadPoolConfig threadPoolConfig, EnhancedQueueExecutor executor) { diff --git a/core/runtime/src/main/java/io/quarkus/runtime/util/ForwardingScheduledExecutorService.java b/core/runtime/src/main/java/io/quarkus/runtime/util/ForwardingScheduledExecutorService.java new file mode 100644 index 00000000000000..eae3e5cf98a321 --- /dev/null +++ b/core/runtime/src/main/java/io/quarkus/runtime/util/ForwardingScheduledExecutorService.java @@ -0,0 +1,37 @@ +package io.quarkus.runtime.util; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Forwards all method calls to the scheduled executor service returned from the {@link #delegate()} method. Only non-default + * methods + * declared on the {@link ScheduledExecutorService} interface are forwarded. + */ +public abstract class ForwardingScheduledExecutorService extends ForwardingExecutorService implements ScheduledExecutorService { + + protected abstract ScheduledExecutorService delegate(); + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return delegate().schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return delegate().schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate().scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return delegate().scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + +} diff --git a/core/runtime/src/main/java/io/quarkus/runtime/util/NoopShutdownScheduledExecutorService.java b/core/runtime/src/main/java/io/quarkus/runtime/util/NoopShutdownScheduledExecutorService.java new file mode 100644 index 00000000000000..ea0d05d51bd6be --- /dev/null +++ b/core/runtime/src/main/java/io/quarkus/runtime/util/NoopShutdownScheduledExecutorService.java @@ -0,0 +1,50 @@ +package io.quarkus.runtime.util; + +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + +import org.jboss.logging.Logger; + +/** + * Forwards all method calls to the scheduled executor service returned from the {@link #delegate()} method. + * Does not allow shutdown + */ +public class NoopShutdownScheduledExecutorService extends ForwardingScheduledExecutorService { + + private static final Logger LOG = Logger.getLogger(NoopShutdownScheduledExecutorService.class); + + private final ScheduledExecutorService delegate; + + public NoopShutdownScheduledExecutorService(final ScheduledExecutorService delegate) { + this.delegate = delegate; + } + + @Override + protected ScheduledExecutorService delegate() { + return delegate; + } + + @Override + public boolean isShutdown() { + // managed executors are never shut down from the application's perspective + return false; + } + + @Override + public boolean isTerminated() { + // managed executors are never shut down from the application's perspective + return false; + } + + @Override + public void shutdown() { + LOG.debug("shutdown() not allowed on managed executor service"); + } + + @Override + public List shutdownNow() { + LOG.debug("shutdownNow() not allowed on managed executor service"); + return List.of(); + } + +} diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/VertxWorkerPoolShutdownTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/VertxWorkerPoolShutdownTest.java new file mode 100644 index 00000000000000..ee7ff9deb90c79 --- /dev/null +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/VertxWorkerPoolShutdownTest.java @@ -0,0 +1,53 @@ +package io.quarkus.vertx.deployment; + +import java.util.concurrent.ExecutorService; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.arc.Arc; +import io.quarkus.runtime.StartupEvent; +import io.quarkus.test.QuarkusUnitTest; +import io.vertx.core.Future; +import io.vertx.core.Vertx; + +public class VertxWorkerPoolShutdownTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(MyBean.class)); + + @Test + public void test() { + MyBean bean = Arc.container().instance(MyBean.class).get(); + Assertions.assertTrue(bean.isOk()); + } + + @ApplicationScoped + public static class MyBean { + + @Inject + Vertx vertx; + + @Inject + ExecutorService executorService; + + boolean ok; + + public boolean isOk() { + return ok; + } + + public void init(@Observes StartupEvent ev) { + executorService.shutdownNow(); + Future ok1 = vertx.executeBlocking(() -> true); + ok = ok1.toCompletionStage().toCompletableFuture().join(); + } + } +} diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/NoopShutdownExecutorService.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/NoopShutdownExecutorService.java deleted file mode 100644 index 01529b44375690..00000000000000 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/NoopShutdownExecutorService.java +++ /dev/null @@ -1,39 +0,0 @@ -package io.quarkus.vertx.core.runtime; - -import java.util.List; -import java.util.concurrent.ExecutorService; - -import org.jboss.logging.Logger; - -import io.quarkus.runtime.util.ForwardingExecutorService; - -/** - * This executor is only used in the prod mode as the Vertx worker thread pool. - */ -class NoopShutdownExecutorService extends ForwardingExecutorService { - - private static final Logger LOG = Logger.getLogger(NoopShutdownExecutorService.class); - - private final ExecutorService delegate; - - NoopShutdownExecutorService(ExecutorService delegate) { - this.delegate = delegate; - } - - @Override - protected ExecutorService delegate() { - return delegate; - } - - @Override - public void shutdown() { - LOG.debug("shutdown() deliberately not delegated"); - } - - @Override - public List shutdownNow() { - LOG.debug("shutdownNow() deliberately not delegated"); - return List.of(); - } - -} diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java index 978c5887321f32..d678e2dfa3ab0b 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java @@ -101,16 +101,8 @@ public class VertxCoreRecorder { public Supplier configureVertx(VertxConfiguration config, ThreadPoolConfig threadPoolConfig, LaunchMode launchMode, ShutdownContext shutdown, List> customizers, ExecutorService executorProxy) { - if (launchMode == LaunchMode.NORMAL) { - // In prod mode, we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated - // This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589 - // The Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used - // And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well - // As a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored - QuarkusExecutorFactory.sharedExecutor = new NoopShutdownExecutorService(executorProxy); - } else { - QuarkusExecutorFactory.sharedExecutor = executorProxy; - } + // The wrapper previously here to prevent the executor to be shutdown prematurely is moved to higher level to the io.quarkus.runtime.ExecutorRecorder + QuarkusExecutorFactory.sharedExecutor = executorProxy; if (launchMode != LaunchMode.DEVELOPMENT) { vertx = new VertxSupplier(launchMode, config, customizers, threadPoolConfig, shutdown); // we need this to be part of the last shutdown tasks because closing it early (basically before Arc)