Skip to content

Commit

Permalink
Virtual threads extension providing executor supplier for running vir…
Browse files Browse the repository at this point in the history
…tual threads

- Handles executor close on shutdown hook
- Creates named virtual threads
  • Loading branch information
ozangunalp committed Jul 31, 2023
1 parent 773b8c1 commit 3be4c1a
Show file tree
Hide file tree
Showing 24 changed files with 622 additions and 230 deletions.
10 changes: 10 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2948,6 +2948,16 @@
<artifactId>quarkus-info</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Quarkus test dependencies -->
<dependency>
Expand Down
13 changes: 13 additions & 0 deletions devtools/bom-descriptor-json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2839,6 +2839,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-webjars-locator</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2853,6 +2853,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-webjars-locator-deployment</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions docs/src/main/asciidoc/virtual-threads.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,19 @@ So, the data written in the duplicated context (and the request scope, as the re

However, thread locals are not propagated.

== Virtual thread names

Virtual threads are created without a thread name by default, which is not practical to identify the execution for debugging and logging purposes.
Quarkus managed virtual threads are named and prefixed with `quarkus-virtual-thread-`.
You can customize this prefix, or disable the naming altogether configuring an empty value:

[source, properties]
----
quarkus.virtual-threads.name-prefix=
----


== Additional references

- https://dl.acm.org/doi/10.1145/3583678.3596895[Considerations for integrating virtual threads in a Java framework: a Quarkus example in a resource-constrained environment]
4 changes: 4 additions & 0 deletions extensions/grpc/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
<artifactId>quarkus-smallrye-health-deployment</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
Expand Down
4 changes: 4 additions & 0 deletions extensions/grpc/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-stork</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.net.BindException;
import java.time.Duration;
import java.util.AbstractMap;
Expand All @@ -20,13 +19,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.regex.Pattern;

import jakarta.enterprise.inject.Instance;
Expand Down Expand Up @@ -63,15 +59,14 @@
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.vertx.http.runtime.PortSystemProperties;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
Expand Down Expand Up @@ -588,7 +583,8 @@ private ServerServiceDefinition serviceWithInterceptors(Vertx vertx, GrpcContain
List<String> virtuals = virtualMethodsPerService.get(service.getImplementationClassName());
if (list != null || virtuals != null) {
interceptors
.add(new BlockingServerInterceptor(vertx, list, virtuals, VIRTUAL_EXECUTOR_SUPPLIER.get(), devMode));
.add(new BlockingServerInterceptor(vertx, list, virtuals,
VirtualThreadsRecorder.getCurrent(), devMode));
}
}
return ServerInterceptors.intercept(service.definition, interceptors);
Expand Down Expand Up @@ -728,74 +724,4 @@ public void run(Runnable command) {
}
}

public static final Supplier<Executor> VIRTUAL_EXECUTOR_SUPPLIER = new Supplier<>() {
Executor current = null;

/**
* This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to
* change --release, --source, --target flags and to enable previews.
* Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled
* using java 11 and executed with a loom-compliant JDK.
* <p>
* IMPORTANT: we still need to use a duplicated context to have all the propagation working.
* Thus, the context is captured and applied/terminated in the virtual thread.
*/
@Override
public Executor get() {
if (current == null) {
try {
var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor")
.invoke(this);
current = new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
virtual.execute(command);
} else {
virtual.execute(new Runnable() {
@Override
public void run() {
final var previousContext = ((ContextInternal) context).beginDispatch();
try {
command.run();
} finally {
((ContextInternal) context).endDispatch(previousContext);
}
}
});
}
}
};
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
logger.debug("Unable to invoke java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor", e);
//quite ugly but works
logger.warnf("You weren't able to create an executor that spawns virtual threads, the default" +
" blocking executor will be used, please check that your JDK is compatible with " +
"virtual threads");
//if for some reason a class/method can't be loaded or invoked we return the traditional executor,
// wrapping executeBlocking.
current = new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
Infrastructure.getDefaultWorkerPool().execute(command);
} else {
context.executeBlocking(fut -> {
try {
command.run();
fut.complete(null);
} catch (Exception e) {
fut.fail(e);
}
});
}
}
};
}
}
return current;
}
};
}
1 change: 1 addition & 0 deletions extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<modules>
<!-- Netty loom adaptor-->
<module>netty-loom-adaptor</module>
<module>virtual-threads</module>
<!-- Plumbing -->
<module>arc</module>
<module>scheduler</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-common-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-security-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jsonp</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
import static io.quarkus.vertx.http.runtime.security.HttpSecurityRecorder.DefaultAuthFailureHandler.extractRootCause;

import java.io.Closeable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -64,9 +61,8 @@
import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig;
import io.quarkus.vertx.http.runtime.security.HttpSecurityRecorder.DefaultAuthFailureHandler;
import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.ext.web.RoutingContext;

@Recorder
Expand All @@ -80,78 +76,6 @@ public Executor get() {
return ExecutorRecorder.getCurrent();
}
};
public static final Supplier<Executor> VIRTUAL_EXECUTOR_SUPPLIER = new Supplier<Executor>() {
Executor current = null;

/**
* This method is used to specify a custom executor to dispatch virtual threads on carrier threads
* We need reflection for both ease of use (see {@link #get() Get} method) but also because we call methods
* of private classes from the java.lang package.
*
* It is used for testing purposes only for now
*/
private Executor setVirtualThreadCustomScheduler(Executor executor) throws ClassNotFoundException,
InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchMethodException {
var vtf = Class.forName("java.lang.ThreadBuilders").getDeclaredClasses()[0];
Constructor constructor = vtf.getDeclaredConstructors()[0];
constructor.setAccessible(true);
ThreadFactory tf = (ThreadFactory) constructor.newInstance(
new Object[] { executor, "quarkus-virtual-factory-", 0, 0,
null });

return (Executor) Executors.class.getMethod("newThreadPerTaskExecutor", ThreadFactory.class)
.invoke(this, tf);
}

/**
* This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to
* change --release, --source, --target flags and to enable previews.
* Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled
* using java 11 and executed with a loom-compliant JDK.
* <p>
* IMPORTANT: we still need to use a duplicated context to have all the propagation working.
* Thus, the context is captured and applied/terminated in the virtual thread.
*/
@Override
public Executor get() {
if (current == null) {
try {
var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor")
.invoke(this);
current = new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
virtual.execute(command);
} else {
virtual.execute(new Runnable() {
@Override
public void run() {
final var previousContext = ((ContextInternal) context).beginDispatch();
try {
command.run();
} finally {
((ContextInternal) context).endDispatch(previousContext);
}
}
});
}
}
};
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
logger.debug("Unable to invoke java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor", e);
//quite ugly but works
logger.warnf("You weren't able to create an executor that spawns virtual threads, the default" +
" blocking executor will be used, please check that your JDK is compatible with " +
"virtual threads");
//if for some reason a class/method can't be loaded or invoked we return the traditional EXECUTOR
current = EXECUTOR_SUPPLIER.get();
}
}
return current;
}
};

static volatile Deployment currentDeployment;

Expand Down Expand Up @@ -205,7 +129,7 @@ public ResteasyReactiveRequestContext createContext(Deployment deployment,
}

RuntimeDeploymentManager runtimeDeploymentManager = new RuntimeDeploymentManager(info, EXECUTOR_SUPPLIER,
VIRTUAL_EXECUTOR_SUPPLIER,
VirtualThreadsRecorder::getCurrent,
closeTaskHandler, contextFactory, new ArcThreadSetupAction(beanContainer.requestContext()),
vertxConfig.rootPath);
Deployment deployment = runtimeDeploymentManager.deploy();
Expand Down
4 changes: 4 additions & 0 deletions extensions/smallrye-reactive-messaging/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
</dependency>
<dependency>
<groupId>org.commonmark</groupId>
<artifactId>commonmark</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
import io.quarkus.deployment.metrics.MetricsCapabilityBuildItem;
import io.quarkus.deployment.recording.RecorderContext;
import io.quarkus.gizmo.ClassCreator;
Expand Down Expand Up @@ -114,12 +113,6 @@ AdditionalBeanBuildItem beans() {
QuarkusWorkerPoolRegistry.class);
}

@BuildStep
void nativeRuntimeInitClasses(BuildProducer<RuntimeInitializedClassBuildItem> runtimeInitClasses) {
runtimeInitClasses.produce(new RuntimeInitializedClassBuildItem(
"io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry$VirtualExecutorSupplier"));
}

@BuildStep
AnnotationsTransformerBuildItem transformBeanScope(BeanArchiveIndexBuildItem index,
CustomScopeAnnotationsBuildItem scopes) {
Expand Down
Loading

0 comments on commit 3be4c1a

Please sign in to comment.