Skip to content

Commit

Permalink
Merge pull request #19487 from stuartwdouglas/reset-tccl
Browse files Browse the repository at this point in the history
Vert.x TCCL fixes
  • Loading branch information
famod authored Aug 20, 2021
2 parents 9abb418 + 8d4a650 commit 0cd5c9f
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ void registerVerticleClasses(CombinedIndexBuildItem indexBuildItem,

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
ThreadFactoryBuildItem createVertxThreadFactory(VertxCoreRecorder recorder) {
return new ThreadFactoryBuildItem(recorder.createThreadFactory());
ThreadFactoryBuildItem createVertxThreadFactory(VertxCoreRecorder recorder, LaunchModeBuildItem launchMode) {
return new ThreadFactoryBuildItem(recorder.createThreadFactory(launchMode.getLaunchMode()));
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@
import static io.quarkus.vertx.core.runtime.SSLConfigHelper.configurePfxTrustOptions;

import java.io.File;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -29,11 +27,9 @@

import org.jboss.logging.Logger;
import org.jboss.threads.ContextHandler;
import org.jboss.threads.EnhancedQueueExecutor;
import org.wildfly.common.cpu.ProcessorInfo;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import io.quarkus.runtime.IOThreadDetector;
import io.quarkus.runtime.LaunchMode;
Expand All @@ -57,7 +53,6 @@
import io.vertx.core.impl.VertxBuilder;
import io.vertx.core.impl.VertxImpl;
import io.vertx.core.impl.VertxThread;
import io.vertx.core.impl.WorkerPool;
import io.vertx.core.spi.VertxThreadFactory;
import io.vertx.core.spi.resolver.ResolverProvider;

Expand All @@ -81,6 +76,19 @@ public class VertxCoreRecorder {
*/
private static volatile String webDeploymentId;

/**
* All current dev mode threads, accessed under lock
* <p>
* This allows them to have their TCCL updated on restart
*/
private static final Set<Thread> devModeThreads = new HashSet<>();
/**
* The class loader to use for new threads in dev mode. On dev mode restart this must be updated under the
* {@link #devModeThreads} lock, to
* avoid race conditions.
*/
private static volatile ClassLoader currentDevModeNewThreadCreationClassLoader;

public Supplier<Vertx> configureVertx(VertxConfiguration config,
LaunchMode launchMode, ShutdownContext shutdown, List<Consumer<VertxOptions>> customizers,
ExecutorService executorProxy) {
Expand All @@ -94,13 +102,14 @@ public Supplier<Vertx> configureVertx(VertxConfiguration config,
public void run() {
destroy();
QuarkusExecutorFactory.sharedExecutor = null;
currentDevModeNewThreadCreationClassLoader = null;
}
});
} else {
if (vertx == null) {
vertx = new VertxSupplier(launchMode, config, customizers, shutdown);
} else if (vertx.v != null) {
tryCleanTccl(vertx.v);
tryCleanTccl();
}
shutdown.addLastShutdownTask(new Runnable() {
@Override
Expand Down Expand Up @@ -135,75 +144,13 @@ public void handle(AsyncResult<Void> event) {
return vertx;
}

private void tryCleanTccl(Vertx devModeVertx) {
//this is a best effort attempt to clean out the old TCCL from
private void tryCleanTccl() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();

resetExecutorsClassloaderContext(extractWorkerPool(devModeVertx), cl);
resetExecutorsClassloaderContext(extractInternalWorkerPool(devModeVertx), cl);

EventLoopGroup group = ((VertxImpl) devModeVertx).getEventLoopGroup();
for (EventExecutor i : group) {
i.execute(new Runnable() {

@Override
public void run() {
Thread.currentThread().setContextClassLoader(cl);
}

});
}

}

private WorkerPool extractInternalWorkerPool(Vertx devModeVertx) {
VertxImpl vertxImpl = (VertxImpl) devModeVertx;
final Object internalWorkerPool;
final Field field;
try {
field = VertxImpl.class.getDeclaredField("internalWorkerPool");
field.setAccessible(true);
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
try {
internalWorkerPool = field.get(vertxImpl);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}

return (WorkerPool) internalWorkerPool;
}

private WorkerPool extractWorkerPool(Vertx devModeVertx) {
final ContextInternal ctx = (ContextInternal) devModeVertx.getOrCreateContext();
return ctx.workerPool();
}

/**
* Extract the JBoss Threads EnhancedQueueExecutor from the Vertx instance
* and reset all threads to use the given ClassLoader.
* This is messy as it needs to use reflection until Vertx can expose it:
* - https://github.com/eclipse-vertx/vert.x/pull/4029
*/
private void resetExecutorsClassloaderContext(WorkerPool workerPool, ClassLoader cl) {
final Method executorMethod;
try {
executorMethod = WorkerPool.class.getDeclaredMethod("executor");
executorMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
final Object result;
try {
result = executorMethod.invoke(workerPool);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
EnhancedQueueExecutor executor = (EnhancedQueueExecutor) result;
final Thread[] runningThreads = executor.getRunningThreads();
for (Thread t : runningThreads) {
t.setContextClassLoader(cl);
synchronized (devModeThreads) {
currentDevModeNewThreadCreationClassLoader = cl;
for (var t : devModeThreads) {
t.setContextClassLoader(cl);
}
}
}

Expand All @@ -229,6 +176,11 @@ public void handle(AsyncResult<Void> event) {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
synchronized (devModeThreads) {
devModeThreads.clear();
currentDevModeNewThreadCreationClassLoader = null;
}
}
}
}
Expand All @@ -253,9 +205,18 @@ public static Vertx initialize(VertxConfiguration conf, VertxOptionsCustomizer c

Vertx vertx;

Optional<ClassLoader> nonDevModeTccl = setupThreadFactoryTccl(launchMode);
VertxThreadFactory vertxThreadFactory = new VertxThreadFactory() {
@Override
public VertxThread newVertxThread(Runnable target, String name, boolean worker, long maxExecTime,
TimeUnit maxExecTimeUnit) {
return createVertxThread(target, name, worker, maxExecTime, maxExecTimeUnit, launchMode, nonDevModeTccl);
}
};
if (conf != null && conf.cluster != null && conf.cluster.clustered) {
CompletableFuture<Vertx> latch = new CompletableFuture<>();
new VertxBuilder(options)
.threadFactory(vertxThreadFactory)
.executorServiceFactory(new QuarkusExecutorFactory(conf, launchMode))
.init().clusteredVertx(new Handler<AsyncResult<Vertx>>() {
@Override
Expand All @@ -270,6 +231,7 @@ public void handle(AsyncResult<Vertx> ar) {
vertx = latch.join();
} else {
vertx = new VertxBuilder(options)
.threadFactory(vertxThreadFactory)
.executorServiceFactory(new QuarkusExecutorFactory(conf, launchMode))
.init().vertx();
}
Expand All @@ -283,6 +245,40 @@ public void handle(Throwable error) {
return logVertxInitialization(vertx);
}

/**
* Depending on the launch mode we may need do handle the TCCL differently.
*
* For dev mode it can change, so we don't want to capture the original TCCL (as this would be a leak). For other modes we
* just want a fixed TCCL, and leaks are not an issue.
*
* @param launchMode The launch mode
* @return The ClassLoader if we are not running in dev mode
*/
private static Optional<ClassLoader> setupThreadFactoryTccl(LaunchMode launchMode) {
Optional<ClassLoader> nonDevModeTccl;
if (launchMode == LaunchMode.DEVELOPMENT) {
currentDevModeNewThreadCreationClassLoader = Thread.currentThread().getContextClassLoader();
nonDevModeTccl = Optional.empty(); //in dev mode we don't want to capture the original TCCL to stop a leak
} else {
nonDevModeTccl = Optional.of(Thread.currentThread().getContextClassLoader());
}
return nonDevModeTccl;
}

private static VertxThread createVertxThread(Runnable target, String name, boolean worker, long maxExecTime,
TimeUnit maxExecTimeUnit, LaunchMode launchMode, Optional<ClassLoader> nonDevModeTccl) {
var thread = VertxThreadFactory.INSTANCE.newVertxThread(target, name, worker, maxExecTime, maxExecTimeUnit);
if (launchMode == LaunchMode.DEVELOPMENT) {
synchronized (devModeThreads) {
setNewThreadTccl(thread);
devModeThreads.add(thread);
}
} else {
thread.setContextClassLoader(nonDevModeTccl.get());
}
return thread;
}

private static Vertx logVertxInitialization(Vertx vertx) {
LOGGER.debugf("Vertx has Native Transport Enabled: %s", vertx.isNativeTransportEnabled());
return vertx;
Expand Down Expand Up @@ -509,18 +505,27 @@ public Integer get() {
};
}

public ThreadFactory createThreadFactory() {
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
public ThreadFactory createThreadFactory(LaunchMode launchMode) {
Optional<ClassLoader> nonDevModeTccl = setupThreadFactoryTccl(launchMode);
AtomicInteger threadCount = new AtomicInteger(0);
return runnable -> {
VertxThread thread = VertxThreadFactory.INSTANCE.newVertxThread(runnable,
"executor-thread-" + threadCount.getAndIncrement(), true, 0, null);
VertxThread thread = createVertxThread(runnable,
"executor-thread-" + threadCount.getAndIncrement(), true, 0, null, launchMode, nonDevModeTccl);
thread.setDaemon(true);
thread.setContextClassLoader(tccl);
return thread;
};
}

private static void setNewThreadTccl(VertxThread thread) {
ClassLoader cl = VertxCoreRecorder.currentDevModeNewThreadCreationClassLoader;
if (cl == null) {
//can happen if a thread is created after shutdown is initiated
//should be super rare, but might as well handle it properly
cl = VertxCoreRecorder.class.getClassLoader();
}
thread.setContextClassLoader(cl);
}

public ContextHandler<Object> executionContextHandler() {
return new ContextHandler<Object>() {
@Override
Expand Down

0 comments on commit 0cd5c9f

Please sign in to comment.