diff --git a/extensions/vertx-core/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java b/extensions/vertx-core/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java
index ef373c89186b86..608b4ac8849628 100644
--- a/extensions/vertx-core/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java
+++ b/extensions/vertx-core/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java
@@ -147,8 +147,8 @@ void registerVerticleClasses(CombinedIndexBuildItem indexBuildItem,
@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
- ThreadFactoryBuildItem createVertxThreadFactory(VertxCoreRecorder recorder) {
- return new ThreadFactoryBuildItem(recorder.createThreadFactory());
+ ThreadFactoryBuildItem createVertxThreadFactory(VertxCoreRecorder recorder, LaunchModeBuildItem launchMode) {
+ return new ThreadFactoryBuildItem(recorder.createThreadFactory(launchMode.getLaunchMode()));
}
@BuildStep
diff --git a/extensions/vertx-core/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java b/extensions/vertx-core/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java
index f744cbd604fc4d..c50e4240155050 100644
--- a/extensions/vertx-core/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java
+++ b/extensions/vertx-core/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java
@@ -7,14 +7,12 @@
import static io.quarkus.vertx.core.runtime.SSLConfigHelper.configurePfxTrustOptions;
import java.io.File;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -29,11 +27,9 @@
import org.jboss.logging.Logger;
import org.jboss.threads.ContextHandler;
-import org.jboss.threads.EnhancedQueueExecutor;
import org.wildfly.common.cpu.ProcessorInfo;
import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import io.quarkus.runtime.IOThreadDetector;
import io.quarkus.runtime.LaunchMode;
@@ -57,7 +53,6 @@
import io.vertx.core.impl.VertxBuilder;
import io.vertx.core.impl.VertxImpl;
import io.vertx.core.impl.VertxThread;
-import io.vertx.core.impl.WorkerPool;
import io.vertx.core.spi.VertxThreadFactory;
import io.vertx.core.spi.resolver.ResolverProvider;
@@ -81,6 +76,18 @@ public class VertxCoreRecorder {
*/
private static volatile String webDeploymentId;
+ /**
+ * All current dev mode threads, accessed under lock
+ *
+ * This allows them to have their TCCL updated on restart
+ */
+ private static final Set devModeThreads = new HashSet<>();
+ /**
+ * The class loader to use for new threads in dev mode. This must be accessed under the {@link #devModeThreads} lock, to
+ * avoid race conditions.
+ */
+ private static volatile ClassLoader currentDevModeNewThreadCreationClassLoader;
+
public Supplier configureVertx(VertxConfiguration config,
LaunchMode launchMode, ShutdownContext shutdown, List> customizers,
ExecutorService executorProxy) {
@@ -94,13 +101,14 @@ public Supplier configureVertx(VertxConfiguration config,
public void run() {
destroy();
QuarkusExecutorFactory.sharedExecutor = null;
+ currentDevModeNewThreadCreationClassLoader = null;
}
});
} else {
if (vertx == null) {
vertx = new VertxSupplier(launchMode, config, customizers, shutdown);
} else if (vertx.v != null) {
- tryCleanTccl(vertx.v);
+ tryCleanTccl();
}
shutdown.addLastShutdownTask(new Runnable() {
@Override
@@ -135,75 +143,13 @@ public void handle(AsyncResult event) {
return vertx;
}
- private void tryCleanTccl(Vertx devModeVertx) {
- //this is a best effort attempt to clean out the old TCCL from
+ private void tryCleanTccl() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
- resetExecutorsClassloaderContext(extractWorkerPool(devModeVertx), cl);
- resetExecutorsClassloaderContext(extractInternalWorkerPool(devModeVertx), cl);
-
- EventLoopGroup group = ((VertxImpl) devModeVertx).getEventLoopGroup();
- for (EventExecutor i : group) {
- i.execute(new Runnable() {
-
- @Override
- public void run() {
- Thread.currentThread().setContextClassLoader(cl);
- }
-
- });
- }
-
- }
-
- private WorkerPool extractInternalWorkerPool(Vertx devModeVertx) {
- VertxImpl vertxImpl = (VertxImpl) devModeVertx;
- final Object internalWorkerPool;
- final Field field;
- try {
- field = VertxImpl.class.getDeclaredField("internalWorkerPool");
- field.setAccessible(true);
- } catch (NoSuchFieldException e) {
- throw new RuntimeException(e);
- }
- try {
- internalWorkerPool = field.get(vertxImpl);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
-
- return (WorkerPool) internalWorkerPool;
- }
-
- private WorkerPool extractWorkerPool(Vertx devModeVertx) {
- final ContextInternal ctx = (ContextInternal) devModeVertx.getOrCreateContext();
- return ctx.workerPool();
- }
-
- /**
- * Extract the JBoss Threads EnhancedQueueExecutor from the Vertx instance
- * and reset all threads to use the given ClassLoader.
- * This is messy as it needs to use reflection until Vertx can expose it:
- * - https://github.com/eclipse-vertx/vert.x/pull/4029
- */
- private void resetExecutorsClassloaderContext(WorkerPool workerPool, ClassLoader cl) {
- final Method executorMethod;
- try {
- executorMethod = WorkerPool.class.getDeclaredMethod("executor");
- executorMethod.setAccessible(true);
- } catch (NoSuchMethodException e) {
- throw new RuntimeException(e);
- }
- final Object result;
- try {
- result = executorMethod.invoke(workerPool);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- EnhancedQueueExecutor executor = (EnhancedQueueExecutor) result;
- final Thread[] runningThreads = executor.getRunningThreads();
- for (Thread t : runningThreads) {
- t.setContextClassLoader(cl);
+ synchronized (devModeThreads) {
+ currentDevModeNewThreadCreationClassLoader = cl;
+ for (var t : devModeThreads) {
+ t.setContextClassLoader(cl);
+ }
}
}
@@ -229,6 +175,11 @@ public void handle(AsyncResult event) {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
+ } finally {
+ synchronized (devModeThreads) {
+ devModeThreads.clear();
+ currentDevModeNewThreadCreationClassLoader = null;
+ }
}
}
}
@@ -253,9 +204,34 @@ public static Vertx initialize(VertxConfiguration conf, VertxOptionsCustomizer c
Vertx vertx;
+ Optional nonDevModeTccl;
+ if (launchMode == LaunchMode.DEVELOPMENT) {
+ currentDevModeNewThreadCreationClassLoader = Thread.currentThread().getContextClassLoader();
+ nonDevModeTccl = Optional.empty(); //in dev mode we don't want to capture the original TCCL to stop a leak
+ } else {
+ nonDevModeTccl = Optional.of(Thread.currentThread().getContextClassLoader());
+ }
+ VertxThreadFactory vertxThreadFactory = new VertxThreadFactory() {
+ @Override
+ public VertxThread newVertxThread(Runnable target, String name, boolean worker, long maxExecTime,
+ TimeUnit maxExecTimeUnit) {
+ var thread = VertxThreadFactory.INSTANCE.newVertxThread(target, name, worker, maxExecTime, maxExecTimeUnit);
+
+ if (launchMode == LaunchMode.DEVELOPMENT) {
+ synchronized (devModeThreads) {
+ setNewThreadTccl(thread);
+ devModeThreads.add(thread);
+ }
+ } else {
+ thread.setContextClassLoader(nonDevModeTccl.get());
+ }
+ return thread;
+ }
+ };
if (conf != null && conf.cluster != null && conf.cluster.clustered) {
CompletableFuture latch = new CompletableFuture<>();
new VertxBuilder(options)
+ .threadFactory(vertxThreadFactory)
.executorServiceFactory(new QuarkusExecutorFactory(conf, launchMode))
.init().clusteredVertx(new Handler>() {
@Override
@@ -270,6 +246,7 @@ public void handle(AsyncResult ar) {
vertx = latch.join();
} else {
vertx = new VertxBuilder(options)
+ .threadFactory(vertxThreadFactory)
.executorServiceFactory(new QuarkusExecutorFactory(conf, launchMode))
.init().vertx();
}
@@ -509,18 +486,41 @@ public Integer get() {
};
}
- public ThreadFactory createThreadFactory() {
- ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+ public ThreadFactory createThreadFactory(LaunchMode launchMode) {
+ Optional nonDevModeTccl;
+ if (launchMode == LaunchMode.DEVELOPMENT) {
+ currentDevModeNewThreadCreationClassLoader = Thread.currentThread().getContextClassLoader();
+ nonDevModeTccl = Optional.empty(); //in dev mode we don't want to capture the original TCCL to stop a leak
+ } else {
+ nonDevModeTccl = Optional.of(Thread.currentThread().getContextClassLoader());
+ }
AtomicInteger threadCount = new AtomicInteger(0);
return runnable -> {
VertxThread thread = VertxThreadFactory.INSTANCE.newVertxThread(runnable,
"executor-thread-" + threadCount.getAndIncrement(), true, 0, null);
thread.setDaemon(true);
- thread.setContextClassLoader(tccl);
+ if (launchMode == LaunchMode.DEVELOPMENT) {
+ synchronized (devModeThreads) {
+ setNewThreadTccl(thread);
+ devModeThreads.add(thread);
+ }
+ } else {
+ thread.setContextClassLoader(nonDevModeTccl.get());
+ }
return thread;
};
}
+ private static void setNewThreadTccl(VertxThread thread) {
+ ClassLoader cl = VertxCoreRecorder.currentDevModeNewThreadCreationClassLoader;
+ if (cl == null) {
+ //can happen if a thread is created after shutdown is initiated
+ //should be super rare, but might as well handle it properly
+ cl = VertxCoreRecorder.class.getClassLoader();
+ }
+ thread.setContextClassLoader(cl);
+ }
+
public ContextHandler