From 3be4c1a6cf6ee78fef9f6839b25b09c187ee46d3 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 25 Jul 2023 14:31:03 +0200 Subject: [PATCH] Virtual threads extension providing executor supplier for running virtual threads - Handles executor close on shutdown hook - Creates named virtual threads --- bom/application/pom.xml | 10 ++ devtools/bom-descriptor-json/pom.xml | 13 ++ docs/pom.xml | 13 ++ docs/src/main/asciidoc/virtual-threads.adoc | 13 ++ extensions/grpc/deployment/pom.xml | 4 + extensions/grpc/runtime/pom.xml | 4 + .../grpc/runtime/GrpcServerRecorder.java | 80 +-------- extensions/pom.xml | 1 + .../deployment/pom.xml | 4 + .../quarkus-resteasy-reactive/runtime/pom.xml | 4 + .../runtime/ResteasyReactiveRecorder.java | 80 +-------- .../deployment/pom.xml | 4 + .../SmallRyeReactiveMessagingProcessor.java | 7 - .../runtime/pom.xml | 4 + .../runtime/QuarkusWorkerPoolRegistry.java | 74 +------- extensions/virtual-threads/deployment/pom.xml | 51 ++++++ .../threads/VirtualThreadsProcessor.java | 19 ++ extensions/virtual-threads/pom.xml | 23 +++ extensions/virtual-threads/runtime/pom.xml | 70 ++++++++ .../ContextPreservingExecutorService.java | 105 +++++++++++ .../virtual/threads/VirtualThreadsConfig.java | 32 ++++ .../threads/VirtualThreadsRecorder.java | 166 ++++++++++++++++++ .../resources/META-INF/quarkus-extension.yaml | 10 ++ .../VirtualThreadExecutorSupplierTest.java | 61 +++++++ 24 files changed, 622 insertions(+), 230 deletions(-) create mode 100644 extensions/virtual-threads/deployment/pom.xml create mode 100644 extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java create mode 100644 extensions/virtual-threads/pom.xml create mode 100644 extensions/virtual-threads/runtime/pom.xml create mode 100644 extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/ContextPreservingExecutorService.java create mode 100644 extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsConfig.java create mode 100644 extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java create mode 100644 extensions/virtual-threads/runtime/src/main/resources/META-INF/quarkus-extension.yaml create mode 100644 extensions/virtual-threads/runtime/src/test/java/io/quarkus/virtual/threads/VirtualThreadExecutorSupplierTest.java diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 231b9e9986005..7bb8927d0c413 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -2948,6 +2948,16 @@ quarkus-info ${project.version} + + io.quarkus + quarkus-virtual-threads + ${project.version} + + + io.quarkus + quarkus-virtual-threads-deployment + ${project.version} + diff --git a/devtools/bom-descriptor-json/pom.xml b/devtools/bom-descriptor-json/pom.xml index 013478b495120..946fcea64de2c 100644 --- a/devtools/bom-descriptor-json/pom.xml +++ b/devtools/bom-descriptor-json/pom.xml @@ -2839,6 +2839,19 @@ + + io.quarkus + quarkus-virtual-threads + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-webjars-locator diff --git a/docs/pom.xml b/docs/pom.xml index 970a3d8332421..1f66edd36b5d4 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -2853,6 +2853,19 @@ + + io.quarkus + quarkus-virtual-threads-deployment + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-webjars-locator-deployment diff --git a/docs/src/main/asciidoc/virtual-threads.adoc b/docs/src/main/asciidoc/virtual-threads.adoc index 851c1f8ce682d..946c009dbc637 100644 --- a/docs/src/main/asciidoc/virtual-threads.adoc +++ b/docs/src/main/asciidoc/virtual-threads.adoc @@ -450,6 +450,19 @@ So, the data written in the duplicated context (and the request scope, as the re However, thread locals are not propagated. +== Virtual thread names + +Virtual threads are created without a thread name by default, which is not practical to identify the execution for debugging and logging purposes. +Quarkus managed virtual threads are named and prefixed with `quarkus-virtual-thread-`. +You can customize this prefix, or disable the naming altogether configuring an empty value: + +[source, properties] +---- +quarkus.virtual-threads.name-prefix= + +---- + + == Additional references - https://dl.acm.org/doi/10.1145/3583678.3596895[Considerations for integrating virtual threads in a Java framework: a Quarkus example in a resource-constrained environment] \ No newline at end of file diff --git a/extensions/grpc/deployment/pom.xml b/extensions/grpc/deployment/pom.xml index 5ec16775a0ed5..e5639e2ea0477 100644 --- a/extensions/grpc/deployment/pom.xml +++ b/extensions/grpc/deployment/pom.xml @@ -51,6 +51,10 @@ quarkus-smallrye-health-deployment true + + io.quarkus + quarkus-virtual-threads-deployment + io.quarkus diff --git a/extensions/grpc/runtime/pom.xml b/extensions/grpc/runtime/pom.xml index 9a0404be39605..a046721e15876 100644 --- a/extensions/grpc/runtime/pom.xml +++ b/extensions/grpc/runtime/pom.xml @@ -45,6 +45,10 @@ io.quarkus quarkus-smallrye-stork + + io.quarkus + quarkus-virtual-threads + io.quarkus diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java index de8af94ba457d..6c9e3654828e9 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java @@ -6,7 +6,6 @@ import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; -import java.lang.reflect.InvocationTargetException; import java.net.BindException; import java.time.Duration; import java.util.AbstractMap; @@ -20,13 +19,10 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; import java.util.regex.Pattern; import jakarta.enterprise.inject.Instance; @@ -63,7 +59,7 @@ import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; import io.quarkus.vertx.http.runtime.PortSystemProperties; -import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.quarkus.virtual.threads.VirtualThreadsRecorder; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.Context; @@ -71,7 +67,6 @@ import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; -import io.vertx.core.impl.ContextInternal; import io.vertx.ext.web.Route; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; @@ -588,7 +583,8 @@ private ServerServiceDefinition serviceWithInterceptors(Vertx vertx, GrpcContain List virtuals = virtualMethodsPerService.get(service.getImplementationClassName()); if (list != null || virtuals != null) { interceptors - .add(new BlockingServerInterceptor(vertx, list, virtuals, VIRTUAL_EXECUTOR_SUPPLIER.get(), devMode)); + .add(new BlockingServerInterceptor(vertx, list, virtuals, + VirtualThreadsRecorder.getCurrent(), devMode)); } } return ServerInterceptors.intercept(service.definition, interceptors); @@ -728,74 +724,4 @@ public void run(Runnable command) { } } - public static final Supplier VIRTUAL_EXECUTOR_SUPPLIER = new Supplier<>() { - Executor current = null; - - /** - * This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to - * change --release, --source, --target flags and to enable previews. - * Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled - * using java 11 and executed with a loom-compliant JDK. - *

- * IMPORTANT: we still need to use a duplicated context to have all the propagation working. - * Thus, the context is captured and applied/terminated in the virtual thread. - */ - @Override - public Executor get() { - if (current == null) { - try { - var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor") - .invoke(this); - current = new Executor() { - @Override - public void execute(Runnable command) { - var context = Vertx.currentContext(); - if (!(context instanceof ContextInternal)) { - virtual.execute(command); - } else { - virtual.execute(new Runnable() { - @Override - public void run() { - final var previousContext = ((ContextInternal) context).beginDispatch(); - try { - command.run(); - } finally { - ((ContextInternal) context).endDispatch(previousContext); - } - } - }); - } - } - }; - } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { - logger.debug("Unable to invoke java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor", e); - //quite ugly but works - logger.warnf("You weren't able to create an executor that spawns virtual threads, the default" + - " blocking executor will be used, please check that your JDK is compatible with " + - "virtual threads"); - //if for some reason a class/method can't be loaded or invoked we return the traditional executor, - // wrapping executeBlocking. - current = new Executor() { - @Override - public void execute(Runnable command) { - var context = Vertx.currentContext(); - if (!(context instanceof ContextInternal)) { - Infrastructure.getDefaultWorkerPool().execute(command); - } else { - context.executeBlocking(fut -> { - try { - command.run(); - fut.complete(null); - } catch (Exception e) { - fut.fail(e); - } - }); - } - } - }; - } - } - return current; - } - }; } diff --git a/extensions/pom.xml b/extensions/pom.xml index e4c9d03f36e7d..453c4a3ba3f50 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -16,6 +16,7 @@ netty-loom-adaptor + virtual-threads arc scheduler diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/pom.xml b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/pom.xml index c133493289748..1d7bb6f5b839b 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/pom.xml +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/pom.xml @@ -54,6 +54,10 @@ io.quarkus quarkus-resteasy-reactive-common-deployment + + io.quarkus + quarkus-virtual-threads-deployment + io.quarkus quarkus-security-deployment diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/pom.xml b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/pom.xml index b363b1d9b5a34..879e78d92fce7 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/pom.xml +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/pom.xml @@ -45,6 +45,10 @@ io.quarkus quarkus-jsonp + + io.quarkus + quarkus-virtual-threads + diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/ResteasyReactiveRecorder.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/ResteasyReactiveRecorder.java index 74bdd6d25443e..3a74e78586a49 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/ResteasyReactiveRecorder.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/ResteasyReactiveRecorder.java @@ -4,11 +4,8 @@ import static io.quarkus.vertx.http.runtime.security.HttpSecurityRecorder.DefaultAuthFailureHandler.extractRootCause; import java.io.Closeable; -import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -64,9 +61,8 @@ import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig; import io.quarkus.vertx.http.runtime.security.HttpSecurityRecorder.DefaultAuthFailureHandler; import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser; +import io.quarkus.virtual.threads.VirtualThreadsRecorder; import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.core.impl.ContextInternal; import io.vertx.ext.web.RoutingContext; @Recorder @@ -80,78 +76,6 @@ public Executor get() { return ExecutorRecorder.getCurrent(); } }; - public static final Supplier VIRTUAL_EXECUTOR_SUPPLIER = new Supplier() { - Executor current = null; - - /** - * This method is used to specify a custom executor to dispatch virtual threads on carrier threads - * We need reflection for both ease of use (see {@link #get() Get} method) but also because we call methods - * of private classes from the java.lang package. - * - * It is used for testing purposes only for now - */ - private Executor setVirtualThreadCustomScheduler(Executor executor) throws ClassNotFoundException, - InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchMethodException { - var vtf = Class.forName("java.lang.ThreadBuilders").getDeclaredClasses()[0]; - Constructor constructor = vtf.getDeclaredConstructors()[0]; - constructor.setAccessible(true); - ThreadFactory tf = (ThreadFactory) constructor.newInstance( - new Object[] { executor, "quarkus-virtual-factory-", 0, 0, - null }); - - return (Executor) Executors.class.getMethod("newThreadPerTaskExecutor", ThreadFactory.class) - .invoke(this, tf); - } - - /** - * This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to - * change --release, --source, --target flags and to enable previews. - * Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled - * using java 11 and executed with a loom-compliant JDK. - *

- * IMPORTANT: we still need to use a duplicated context to have all the propagation working. - * Thus, the context is captured and applied/terminated in the virtual thread. - */ - @Override - public Executor get() { - if (current == null) { - try { - var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor") - .invoke(this); - current = new Executor() { - @Override - public void execute(Runnable command) { - var context = Vertx.currentContext(); - if (!(context instanceof ContextInternal)) { - virtual.execute(command); - } else { - virtual.execute(new Runnable() { - @Override - public void run() { - final var previousContext = ((ContextInternal) context).beginDispatch(); - try { - command.run(); - } finally { - ((ContextInternal) context).endDispatch(previousContext); - } - } - }); - } - } - }; - } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { - logger.debug("Unable to invoke java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor", e); - //quite ugly but works - logger.warnf("You weren't able to create an executor that spawns virtual threads, the default" + - " blocking executor will be used, please check that your JDK is compatible with " + - "virtual threads"); - //if for some reason a class/method can't be loaded or invoked we return the traditional EXECUTOR - current = EXECUTOR_SUPPLIER.get(); - } - } - return current; - } - }; static volatile Deployment currentDeployment; @@ -205,7 +129,7 @@ public ResteasyReactiveRequestContext createContext(Deployment deployment, } RuntimeDeploymentManager runtimeDeploymentManager = new RuntimeDeploymentManager(info, EXECUTOR_SUPPLIER, - VIRTUAL_EXECUTOR_SUPPLIER, + VirtualThreadsRecorder::getCurrent, closeTaskHandler, contextFactory, new ArcThreadSetupAction(beanContainer.requestContext()), vertxConfig.rootPath); Deployment deployment = runtimeDeploymentManager.deploy(); diff --git a/extensions/smallrye-reactive-messaging/deployment/pom.xml b/extensions/smallrye-reactive-messaging/deployment/pom.xml index 885efec6591ce..c99b6e63e260d 100644 --- a/extensions/smallrye-reactive-messaging/deployment/pom.xml +++ b/extensions/smallrye-reactive-messaging/deployment/pom.xml @@ -38,6 +38,10 @@ io.quarkus quarkus-vertx-deployment + + io.quarkus + quarkus-virtual-threads-deployment + org.commonmark commonmark diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index a5b0955d7ff07..d2ebac1e6fa28 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -52,7 +52,6 @@ import io.quarkus.deployment.builditem.GeneratedClassBuildItem; import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; -import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem; import io.quarkus.deployment.metrics.MetricsCapabilityBuildItem; import io.quarkus.deployment.recording.RecorderContext; import io.quarkus.gizmo.ClassCreator; @@ -114,12 +113,6 @@ AdditionalBeanBuildItem beans() { QuarkusWorkerPoolRegistry.class); } - @BuildStep - void nativeRuntimeInitClasses(BuildProducer runtimeInitClasses) { - runtimeInitClasses.produce(new RuntimeInitializedClassBuildItem( - "io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry$VirtualExecutorSupplier")); - } - @BuildStep AnnotationsTransformerBuildItem transformBeanScope(BeanArchiveIndexBuildItem index, CustomScopeAnnotationsBuildItem scopes) { diff --git a/extensions/smallrye-reactive-messaging/runtime/pom.xml b/extensions/smallrye-reactive-messaging/runtime/pom.xml index fb63c1dce84a7..30cc9bd512573 100644 --- a/extensions/smallrye-reactive-messaging/runtime/pom.xml +++ b/extensions/smallrye-reactive-messaging/runtime/pom.xml @@ -28,6 +28,10 @@ io.quarkus quarkus-vertx + + io.quarkus + quarkus-virtual-threads + io.smallrye.reactive smallrye-reactive-messaging-provider diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java index a8b0e0f000b7f..e93767d8aba6d 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java @@ -1,15 +1,11 @@ package io.quarkus.smallrye.reactivemessaging.runtime; -import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.function.Supplier; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; @@ -21,17 +17,14 @@ import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.logging.Logger; -import org.slf4j.LoggerFactory; -import io.quarkus.runtime.ExecutorRecorder; +import io.quarkus.virtual.threads.VirtualThreadsRecorder; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.annotations.Blocking; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry; import io.smallrye.reactive.messaging.providers.helpers.Validation; -import io.vertx.core.Vertx; import io.vertx.core.impl.ConcurrentHashSet; -import io.vertx.core.impl.ContextInternal; import io.vertx.mutiny.core.Context; import io.vertx.mutiny.core.WorkerExecutor; @@ -41,7 +34,8 @@ // TODO: create a different entry for WorkerPoolRegistry than `analyzeWorker` and drop this class public class QuarkusWorkerPoolRegistry extends WorkerPoolRegistry { - private static final Logger logger = Logger.getLogger(QuarkusWorkerPoolRegistry.class); + private static final Logger log = Logger.getLogger(WorkerPoolRegistry.class); + private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker"; private static final String WORKER_CONCURRENCY = "max-concurrency"; public static final String DEFAULT_VIRTUAL_THREAD_WORKER = ""; @@ -59,61 +53,6 @@ private static Set initVirtualThreadWorkers() { return set; } - private enum VirtualExecutorSupplier implements Supplier { - Instance; - - private final Executor executor; - - /** - * This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to - * change --release, --source, --target flags and to enable previews. - * Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled - * using java 11 and executed with a loom-compliant JDK. - */ - VirtualExecutorSupplier() { - Executor actual; - try { - var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor") - .invoke(this); - actual = new Executor() { - @Override - public void execute(Runnable command) { - var context = Vertx.currentContext(); - if (!(context instanceof ContextInternal)) { - virtual.execute(command); - } else { - ContextInternal contextInternal = (ContextInternal) context; - virtual.execute(new Runnable() { - @Override - public void run() { - final var previousContext = contextInternal.beginDispatch(); - try { - command.run(); - } finally { - contextInternal.endDispatch(previousContext); - } - } - }); - } - } - }; - } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { - //quite ugly but works - logger.warnf(e, "You weren't able to create an executor that spawns virtual threads, the default" + - " blocking executor will be used, please check that your JDK is compatible with " + - "virtual threads"); - //if for some reason a class/method can't be loaded or invoked we return the traditional EXECUTOR - actual = ExecutorRecorder.getCurrent(); - } - this.executor = actual; - } - - @Override - public Executor get() { - return this.executor; - } - } - public void terminate( @Observes(notifyObserver = Reception.IF_EXISTS) @Priority(100) @BeforeDestroyed(ApplicationScoped.class) Object event) { if (!workerExecutors.isEmpty()) { @@ -151,7 +90,7 @@ public Uni executeWork(Context currentContext, Uni uni, String workerN } private Uni runOnVirtualThread(Context currentContext, Uni uni) { - return uni.runSubscriptionOn(VirtualExecutorSupplier.Instance.get()) + return uni.runSubscriptionOn(VirtualThreadsRecorder.getCurrent()) .onItemOrFailure().transformToUni((item, failure) -> { return Uni.createFrom().emitter(emitter -> { if (currentContext != null) { @@ -186,9 +125,8 @@ public WorkerExecutor getWorker(String workerName) { if (executor == null) { executor = executionHolder.vertx().createSharedWorkerExecutor(workerName, workerConcurrency.get(workerName)); - LoggerFactory.getLogger(WorkerPoolRegistry.class) - .info("Created worker pool named " + workerName + " with concurrency of " - + workerConcurrency.get(workerName)); + log.info("Created worker pool named " + workerName + " with concurrency of " + + workerConcurrency.get(workerName)); workerExecutors.put(workerName, executor); } } diff --git a/extensions/virtual-threads/deployment/pom.xml b/extensions/virtual-threads/deployment/pom.xml new file mode 100644 index 0000000000000..67a18d2b61479 --- /dev/null +++ b/extensions/virtual-threads/deployment/pom.xml @@ -0,0 +1,51 @@ + + + 4.0.0 + + + io.quarkus + quarkus-virtual-threads-parent + 999-SNAPSHOT + ../pom.xml + + + quarkus-virtual-threads-deployment + Quarkus - Virtual Threads - Deployment + + + + io.quarkus + quarkus-arc-deployment + + + io.quarkus + quarkus-virtual-threads + + + io.quarkus + quarkus-core-deployment + + + io.quarkus + quarkus-vertx-deployment + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + diff --git a/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java b/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java new file mode 100644 index 0000000000000..136bfa1f2bff8 --- /dev/null +++ b/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java @@ -0,0 +1,19 @@ +package io.quarkus.virtual.threads; + +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Record; +import io.quarkus.deployment.builditem.LaunchModeBuildItem; +import io.quarkus.deployment.builditem.ShutdownContextBuildItem; + +public class VirtualThreadsProcessor { + + @BuildStep + @Record(ExecutionTime.STATIC_INIT) + public void setup(VirtualThreadsConfig config, VirtualThreadsRecorder recorder, + ShutdownContextBuildItem shutdownContextBuildItem, + LaunchModeBuildItem launchModeBuildItem) { + recorder.setupVirtualThreads(config, shutdownContextBuildItem, launchModeBuildItem.getLaunchMode()); + } + +} diff --git a/extensions/virtual-threads/pom.xml b/extensions/virtual-threads/pom.xml new file mode 100644 index 0000000000000..e7a87fb197424 --- /dev/null +++ b/extensions/virtual-threads/pom.xml @@ -0,0 +1,23 @@ + + + 4.0.0 + + + quarkus-extensions-parent + io.quarkus + 999-SNAPSHOT + ../pom.xml + + + quarkus-virtual-threads-parent + Quarkus - Virtual Threads + pom + + + runtime + deployment + + + \ No newline at end of file diff --git a/extensions/virtual-threads/runtime/pom.xml b/extensions/virtual-threads/runtime/pom.xml new file mode 100644 index 0000000000000..948c7ce99de64 --- /dev/null +++ b/extensions/virtual-threads/runtime/pom.xml @@ -0,0 +1,70 @@ + + + 4.0.0 + + + io.quarkus + quarkus-virtual-threads-parent + 999-SNAPSHOT + ../pom.xml + + + quarkus-virtual-threads + Quarkus - Virtual Threads - Runtime + Virtual Threads Executor + + + + io.quarkus + quarkus-core + + + io.quarkus + quarkus-arc + + + io.quarkus + quarkus-vertx + + + io.quarkus + quarkus-junit5-internal + test + + + org.assertj + assertj-core + test + + + + + + + io.quarkus + quarkus-extension-maven-plugin + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + --enable-preview + + + + + diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/ContextPreservingExecutorService.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/ContextPreservingExecutorService.java new file mode 100644 index 0000000000000..09499bb96f67f --- /dev/null +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/ContextPreservingExecutorService.java @@ -0,0 +1,105 @@ +package io.quarkus.virtual.threads; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; + +/** + * Delegating executor service implementation preserving the Vert.x context on {@link #execute(Runnable)} + */ +class ContextPreservingExecutorService implements ExecutorService { + private final ExecutorService delegate; + + ContextPreservingExecutorService(final ExecutorService delegate) { + this.delegate = delegate; + } + + public void execute(final Runnable command) { + var context = Vertx.currentContext(); + if (!(context instanceof ContextInternal)) { + delegate.execute(command); + } else { + ContextInternal contextInternal = (ContextInternal) context; + delegate.execute(new Runnable() { + @Override + public void run() { + final var previousContext = contextInternal.beginDispatch(); + try { + command.run(); + } finally { + contextInternal.endDispatch(previousContext); + } + } + }); + } + } + + public boolean isShutdown() { + return delegate.isShutdown(); + } + + public boolean isTerminated() { + return delegate.isTerminated(); + } + + public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return delegate.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return delegate.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return delegate.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return delegate.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(tasks, timeout, unit); + } + + public void shutdown() { + delegate.shutdown(); + } + + public List shutdownNow() { + return delegate.shutdownNow(); + } + + public String toString() { + return delegate.toString(); + } +} diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsConfig.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsConfig.java new file mode 100644 index 0000000000000..1e1b3aa24b87d --- /dev/null +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsConfig.java @@ -0,0 +1,32 @@ +package io.quarkus.virtual.threads; + +import java.time.Duration; +import java.util.Optional; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED) +public class VirtualThreadsConfig { + + /** + * Virtual thread name prefix. If left blank virtual threads will be unnamed. + */ + @ConfigItem(defaultValue = "quarkus-virtual-thread-") + Optional namePrefix; + + /** + * The shutdown timeout. If all pending work has not been completed by this time + * then any pending tasks will be interrupted, and the shutdown process will continue + */ + @ConfigItem(defaultValue = "1M") + public Duration shutdownTimeout; + + /** + * The frequency at which the status of the executor service should be checked during shutdown. + * Setting this key to an empty value disables the shutdown check interval. + */ + @ConfigItem(defaultValue = "5s") + public Optional shutdownCheckInterval; +} diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java new file mode 100644 index 0000000000000..c326cd433d1b2 --- /dev/null +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java @@ -0,0 +1,166 @@ +package io.quarkus.virtual.threads; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Optional; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +import org.jboss.logging.Logger; + +import io.quarkus.runtime.LaunchMode; +import io.quarkus.runtime.ShutdownContext; +import io.quarkus.runtime.annotations.Recorder; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; + +@Recorder +public class VirtualThreadsRecorder { + + private static final Logger logger = Logger.getLogger("io.quarkus.virtual-threads"); + + static VirtualThreadsConfig config = new VirtualThreadsConfig(); + + private static volatile Executor current; + private static final Object lock = new Object(); + + public void setupVirtualThreads(VirtualThreadsConfig c, ShutdownContext shutdownContext, LaunchMode launchMode) { + config = c; + if (launchMode == LaunchMode.DEVELOPMENT) { + shutdownContext.addLastShutdownTask(new Runnable() { + @Override + public void run() { + Executor executor = current; + if (executor instanceof ExecutorService) { + ((ExecutorService) executor).shutdownNow(); + } + current = null; + } + }); + } else { + shutdownContext.addLastShutdownTask(new Runnable() { + @Override + public void run() { + Executor executor = current; + if (executor instanceof ExecutorService) { + ExecutorService service = (ExecutorService) executor; + service.shutdown(); + + final long timeout = config.shutdownTimeout.toNanos(); + final long interval = config.shutdownCheckInterval.orElse(config.shutdownTimeout).toNanos(); + + long start = System.nanoTime(); + int loop = 1; + long elapsed = 0; + for (;;) { + // This log can be very useful when debugging problems + logger.debugf("Await termination loop: %s, remaining: %s", loop++, timeout - elapsed); + try { + if (!service.awaitTermination(Math.min(timeout, interval), NANOSECONDS)) { + elapsed = System.nanoTime() - start; + if (elapsed >= timeout) { + service.shutdownNow(); + break; + } + } else { + return; + } + } catch (InterruptedException ignored) { + } + } + } + } + }); + } + } + + public static Executor getCurrent() { + Executor executor = current; + if (executor != null) { + return executor; + } + synchronized (lock) { + if (current == null) { + current = createExecutor(); + } + return current; + } + } + + static ExecutorService newVirtualThreadPerTaskExecutorWithName(String prefix) + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException { + Method ofVirtual = Thread.class.getMethod("ofVirtual"); + Object vtb = ofVirtual.invoke(VirtualThreadsRecorder.class); + Class vtbClass = Class.forName("java.lang.Thread$Builder$OfVirtual"); + Method name = vtbClass.getMethod("name", String.class, long.class); + vtb = name.invoke(vtb, prefix, 0); + Method factory = vtbClass.getMethod("factory"); + ThreadFactory tf = (ThreadFactory) factory.invoke(vtb); + + return (ExecutorService) Executors.class.getMethod("newThreadPerTaskExecutor", ThreadFactory.class) + .invoke(VirtualThreadsRecorder.class, tf); + } + + static ExecutorService newVirtualThreadPerTaskExecutor() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + return (ExecutorService) Executors.class.getMethod("newVirtualThreadPerTaskExecutor") + .invoke(VirtualThreadsRecorder.class); + } + + static ExecutorService newVirtualThreadExecutor() + throws InvocationTargetException, NoSuchMethodException, IllegalAccessException { + try { + Optional namePrefix = config.namePrefix; + return namePrefix.isPresent() ? newVirtualThreadPerTaskExecutorWithName(namePrefix.get()) + : newVirtualThreadPerTaskExecutor(); + } catch (ClassNotFoundException e) { + logger.warn("Unable to invoke java.util.concurrent.Executors#newThreadPerTaskExecutor" + + " with VirtualThreadFactory, falling back to unnamed virtual threads", e); + return newVirtualThreadPerTaskExecutor(); + } + } + + /** + * This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to + * change --release, --source, --target flags and to enable previews. + * Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled + * using java 11 and executed with a loom-compliant JDK. + */ + private static Executor createExecutor() { + try { + return new ContextPreservingExecutorService(newVirtualThreadExecutor()); + } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { + logger.debug("Unable to invoke java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor", e); + //quite ugly but works + logger.warn("You weren't able to create an executor that spawns virtual threads, the default" + + " blocking executor will be used, please check that your JDK is compatible with " + + "virtual threads"); + //if for some reason a class/method can't be loaded or invoked we return the traditional executor, + // wrapping executeBlocking. + return new Executor() { + @Override + public void execute(Runnable command) { + var context = Vertx.currentContext(); + if (!(context instanceof ContextInternal)) { + Infrastructure.getDefaultWorkerPool().execute(command); + } else { + context.executeBlocking(fut -> { + try { + command.run(); + fut.complete(null); + } catch (Exception e) { + fut.fail(e); + } + }, false); + } + } + }; + } + } + +} diff --git a/extensions/virtual-threads/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/virtual-threads/runtime/src/main/resources/META-INF/quarkus-extension.yaml new file mode 100644 index 0000000000000..06b2a96598fff --- /dev/null +++ b/extensions/virtual-threads/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -0,0 +1,10 @@ +--- +artifact: ${project.groupId}:${project.artifactId}:${project.version} +name: "Virtual Threads Support" +metadata: + keywords: + - "virtual-threads" + - "loom" + unlisted: true + config: + - "quarkus.virtual-threads." diff --git a/extensions/virtual-threads/runtime/src/test/java/io/quarkus/virtual/threads/VirtualThreadExecutorSupplierTest.java b/extensions/virtual-threads/runtime/src/test/java/io/quarkus/virtual/threads/VirtualThreadExecutorSupplierTest.java new file mode 100644 index 0000000000000..e4cb74d37d117 --- /dev/null +++ b/extensions/virtual-threads/runtime/src/test/java/io/quarkus/virtual/threads/VirtualThreadExecutorSupplierTest.java @@ -0,0 +1,61 @@ +package io.quarkus.virtual.threads; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.time.Duration; +import java.util.concurrent.Executor; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledForJreRange; +import org.junit.jupiter.api.condition.JRE; + +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; + +class VirtualThreadExecutorSupplierTest { + + @Test + @EnabledForJreRange(min = JRE.JAVA_20, disabledReason = "Virtual Threads are a preview feature starting from Java 20") + void virtualThreadCustomScheduler() + throws ClassNotFoundException, InvocationTargetException, IllegalAccessException, NoSuchMethodException { + Executor executor = VirtualThreadsRecorder.newVirtualThreadPerTaskExecutorWithName("vthread-"); + var assertSubscriber = Uni.createFrom().emitter(e -> { + assertThat(Thread.currentThread().getName()).isNotEmpty() + .startsWith("vthread-"); + assertThatItRunsOnVirtualThread(); + e.complete(null); + }).runSubscriptionOn(executor) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + assertSubscriber.awaitItem(Duration.ofSeconds(1)).assertCompleted(); + } + + @Test + @EnabledForJreRange(min = JRE.JAVA_20, disabledReason = "Virtual Threads are a preview feature starting from Java 20") + void execute() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException { + Executor executor = VirtualThreadsRecorder.newVirtualThreadPerTaskExecutor(); + var assertSubscriber = Uni.createFrom().emitter(e -> { + assertThat(Thread.currentThread().getName()).isEmpty(); + assertThatItRunsOnVirtualThread(); + e.complete(null); + }).runSubscriptionOn(executor) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + assertSubscriber.awaitItem(Duration.ofSeconds(1)).assertCompleted(); + } + + public static void assertThatItRunsOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (!virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread"); + } + } catch (Exception e) { + throw new AssertionError( + "Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e); + } + } +}