Skip to content

Commit

Permalink
merge changes from Ozan
Browse files Browse the repository at this point in the history
+ fixed logging and default executor when newVirtualThreadPerTask can't be used to schedule virtual threads
  • Loading branch information
anavarr committed Jul 5, 2023
1 parent 8a06248 commit 0badc7c
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -56,7 +56,7 @@ public class AeshConsole extends QuarkusConsole {
* Because Aesh can log deadlocks are possible on Windows if a write fails, unless care
* is taken.
*/
private final LinkedBlockingDeque<String> writeQueue = new LinkedBlockingDeque<>();
private final ConcurrentLinkedDeque<String> writeQueue = new ConcurrentLinkedDeque<>();
private final Lock connectionLock = new ReentrantLock();
private static final ThreadLocal<Boolean> IN_WRITE = new ThreadLocal<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.VOID_CLASS;
Expand All @@ -33,6 +34,7 @@
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusParameterDescriptor;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry;
import io.quarkus.smallrye.reactivemessaging.runtime.TypeInfo;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.annotations.Blocking;
Expand Down Expand Up @@ -178,17 +180,28 @@ public Integer get() {
AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING);
AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING);
AnnotationInstance transactionalAnnotation = methodInfo.annotation(TRANSACTIONAL);
if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null) {
AnnotationInstance runOnVirtualThreadAnnotation = methodInfo.annotation(RUN_ON_VIRTUAL_THREAD);
if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null
|| runOnVirtualThreadAnnotation != null) {
mediatorConfigurationSupport.validateBlocking(validationOutput);
configuration.setBlocking(true);
if (blockingAnnotation != null) {
AnnotationValue ordered = blockingAnnotation.value("ordered");
configuration.setBlockingExecutionOrdered(ordered == null || ordered.asBoolean());
if (ordered == null && runOnVirtualThreadAnnotation != null) {
configuration.setBlockingExecutionOrdered(false);
} else {
configuration.setBlockingExecutionOrdered(ordered == null || ordered.asBoolean());
}
String poolName;
if (blockingAnnotation.value() != null &&
!(poolName = blockingAnnotation.value().asString()).equals(Blocking.DEFAULT_WORKER_POOL)) {
configuration.setWorkerPoolName(poolName);
} else if (runOnVirtualThreadAnnotation != null) {
configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER);
}
} else if (runOnVirtualThreadAnnotation != null) {
configuration.setBlockingExecutionOrdered(false);
configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER);
} else {
configuration.setBlockingExecutionOrdered(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.jboss.jandex.DotName;

import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.annotations.Blocking;
Expand Down Expand Up @@ -89,6 +90,7 @@ public final class ReactiveMessagingDotNames {
.createSimple("io.quarkus.smallrye.reactivemessaging.runtime.kotlin.AbstractSubscribingCoroutineInvoker");

static final DotName TRANSACTIONAL = DotName.createSimple("jakarta.transaction.Transactional");
static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName());

private ReactiveMessagingDotNames() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;

Expand Down Expand Up @@ -240,6 +241,7 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
BeanInfo bean = mediatorMethod.getBean();

if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING)
|| methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD)
|| methodInfo.hasAnnotation(TRANSACTIONAL)) {
// Just in case both annotation are used, use @Blocking value.
String poolName = Blocking.DEFAULT_WORKER_POOL;
Expand All @@ -249,8 +251,12 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
AnnotationInstance blocking = methodInfo.annotation(ReactiveMessagingDotNames.BLOCKING);
poolName = blocking.value() == null ? Blocking.DEFAULT_WORKER_POOL : blocking.value().asString();
}
boolean virtualThread = methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD);
if (virtualThread && Blocking.DEFAULT_WORKER_POOL.equals(poolName)) {
poolName = QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER;
}
workerConfigurations.add(new WorkerConfiguration(methodInfo.declaringClass().toString(),
methodInfo.name(), poolName));
methodInfo.name(), poolName, virtualThread));
}

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package io.quarkus.smallrye.reactivemessaging.runtime;

import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -15,13 +20,16 @@
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.slf4j.LoggerFactory;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.WorkerExecutor;

Expand All @@ -30,15 +38,65 @@
@ApplicationScoped
// TODO: create a different entry for WorkerPoolRegistry than `analyzeWorker` and drop this class
public class QuarkusWorkerPoolRegistry extends WorkerPoolRegistry {

static final Logger logger = Logger.getLogger("io.quarkus");
private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";
private static final String WORKER_CONCURRENCY = "max-concurrency";
public static final String DEFAULT_VIRTUAL_THREAD_WORKER = "<virtual-thread>";
public static final int DEFAULT_VIRTUAL_THREAD_MAX_CONCURRENCY = 5000;

@Inject
ExecutionHolder executionHolder;

private final Map<String, Semaphore> virtualThreadConcurrency = new ConcurrentHashMap<>();

private final Map<String, Integer> workerConcurrency = new HashMap<>();
private final Map<String, WorkerExecutor> workerExecutors = new ConcurrentHashMap<>();

public static final Supplier<Executor> VIRTUAL_EXECUTOR_SUPPLIER = new Supplier<Executor>() {
volatile Executor current = null;

@Override
public Executor get() {
if (current == null) {
try {
var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor")
.invoke(this);
current = new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
virtual.execute(command);
} else {
virtual.execute(new Runnable() {
@Override
public void run() {
final var previousContext = ((ContextInternal) context).beginDispatch();
try {
command.run();
} finally {
((ContextInternal) context).endDispatch(previousContext);
}
}
});
}
}
};
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
System.err.println(e);
//quite ugly but works
logger.warnf("You weren't able to create an executor that spawns virtual threads, the default" +
" blocking executor will be used, please check that your JDK is compatible with " +
"virtual threads");
//if for some reason a class/method can't be loaded or invoked we return a default blocking executor
current = Executors.newCachedThreadPool();
}
}
return current;
}
};

public void terminate(
@Observes(notifyObserver = Reception.IF_EXISTS) @Priority(100) @BeforeDestroyed(ApplicationScoped.class) Object event) {
if (!workerExecutors.isEmpty()) {
Expand All @@ -56,6 +114,24 @@ public <T> Uni<T> executeWork(Context currentContext, Uni<T> uni, String workerN
return currentContext.executeBlocking(Uni.createFrom().deferred(() -> uni), ordered);
}
return executionHolder.vertx().executeBlocking(uni, ordered);
} else if (virtualThreadConcurrency.containsKey(workerName)) {
Semaphore semaphore = virtualThreadConcurrency.get(workerName);
boolean acquired = semaphore.tryAcquire();
if (acquired) {
return runOnVirtualThread(currentContext, uni, semaphore);
} else {
if (currentContext != null) {
return currentContext.executeBlocking(Uni.createFrom().deferred(() -> {
semaphore.acquireUninterruptibly();
return runOnVirtualThread(currentContext, uni, semaphore);
}));
} else {
return executionHolder.vertx().executeBlocking(Uni.createFrom().deferred(() -> {
semaphore.acquireUninterruptibly();
return runOnVirtualThread(currentContext, uni, semaphore);
}));
}
}
} else {
if (currentContext != null) {
return getWorker(workerName).executeBlocking(uni, ordered)
Expand All @@ -73,6 +149,20 @@ public <T> Uni<T> executeWork(Context currentContext, Uni<T> uni, String workerN
}
}

private <T> Uni<T> runOnVirtualThread(Context currentContext, Uni<T> uni, Semaphore semaphore) {
return uni.runSubscriptionOn(command -> VIRTUAL_EXECUTOR_SUPPLIER.get().execute(command))
.onItemOrFailure().transformToUni((item, failure) -> {
semaphore.release();
return Uni.createFrom().emitter(emitter -> {
if (failure != null) {
currentContext.runOnContext(() -> emitter.fail(failure));
} else {
currentContext.runOnContext(() -> emitter.complete(item));
}
});
});
}

public WorkerExecutor getWorker(String workerName) {
Objects.requireNonNull(workerName, "Worker Name not specified");

Expand Down Expand Up @@ -102,10 +192,10 @@ public WorkerExecutor getWorker(String workerName) {
}

// Shouldn't get here
throw new IllegalArgumentException("@Blocking referred to invalid worker name.");
throw new IllegalArgumentException("@Blocking referred to invalid worker name. " + workerName);
}

public void defineWorker(String className, String method, String poolName) {
public void defineWorker(String className, String method, String poolName, boolean virtualThread) {
Objects.requireNonNull(className, "className was empty");
Objects.requireNonNull(method, "Method was empty");

Expand All @@ -118,11 +208,17 @@ public void defineWorker(String className, String method, String poolName) {
// Validate @Blocking worker pool has configuration to define concurrency
String workerConfigKey = WORKER_CONFIG_PREFIX + "." + poolName + "." + WORKER_CONCURRENCY;
Optional<Integer> concurrency = ConfigProvider.getConfig().getOptionalValue(workerConfigKey, Integer.class);
if (!concurrency.isPresent()) {
throw getBlockingError(className, method, workerConfigKey + " was not defined");
}

workerConcurrency.put(poolName, concurrency.get());
if (virtualThread) {
virtualThreadConcurrency.put(poolName,
new Semaphore(concurrency.orElse(DEFAULT_VIRTUAL_THREAD_MAX_CONCURRENCY)));
} else {
if (!concurrency.isPresent()) {
throw getBlockingError(className, method, workerConfigKey + " was not defined");
}

workerConcurrency.put(poolName, concurrency.get());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ void onStaticInit(@Observes @Initialized(ApplicationScoped.class) Object event,
QuarkusWorkerPoolRegistry workerPoolRegistry) {
mediatorManager.addAnalyzed(context.getMediatorConfigurations());
for (WorkerConfiguration worker : context.getWorkerConfigurations()) {
workerPoolRegistry.defineWorker(worker.getClassName(), worker.getMethodName(), worker.getPoolName());
workerPoolRegistry.defineWorker(worker.getClassName(), worker.getMethodName(), worker.getPoolName(),
worker.isVirtualThread());
}
for (EmitterConfiguration emitter : context.getEmitterConfigurations()) {
mediatorManager.addEmitter(emitter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ public class WorkerConfiguration {

private String poolName;

private boolean virtualThread;

public WorkerConfiguration() {
}

public WorkerConfiguration(String className, String name, String poolName) {
public WorkerConfiguration(String className, String name, String poolName, boolean virtualThread) {
this.className = className;
this.methodName = name;
this.poolName = poolName;
this.virtualThread = virtualThread;
}

public String getClassName() {
Expand All @@ -41,4 +44,11 @@ public void setPoolName(String poolName) {
this.poolName = poolName;
}

public boolean isVirtualThread() {
return virtualThread;
}

public void setVirtualThread(boolean virtualThread) {
this.virtualThread = virtualThread;
}
}

0 comments on commit 0badc7c

Please sign in to comment.