From e76188047132773cdcfa2ec68de03f480a179858 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Thu, 19 Aug 2021 06:15:15 +1000 Subject: [PATCH] Vert.x TCCL fixes --- .../core/deployment/VertxCoreProcessor.java | 4 +- .../vertx/core/runtime/VertxCoreRecorder.java | 156 +++++++++--------- 2 files changed, 80 insertions(+), 80 deletions(-) diff --git a/extensions/vertx-core/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java b/extensions/vertx-core/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java index ef373c89186b86..608b4ac8849628 100644 --- a/extensions/vertx-core/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java +++ b/extensions/vertx-core/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java @@ -147,8 +147,8 @@ void registerVerticleClasses(CombinedIndexBuildItem indexBuildItem, @BuildStep @Record(ExecutionTime.RUNTIME_INIT) - ThreadFactoryBuildItem createVertxThreadFactory(VertxCoreRecorder recorder) { - return new ThreadFactoryBuildItem(recorder.createThreadFactory()); + ThreadFactoryBuildItem createVertxThreadFactory(VertxCoreRecorder recorder, LaunchModeBuildItem launchMode) { + return new ThreadFactoryBuildItem(recorder.createThreadFactory(launchMode.getLaunchMode())); } @BuildStep diff --git a/extensions/vertx-core/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java b/extensions/vertx-core/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java index f744cbd604fc4d..c50e4240155050 100644 --- a/extensions/vertx-core/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java +++ b/extensions/vertx-core/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java @@ -7,14 +7,12 @@ import static io.quarkus.vertx.core.runtime.SSLConfigHelper.configurePfxTrustOptions; import java.io.File; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -29,11 +27,9 @@ import org.jboss.logging.Logger; import org.jboss.threads.ContextHandler; -import org.jboss.threads.EnhancedQueueExecutor; import org.wildfly.common.cpu.ProcessorInfo; import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.FastThreadLocal; import io.quarkus.runtime.IOThreadDetector; import io.quarkus.runtime.LaunchMode; @@ -57,7 +53,6 @@ import io.vertx.core.impl.VertxBuilder; import io.vertx.core.impl.VertxImpl; import io.vertx.core.impl.VertxThread; -import io.vertx.core.impl.WorkerPool; import io.vertx.core.spi.VertxThreadFactory; import io.vertx.core.spi.resolver.ResolverProvider; @@ -81,6 +76,18 @@ public class VertxCoreRecorder { */ private static volatile String webDeploymentId; + /** + * All current dev mode threads, accessed under lock + *

+ * This allows them to have their TCCL updated on restart + */ + private static final Set devModeThreads = new HashSet<>(); + /** + * The class loader to use for new threads in dev mode. This must be accessed under the {@link #devModeThreads} lock, to + * avoid race conditions. + */ + private static volatile ClassLoader currentDevModeNewThreadCreationClassLoader; + public Supplier configureVertx(VertxConfiguration config, LaunchMode launchMode, ShutdownContext shutdown, List> customizers, ExecutorService executorProxy) { @@ -94,13 +101,14 @@ public Supplier configureVertx(VertxConfiguration config, public void run() { destroy(); QuarkusExecutorFactory.sharedExecutor = null; + currentDevModeNewThreadCreationClassLoader = null; } }); } else { if (vertx == null) { vertx = new VertxSupplier(launchMode, config, customizers, shutdown); } else if (vertx.v != null) { - tryCleanTccl(vertx.v); + tryCleanTccl(); } shutdown.addLastShutdownTask(new Runnable() { @Override @@ -135,75 +143,13 @@ public void handle(AsyncResult event) { return vertx; } - private void tryCleanTccl(Vertx devModeVertx) { - //this is a best effort attempt to clean out the old TCCL from + private void tryCleanTccl() { ClassLoader cl = Thread.currentThread().getContextClassLoader(); - - resetExecutorsClassloaderContext(extractWorkerPool(devModeVertx), cl); - resetExecutorsClassloaderContext(extractInternalWorkerPool(devModeVertx), cl); - - EventLoopGroup group = ((VertxImpl) devModeVertx).getEventLoopGroup(); - for (EventExecutor i : group) { - i.execute(new Runnable() { - - @Override - public void run() { - Thread.currentThread().setContextClassLoader(cl); - } - - }); - } - - } - - private WorkerPool extractInternalWorkerPool(Vertx devModeVertx) { - VertxImpl vertxImpl = (VertxImpl) devModeVertx; - final Object internalWorkerPool; - final Field field; - try { - field = VertxImpl.class.getDeclaredField("internalWorkerPool"); - field.setAccessible(true); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - try { - internalWorkerPool = field.get(vertxImpl); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - - return (WorkerPool) internalWorkerPool; - } - - private WorkerPool extractWorkerPool(Vertx devModeVertx) { - final ContextInternal ctx = (ContextInternal) devModeVertx.getOrCreateContext(); - return ctx.workerPool(); - } - - /** - * Extract the JBoss Threads EnhancedQueueExecutor from the Vertx instance - * and reset all threads to use the given ClassLoader. - * This is messy as it needs to use reflection until Vertx can expose it: - * - https://github.com/eclipse-vertx/vert.x/pull/4029 - */ - private void resetExecutorsClassloaderContext(WorkerPool workerPool, ClassLoader cl) { - final Method executorMethod; - try { - executorMethod = WorkerPool.class.getDeclaredMethod("executor"); - executorMethod.setAccessible(true); - } catch (NoSuchMethodException e) { - throw new RuntimeException(e); - } - final Object result; - try { - result = executorMethod.invoke(workerPool); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - EnhancedQueueExecutor executor = (EnhancedQueueExecutor) result; - final Thread[] runningThreads = executor.getRunningThreads(); - for (Thread t : runningThreads) { - t.setContextClassLoader(cl); + synchronized (devModeThreads) { + currentDevModeNewThreadCreationClassLoader = cl; + for (var t : devModeThreads) { + t.setContextClassLoader(cl); + } } } @@ -229,6 +175,11 @@ public void handle(AsyncResult event) { latch.await(); } catch (InterruptedException e) { throw new RuntimeException(e); + } finally { + synchronized (devModeThreads) { + devModeThreads.clear(); + currentDevModeNewThreadCreationClassLoader = null; + } } } } @@ -253,9 +204,34 @@ public static Vertx initialize(VertxConfiguration conf, VertxOptionsCustomizer c Vertx vertx; + Optional nonDevModeTccl; + if (launchMode == LaunchMode.DEVELOPMENT) { + currentDevModeNewThreadCreationClassLoader = Thread.currentThread().getContextClassLoader(); + nonDevModeTccl = Optional.empty(); //in dev mode we don't want to capture the original TCCL to stop a leak + } else { + nonDevModeTccl = Optional.of(Thread.currentThread().getContextClassLoader()); + } + VertxThreadFactory vertxThreadFactory = new VertxThreadFactory() { + @Override + public VertxThread newVertxThread(Runnable target, String name, boolean worker, long maxExecTime, + TimeUnit maxExecTimeUnit) { + var thread = VertxThreadFactory.INSTANCE.newVertxThread(target, name, worker, maxExecTime, maxExecTimeUnit); + + if (launchMode == LaunchMode.DEVELOPMENT) { + synchronized (devModeThreads) { + setNewThreadTccl(thread); + devModeThreads.add(thread); + } + } else { + thread.setContextClassLoader(nonDevModeTccl.get()); + } + return thread; + } + }; if (conf != null && conf.cluster != null && conf.cluster.clustered) { CompletableFuture latch = new CompletableFuture<>(); new VertxBuilder(options) + .threadFactory(vertxThreadFactory) .executorServiceFactory(new QuarkusExecutorFactory(conf, launchMode)) .init().clusteredVertx(new Handler>() { @Override @@ -270,6 +246,7 @@ public void handle(AsyncResult ar) { vertx = latch.join(); } else { vertx = new VertxBuilder(options) + .threadFactory(vertxThreadFactory) .executorServiceFactory(new QuarkusExecutorFactory(conf, launchMode)) .init().vertx(); } @@ -509,18 +486,41 @@ public Integer get() { }; } - public ThreadFactory createThreadFactory() { - ClassLoader tccl = Thread.currentThread().getContextClassLoader(); + public ThreadFactory createThreadFactory(LaunchMode launchMode) { + Optional nonDevModeTccl; + if (launchMode == LaunchMode.DEVELOPMENT) { + currentDevModeNewThreadCreationClassLoader = Thread.currentThread().getContextClassLoader(); + nonDevModeTccl = Optional.empty(); //in dev mode we don't want to capture the original TCCL to stop a leak + } else { + nonDevModeTccl = Optional.of(Thread.currentThread().getContextClassLoader()); + } AtomicInteger threadCount = new AtomicInteger(0); return runnable -> { VertxThread thread = VertxThreadFactory.INSTANCE.newVertxThread(runnable, "executor-thread-" + threadCount.getAndIncrement(), true, 0, null); thread.setDaemon(true); - thread.setContextClassLoader(tccl); + if (launchMode == LaunchMode.DEVELOPMENT) { + synchronized (devModeThreads) { + setNewThreadTccl(thread); + devModeThreads.add(thread); + } + } else { + thread.setContextClassLoader(nonDevModeTccl.get()); + } return thread; }; } + private static void setNewThreadTccl(VertxThread thread) { + ClassLoader cl = VertxCoreRecorder.currentDevModeNewThreadCreationClassLoader; + if (cl == null) { + //can happen if a thread is created after shutdown is initiated + //should be super rare, but might as well handle it properly + cl = VertxCoreRecorder.class.getClassLoader(); + } + thread.setContextClassLoader(cl); + } + public ContextHandler executionContextHandler() { return new ContextHandler() { @Override