Skip to content

Commit

Permalink
Merge pull request quarkusio#38478 from mkouba/issue-16833
Browse files Browse the repository at this point in the history
Vertx: use NoopShutdownExecutorService and DevModeExecutorService
  • Loading branch information
mkouba authored Feb 1, 2024
2 parents 19dd956 + 277ff01 commit dfe434e
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class RuntimeUpdatesProcessor implements HotReplacementContext, Closeable
final Map<Path, Long> sourceFileTimestamps = new ConcurrentHashMap<>();

private final List<Runnable> preScanSteps = new CopyOnWriteArrayList<>();
private final List<Runnable> postRestartSteps = new CopyOnWriteArrayList<>();
private final List<Consumer<Set<String>>> noRestartChangesConsumers = new CopyOnWriteArrayList<>();
private final List<HotReplacementSetup> hotReplacementSetup = new ArrayList<>();
private final List<Runnable> deploymentFailedStartHandlers = new ArrayList<>();
Expand Down Expand Up @@ -541,6 +542,13 @@ public boolean doScan(boolean userInitiated, boolean forceRestart) {
restartCallback.accept(filesChanged, changedClassResults);
long timeNanoSeconds = System.nanoTime() - startNanoseconds;
log.infof("Live reload total time: %ss ", Timing.convertToBigDecimalSeconds(timeNanoSeconds));
for (Runnable step : postRestartSteps) {
try {
step.run();
} catch (Throwable t) {
log.error("Post Restart step failed", t);
}
}
if (TimeUnit.SECONDS.convert(timeNanoSeconds, TimeUnit.NANOSECONDS) >= 4 && !instrumentationEnabled()) {
if (!instrumentationLogPrinted) {
instrumentationLogPrinted = true;
Expand Down Expand Up @@ -593,6 +601,11 @@ public void addPreScanStep(Runnable runnable) {
preScanSteps.add(runnable);
}

@Override
public void addPostRestartStep(Runnable runnable) {
postRestartSteps.add(runnable);
}

@Override
public void consumeNoRestartChanges(Consumer<Set<String>> consumer) {
noRestartChangesConsumers.add(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public interface HotReplacementContext {
* @return A set of changed files
*/
Set<String> syncState(Map<String, String> fileHashes);

/**
* Adds a task that is run after the restart is performed.
*/
void addPostRestartStep(Runnable runnable);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.quarkus.runtime.util;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Forwards all method calls to the executor service returned from the {@link #delegate()} method. Only non-default methods
* declared on the {@link ExecutorService} interface are forwarded.
*/
public abstract class ForwardingExecutorService implements ExecutorService {

protected abstract ExecutorService delegate();

@Override
public void execute(Runnable command) {
delegate().execute(command);
}

@Override
public void shutdown() {
delegate().shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return delegate().shutdownNow();
}

@Override
public boolean isShutdown() {
return delegate().isShutdown();
}

@Override
public boolean isTerminated() {
return delegate().isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate().awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate().submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate().submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate().submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate().invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate().invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate().invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate().invokeAny(tasks, timeout, unit);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import io.quarkus.dev.console.DevConsoleManager;
import io.quarkus.dev.spi.HotReplacementContext;
import io.quarkus.dev.spi.HotReplacementSetup;
import io.quarkus.vertx.core.runtime.QuarkusExecutorFactory;
import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
import io.quarkus.vertx.http.runtime.VertxHttpRecorder;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpServerResponse;
Expand Down Expand Up @@ -50,6 +52,15 @@ public void run() {
RemoteSyncHandler.doPreScan();
}
});
hotReplacementContext.addPostRestartStep(new Runnable() {
@Override
public void run() {
// If not on a worker thread then attempt to re-initialize the dev mode executor
if (!Context.isOnWorkerThread()) {
QuarkusExecutorFactory.reinitializeDevModeExecutor();
}
}
});
}

@Override
Expand Down Expand Up @@ -186,6 +197,7 @@ public void handle(AsyncResult<Boolean> event) {
} else {
boolean restart = event.result();
if (restart) {
QuarkusExecutorFactory.reinitializeDevModeExecutor();
routingContext.request().headers().set(HEADER_NAME, "true");
VertxHttpRecorder.getRootHandler().handle(routingContext.request());
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.quarkus.vertx.core.runtime;

import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import org.jboss.logging.Logger;

import io.quarkus.runtime.util.ForwardingExecutorService;

/**
* This executor is only used in the dev mode as the Vertx worker thread pool.
* <p>
* The underlying executor can be shut down and then replaced with a new re-initialized executor.
*/
class DevModeExecutorService extends ForwardingExecutorService {

private static final Logger LOG = Logger.getLogger(DevModeExecutorService.class);

private final Supplier<ExecutorService> initializer;
private volatile ExecutorService executor;

DevModeExecutorService(Supplier<ExecutorService> initializer) {
this.initializer = initializer;
this.executor = initializer.get();
}

@Override
protected ExecutorService delegate() {
return executor;
}

/**
* Shutdown the underlying executor and then initialize a new one.
*/
void reinit() {
ExecutorService oldExecutor = this.executor;
if (oldExecutor != null) {
oldExecutor.shutdownNow();
}
this.executor = initializer.get();
LOG.debug("Dev mode executor re-initialized");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.quarkus.vertx.core.runtime;

import java.util.List;
import java.util.concurrent.ExecutorService;

import org.jboss.logging.Logger;

import io.quarkus.runtime.util.ForwardingExecutorService;

/**
* This executor is only used in the prod mode as the Vertx worker thread pool.
*/
class NoopShutdownExecutorService extends ForwardingExecutorService {

private static final Logger LOG = Logger.getLogger(NoopShutdownExecutorService.class);

private final ExecutorService delegate;

NoopShutdownExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}

@Override
protected ExecutorService delegate() {
return delegate;
}

@Override
public void shutdown() {
LOG.debug("shutdown() deliberately not delegated");
}

@Override
public List<Runnable> shutdownNow() {
LOG.debug("shutdownNow() deliberately not delegated");
return List.of();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.jboss.logging.Logger;
import org.jboss.threads.EnhancedQueueExecutor;
Expand All @@ -15,6 +16,7 @@

public class QuarkusExecutorFactory implements ExecutorServiceFactory {
static volatile ExecutorService sharedExecutor;
static volatile DevModeExecutorService devModeExecutor;
private static final AtomicInteger executorCount = new AtomicInteger(0);
private static final Logger log = Logger.getLogger(QuarkusExecutorFactory.class);

Expand All @@ -28,20 +30,51 @@ public QuarkusExecutorFactory(VertxConfiguration conf, LaunchMode launchMode) {

@Override
public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) {
// The current Vertx impl creates two external executors during initialization
// The first one is used for the worker thread pool, the second one is used internally,
// and additional executors may be created on demand
// Unfortunately, there is no way to distinguish the particular executor types
// Therefore, we only consider the first one as the worker thread pool
// Note that in future versions of Vertx this may change!
if (executorCount.incrementAndGet() == 1) {
// The first executor should be the worker thread pool
if (launchMode != LaunchMode.DEVELOPMENT) {
if (sharedExecutor == null) {
log.warn("Shared executor not set. Unshared executor will be created for blocking work");
// This should only happen in tests using Vertx directly in a unit test
sharedExecutor = internalCreateExecutor(threadFactory, concurrency, maxConcurrency);
}
return sharedExecutor;
} else {
// In dev mode we use a special executor for the worker pool
// where the underlying executor can be shut down and then replaced with a new re-initialized executor
// This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// The Vertx instance is reused between restarts but we must attempt to shut down this executor,
// for example to cancel/interrupt the scheduled methods
devModeExecutor = new DevModeExecutorService(new Supplier<ExecutorService>() {
@Override
public ExecutorService get() {
return internalCreateExecutor(threadFactory, concurrency, maxConcurrency);
}
});
return devModeExecutor;
}
}

return internalCreateExecutor(threadFactory, concurrency, maxConcurrency);
}

/**
* In dev mode, shut down the underlying executor and then initialize a new one.
*
* @see DevModeExecutorService
*/
public static void reinitializeDevModeExecutor() {
DevModeExecutorService executor = QuarkusExecutorFactory.devModeExecutor;
if (executor != null) {
executor.reinit();
}
}

private ExecutorService internalCreateExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) {
final EnhancedQueueExecutor.Builder builder = new EnhancedQueueExecutor.Builder()
.setRegisterMBean(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,16 @@ public class VertxCoreRecorder {
public Supplier<Vertx> configureVertx(VertxConfiguration config, ThreadPoolConfig threadPoolConfig,
LaunchMode launchMode, ShutdownContext shutdown, List<Consumer<VertxOptions>> customizers,
ExecutorService executorProxy) {
QuarkusExecutorFactory.sharedExecutor = executorProxy;
if (launchMode == LaunchMode.NORMAL) {
// In prod mode, we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated
// This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// The Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used
// And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well
// As a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored
QuarkusExecutorFactory.sharedExecutor = new NoopShutdownExecutorService(executorProxy);
} else {
QuarkusExecutorFactory.sharedExecutor = executorProxy;
}
if (launchMode != LaunchMode.DEVELOPMENT) {
vertx = new VertxSupplier(launchMode, config, customizers, threadPoolConfig, shutdown);
// we need this to be part of the last shutdown tasks because closing it early (basically before Arc)
Expand Down
Loading

0 comments on commit dfe434e

Please sign in to comment.