Skip to content

Commit

Permalink
Vert.x TCCL fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Aug 18, 2021
1 parent 77d652e commit 4a87bb2
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ void registerVerticleClasses(CombinedIndexBuildItem indexBuildItem,

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
ThreadFactoryBuildItem createVertxThreadFactory(VertxCoreRecorder recorder) {
return new ThreadFactoryBuildItem(recorder.createThreadFactory());
ThreadFactoryBuildItem createVertxThreadFactory(VertxCoreRecorder recorder, LaunchModeBuildItem launchMode) {
return new ThreadFactoryBuildItem(recorder.createThreadFactory(launchMode.getLaunchMode()));
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
import static io.quarkus.vertx.core.runtime.SSLConfigHelper.configurePfxTrustOptions;

import java.io.File;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -18,6 +15,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
Expand All @@ -29,11 +27,9 @@

import org.jboss.logging.Logger;
import org.jboss.threads.ContextHandler;
import org.jboss.threads.EnhancedQueueExecutor;
import org.wildfly.common.cpu.ProcessorInfo;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import io.quarkus.runtime.IOThreadDetector;
import io.quarkus.runtime.LaunchMode;
Expand All @@ -57,7 +53,6 @@
import io.vertx.core.impl.VertxBuilder;
import io.vertx.core.impl.VertxImpl;
import io.vertx.core.impl.VertxThread;
import io.vertx.core.impl.WorkerPool;
import io.vertx.core.spi.VertxThreadFactory;
import io.vertx.core.spi.resolver.ResolverProvider;

Expand All @@ -81,6 +76,8 @@ public class VertxCoreRecorder {
*/
private static volatile String webDeploymentId;

private static final Set<Thread> devModeThreads = new CopyOnWriteArraySet<>();

public Supplier<Vertx> configureVertx(VertxConfiguration config,
LaunchMode launchMode, ShutdownContext shutdown, List<Consumer<VertxOptions>> customizers,
ExecutorService executorProxy) {
Expand All @@ -100,7 +97,7 @@ public void run() {
if (vertx == null) {
vertx = new VertxSupplier(launchMode, config, customizers, shutdown);
} else if (vertx.v != null) {
tryCleanTccl(vertx.v);
tryCleanTccl();
}
shutdown.addLastShutdownTask(new Runnable() {
@Override
Expand Down Expand Up @@ -135,74 +132,9 @@ public void handle(AsyncResult<Void> event) {
return vertx;
}

private void tryCleanTccl(Vertx devModeVertx) {
//this is a best effort attempt to clean out the old TCCL from
private void tryCleanTccl() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();

resetExecutorsClassloaderContext(extractWorkerPool(devModeVertx), cl);
resetExecutorsClassloaderContext(extractInternalWorkerPool(devModeVertx), cl);

EventLoopGroup group = ((VertxImpl) devModeVertx).getEventLoopGroup();
for (EventExecutor i : group) {
i.execute(new Runnable() {

@Override
public void run() {
Thread.currentThread().setContextClassLoader(cl);
}

});
}

}

private WorkerPool extractInternalWorkerPool(Vertx devModeVertx) {
VertxImpl vertxImpl = (VertxImpl) devModeVertx;
final Object internalWorkerPool;
final Field field;
try {
field = VertxImpl.class.getDeclaredField("internalWorkerPool");
field.setAccessible(true);
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
try {
internalWorkerPool = field.get(vertxImpl);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}

return (WorkerPool) internalWorkerPool;
}

private WorkerPool extractWorkerPool(Vertx devModeVertx) {
final ContextInternal ctx = (ContextInternal) devModeVertx.getOrCreateContext();
return ctx.workerPool();
}

/**
* Extract the JBoss Threads EnhancedQueueExecutor from the Vertx instance
* and reset all threads to use the given ClassLoader.
* This is messy as it needs to use reflection until Vertx can expose it:
* - https://github.com/eclipse-vertx/vert.x/pull/4029
*/
private void resetExecutorsClassloaderContext(WorkerPool workerPool, ClassLoader cl) {
final Method executorMethod;
try {
executorMethod = WorkerPool.class.getDeclaredMethod("executor");
executorMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
final Object result;
try {
result = executorMethod.invoke(workerPool);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
EnhancedQueueExecutor executor = (EnhancedQueueExecutor) result;
final Thread[] runningThreads = executor.getRunningThreads();
for (Thread t : runningThreads) {
for (var t : devModeThreads) {
t.setContextClassLoader(cl);
}
}
Expand Down Expand Up @@ -230,6 +162,7 @@ public void handle(AsyncResult<Void> event) {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
devModeThreads.clear();
}
}

Expand All @@ -253,9 +186,23 @@ public static Vertx initialize(VertxConfiguration conf, VertxOptionsCustomizer c

Vertx vertx;

ClassLoader tccl = Thread.currentThread().getContextClassLoader();
VertxThreadFactory vertxThreadFactory = new VertxThreadFactory() {
@Override
public VertxThread newVertxThread(Runnable target, String name, boolean worker, long maxExecTime,
TimeUnit maxExecTimeUnit) {
var thread = VertxThreadFactory.INSTANCE.newVertxThread(target, name, worker, maxExecTime, maxExecTimeUnit);
thread.setContextClassLoader(tccl);
if (launchMode == LaunchMode.DEVELOPMENT) {
devModeThreads.add(thread);
}
return thread;
}
};
if (conf != null && conf.cluster != null && conf.cluster.clustered) {
CompletableFuture<Vertx> latch = new CompletableFuture<>();
new VertxBuilder(options)
.threadFactory(vertxThreadFactory)
.executorServiceFactory(new QuarkusExecutorFactory(conf, launchMode))
.init().clusteredVertx(new Handler<AsyncResult<Vertx>>() {
@Override
Expand All @@ -270,6 +217,7 @@ public void handle(AsyncResult<Vertx> ar) {
vertx = latch.join();
} else {
vertx = new VertxBuilder(options)
.threadFactory(vertxThreadFactory)
.executorServiceFactory(new QuarkusExecutorFactory(conf, launchMode))
.init().vertx();
}
Expand Down Expand Up @@ -509,14 +457,17 @@ public Integer get() {
};
}

public ThreadFactory createThreadFactory() {
public ThreadFactory createThreadFactory(LaunchMode launchMode) {
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
AtomicInteger threadCount = new AtomicInteger(0);
return runnable -> {
VertxThread thread = VertxThreadFactory.INSTANCE.newVertxThread(runnable,
"executor-thread-" + threadCount.getAndIncrement(), true, 0, null);
thread.setDaemon(true);
thread.setContextClassLoader(tccl);
if (launchMode == LaunchMode.DEVELOPMENT) {
devModeThreads.add(thread);
}
return thread;
};
}
Expand Down

0 comments on commit 4a87bb2

Please sign in to comment.