Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap the managed worker thread pool to disallow shutdown on prod mode #43268

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.wildfly.common.cpu.ProcessorInfo;

import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.runtime.util.NoopShutdownScheduledExecutorService;

/**
*
Expand Down Expand Up @@ -57,8 +58,19 @@ public void run() {
if (threadPoolConfig.prefill) {
underlying.prestartAllCoreThreads();
}
current = underlying;
return underlying;
ScheduledExecutorService managed = underlying;
// In prod and test mode, we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated
// This is to prevent the application and other extensions from shutting down the executor service
// The problem was described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// and https://github.com/quarkusio/quarkus/issues/43228
// For example, the Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used
// And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well
// As a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored
if (launchMode != LaunchMode.DEVELOPMENT) {
managed = new NoopShutdownScheduledExecutorService(underlying);
}
current = managed;
return managed;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, isn't the Vert.x worker thread pool backed by this very ExecutorService in the prod mode? If so, then we could remove the vertx-specific stuff for the prod mode here: https://github.com/quarkusio/quarkus/blob/main/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java#L110

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let's try to simplify the code! Except that we don't have a test for this. I guess that we could at least try to run the reproducer from #16833 manually?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let me remove the draft to run the existing test suite. I can also reproduce with #43228.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we allow the shutdown only on dev mode. It'd be better if tests were closer to prod.

}

private static Runnable createShutdownTask(ThreadPoolConfig threadPoolConfig, EnhancedQueueExecutor executor) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.runtime.util;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Forwards all method calls to the scheduled executor service returned from the {@link #delegate()} method. Only non-default
* methods
* declared on the {@link ScheduledExecutorService} interface are forwarded.
*/
public abstract class ForwardingScheduledExecutorService extends ForwardingExecutorService implements ScheduledExecutorService {

protected abstract ScheduledExecutorService delegate();

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate().schedule(command, delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate().schedule(callable, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return delegate().scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return delegate().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.quarkus.runtime.util;

import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

import org.jboss.logging.Logger;

/**
* Forwards all method calls to the scheduled executor service returned from the {@link #delegate()} method.
* Does not allow shutdown
*/
public class NoopShutdownScheduledExecutorService extends ForwardingScheduledExecutorService {

private static final Logger LOG = Logger.getLogger(NoopShutdownScheduledExecutorService.class);

private final ScheduledExecutorService delegate;

public NoopShutdownScheduledExecutorService(final ScheduledExecutorService delegate) {
this.delegate = delegate;
}

@Override
protected ScheduledExecutorService delegate() {
return delegate;
}

@Override
public boolean isShutdown() {
// managed executors are never shut down from the application's perspective
return false;
}

@Override
public boolean isTerminated() {
// managed executors are never shut down from the application's perspective
return false;
}

@Override
public void shutdown() {
LOG.debug("shutdown() not allowed on managed executor service");
}

@Override
public List<Runnable> shutdownNow() {
LOG.debug("shutdownNow() not allowed on managed executor service");
return List.of();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.quarkus.vertx.deployment;

import java.util.concurrent.ExecutorService;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.arc.Arc;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.test.QuarkusUnitTest;
import io.vertx.core.Future;
import io.vertx.core.Vertx;

public class VertxWorkerPoolShutdownTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(MyBean.class));

@Test
public void test() {
MyBean bean = Arc.container().instance(MyBean.class).get();
Assertions.assertTrue(bean.isOk());
}

@ApplicationScoped
public static class MyBean {

@Inject
Vertx vertx;

@Inject
ExecutorService executorService;

boolean ok;

public boolean isOk() {
return ok;
}

public void init(@Observes StartupEvent ev) {
executorService.shutdownNow();
ozangunalp marked this conversation as resolved.
Show resolved Hide resolved
((io.vertx.core.impl.ContextInternal) vertx.getOrCreateContext()).workerPool().executor().shutdownNow();
Future<Boolean> ok1 = vertx.executeBlocking(() -> true);
ok = ok1.toCompletionStage().toCompletableFuture().join();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,8 @@ public class VertxCoreRecorder {
public Supplier<Vertx> configureVertx(VertxConfiguration config, ThreadPoolConfig threadPoolConfig,
LaunchMode launchMode, ShutdownContext shutdown, List<Consumer<VertxOptions>> customizers,
ExecutorService executorProxy) {
if (launchMode == LaunchMode.NORMAL) {
// In prod mode, we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated
// This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// The Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used
// And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well
// As a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored
QuarkusExecutorFactory.sharedExecutor = new NoopShutdownExecutorService(executorProxy);
} else {
QuarkusExecutorFactory.sharedExecutor = executorProxy;
}
// The wrapper previously here to prevent the executor to be shutdown prematurely is moved to higher level to the io.quarkus.runtime.ExecutorRecorder
QuarkusExecutorFactory.sharedExecutor = executorProxy;
mkouba marked this conversation as resolved.
Show resolved Hide resolved
if (launchMode != LaunchMode.DEVELOPMENT) {
vertx = new VertxSupplier(launchMode, config, customizers, threadPoolConfig, shutdown);
// we need this to be part of the last shutdown tasks because closing it early (basically before Arc)
Expand Down
Loading