Skip to content

Commit

Permalink
Merge pull request #30279 from mkouba/issue-29466
Browse files Browse the repository at this point in the history
Introduce VertxContextSupport
  • Loading branch information
cescoffier authored Jan 11, 2023
2 parents 712c06a + 7315443 commit ad321f0
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 0 deletions.
21 changes: 21 additions & 0 deletions docs/src/main/asciidoc/vertx.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,27 @@ Then, invoke the endpoint using:
The response indicates that, in addition to the English page, there are a German and a French page about Quarkus on wikipedia.
== Executing Asynchronous Code From a Blocking Thread
Sometimes it's necessary to execute an asynchronous code from a blocking thread.
Specifically, to execute the code on a Vert.x thread with an isolated/duplicated Vert.x context.
A typical example is an asynchronous code that needs to leverage the Hibernate Reactive API during application startup.
Quarkus provides the `VertxContextSupport#subscribeAndAwait()` method which subscribes to the supplied `io.smallrye.mutiny.Uni` on a Vert.x duplicated context, then blocks the current thread and waits for the result.
[source, java]
----
void onStart(@Observes StartupEvent event, Mutiny.SessionFactory sf) {
VertxContextSupport.subscribeAndAwait(() -> {
sf.withTransaction(session -> session.persist(new Person()));
});
}
----
NOTE: If it's necessary, the CDI request context is activated during execution of the asynchronous code.
CAUTION: `VertxContextSupport#subscribeAndAwait()` must not be called on an event loop!
== Going further
This guide introduced how you can use Vert.x APIs from a Quarkus application.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.quarkus.vertx;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.function.Supplier;

import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Singleton;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.arc.Arc;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;

public class VertxContextSupportTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(Alpha.class));

@Inject
Alpha alpha;

@Test
public void testRunner() {
assertEquals("foo", alpha.val);
}

@Singleton
public static class Alpha {

String val;

void onStart(@Observes StartupEvent event) {
Supplier<Uni<String>> supplier = new Supplier<Uni<String>>() {
@Override
public Uni<String> get() {
assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Error", "Error");
assertTrue(Arc.container().requestContext().isActive());
return Uni.createFrom().item("foo");
}
};
try {
val = VertxContextSupport.subscribeAndAwait(supplier);
} catch (Throwable e) {
fail();
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.quarkus.vertx;

import java.util.function.Supplier;

import io.quarkus.arc.Arc;
import io.quarkus.arc.ManagedContext;
import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;

/**
* Provides utility methods to work with Vertx duplicated context.
*
* @see VertxContext
*/
public final class VertxContextSupport {

private VertxContextSupport() {
}

/**
* Subscribes to the supplied {@link Uni} on a Vertx duplicated context; blocks the current thread and waits for the result.
* <p>
* If it's necessary, the CDI request context is activated during execution of the asynchronous code.
*
* @param uniSupplier
* @throws IllegalStateException If called on an event loop thread.
*/
public static <T> T subscribeAndAwait(Supplier<Uni<T>> uniSupplier) throws Throwable {
Context context = getContext();
VertxContextSafetyToggle.setContextSafe(context, true);
return Uni.createFrom().<T> emitter(e -> {
context.runOnContext(new Handler<Void>() {

@Override
public void handle(Void event) {
ManagedContext requestContext = Arc.container().requestContext();
Runnable terminate = null;
if (!requestContext.isActive()) {
requestContext.activate();
terminate = requestContext::terminate;
}
try {
Uni<T> uni = uniSupplier.get();
if (terminate != null) {
uni = uni.onTermination().invoke(terminate);
}
uni.subscribe().with(e::complete, e::fail);
} catch (Throwable t) {
e.fail(t);
}
}
});
}).await().indefinitely();
}

private static Context getContext() {
Context context = Vertx.currentContext();
if (context == null) {
Vertx vertx = VertxCoreRecorder.getVertx().get();
context = VertxContext.getOrCreateDuplicatedContext(vertx);
} else {
// Executed on a vertx thread...
if (Context.isOnEventLoopThread()) {
throw new IllegalStateException("VertxContextSupport#subscribeAndAwait() must not be called on an event loop!");
}
context = VertxContext.getOrCreateDuplicatedContext(context);
}
return context;
}

}

0 comments on commit ad321f0

Please sign in to comment.