Skip to content

Commit

Permalink
Vertx - activate the request context for blocking ConsumeEvent methods
Browse files Browse the repository at this point in the history
  • Loading branch information
mkouba committed Jan 6, 2021
1 parent f181f2b commit aaa4294
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 70 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package io.quarkus.vertx.deployment;

import static io.quarkus.vertx.ConsumeEvent.FAILURE_CODE;
import static io.quarkus.vertx.deployment.VertxConstants.AXLE_MESSAGE;
import static io.quarkus.vertx.deployment.VertxConstants.COMPLETION_STAGE;
import static io.quarkus.vertx.deployment.VertxConstants.MESSAGE;
import static io.quarkus.vertx.deployment.VertxConstants.MUTINY_MESSAGE;
import static io.quarkus.vertx.deployment.VertxConstants.RX_MESSAGE;
import static io.quarkus.vertx.deployment.VertxConstants.UNI;

import java.lang.annotation.Annotation;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
Expand All @@ -30,40 +28,27 @@
import io.quarkus.gizmo.AssignableResultHandle;
import io.quarkus.gizmo.BranchResult;
import io.quarkus.gizmo.BytecodeCreator;
import io.quarkus.gizmo.CatchBlockCreator;
import io.quarkus.gizmo.ClassCreator;
import io.quarkus.gizmo.ClassOutput;
import io.quarkus.gizmo.FunctionCreator;
import io.quarkus.gizmo.MethodCreator;
import io.quarkus.gizmo.MethodDescriptor;
import io.quarkus.gizmo.ResultHandle;
import io.quarkus.gizmo.TryBlock;
import io.quarkus.runtime.util.HashUtil;
import io.quarkus.vertx.ConsumeEvent;
import io.quarkus.vertx.runtime.EventConsumerInvoker;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;

class EventBusConsumer {

private static final String INVOKER_SUFFIX = "_VertxInvoker";

private static final MethodDescriptor ARC_CONTAINER = MethodDescriptor
.ofMethod(Arc.class, "container", ArcContainer.class);
private static final MethodDescriptor INSTANCE_HANDLE_GET = MethodDescriptor.ofMethod(InstanceHandle.class, "get",
Object.class);
private static final MethodDescriptor ARC_CONTAINER_INSTANCE_FOR_TYPE = MethodDescriptor
.ofMethod(ArcContainer.class,
"instance", InstanceHandle.class,
Class.class, Annotation[].class);
private static final MethodDescriptor VERTX_EXECUTE_BLOCKING = MethodDescriptor.ofMethod(Vertx.class,
"executeBlocking", void.class, Handler.class, boolean.class, Handler.class);
private static final MethodDescriptor FUTURE_COMPLETE = MethodDescriptor.ofMethod(Future.class,
"complete", void.class, Object.class);
private static final MethodDescriptor FUTURE_FAIL = MethodDescriptor.ofMethod(Future.class,
"fail", void.class, Throwable.class);
private static final MethodDescriptor ARC_CONTAINER_BEAN = MethodDescriptor.ofMethod(ArcContainer.class, "bean",
InjectableBean.class, String.class);
private static final MethodDescriptor ARC_CONTAINER_INSTANCE_FOR_BEAN = MethodDescriptor
Expand Down Expand Up @@ -118,51 +103,23 @@ static String generateInvoker(BeanInfo bean, MethodInfo method,
String generatedName = targetPackage.replace('.', '/') + "/" + baseName + INVOKER_SUFFIX + "_" + method.name() + "_"
+ HashUtil.sha1(sigBuilder.toString());

boolean blocking;
AnnotationValue blockingValue = consumeEvent.value("blocking");
blocking = method.hasAnnotation(BLOCKING) || (blockingValue != null && blockingValue.asBoolean());

ClassCreator invokerCreator = ClassCreator.builder().classOutput(classOutput).className(generatedName)
.interfaces(EventConsumerInvoker.class).build();

// The method descriptor is: void invokeBean(Object message)
MethodCreator invoke = invokerCreator.getMethodCreator("invokeBean", void.class, Object.class);
ResultHandle containerHandle = invoke.invokeStaticMethod(ARC_CONTAINER);

AnnotationValue blocking = consumeEvent.value("blocking");
boolean blockingAnnotation = method.hasAnnotation(BLOCKING);
if ((blocking != null && blocking.asBoolean()) || blockingAnnotation) {
// Blocking operation must be performed on a worker thread
ResultHandle vertxHandle = invoke
.invokeInterfaceMethod(INSTANCE_HANDLE_GET,
invoke.invokeInterfaceMethod(ARC_CONTAINER_INSTANCE_FOR_TYPE, containerHandle,
invoke.loadClass(Vertx.class),
invoke.newArray(Annotation.class.getName(), invoke.load(0))));

FunctionCreator func = invoke.createFunction(Handler.class);
BytecodeCreator funcBytecode = func.getBytecode();
AssignableResultHandle messageHandle = funcBytecode.createVariable(Message.class);
funcBytecode.assign(messageHandle, invoke.getMethodParam(0));
TryBlock tryBlock = funcBytecode.tryBlock();
invoke(bean, method, messageHandle, tryBlock);
tryBlock.invokeInterfaceMethod(FUTURE_COMPLETE, funcBytecode.getMethodParam(0), tryBlock.loadNull());

CatchBlockCreator catchBlock = tryBlock.addCatch(Exception.class);
// Need to reply with the caught exception - using Throwable.toString on purpose to get the class name.
ResultHandle failureMessage = catchBlock
.invokeVirtualMethod(THROWABLE_TO_STRING, catchBlock.getCaughtException());
ResultHandle failureStatus = catchBlock.load(FAILURE_CODE);
catchBlock.invokeInterfaceMethod(
MESSAGE_FAIL,
messageHandle,
failureStatus,
failureMessage);
// Completing successfully, the failure has been sent to the sender.
catchBlock.invokeInterfaceMethod(FUTURE_COMPLETE, funcBytecode.getMethodParam(0), catchBlock.loadNull());
if (blocking) {
MethodCreator isBlocking = invokerCreator.getMethodCreator("isBlocking", boolean.class);
isBlocking.returnValue(isBlocking.load(true));
}

funcBytecode.returnValue(null);
invoke(bean, method, invoke.getMethodParam(0), invoke);

invoke.invokeInterfaceMethod(VERTX_EXECUTE_BLOCKING, vertxHandle, func.getInstance(), invoke.load(false),
invoke.loadNull());
} else {
invoke(bean, method, invoke.getMethodParam(0), invoke);
}
invoke.returnValue(null);
invokerCreator.close();
return generatedName.replace('/', '.');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.arc.Arc;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.common.annotation.Blocking;
Expand All @@ -39,9 +38,11 @@ public class MessageConsumerMethodTest {
@Inject
SimpleBean simpleBean;

@Inject
EventBus eventBus;

@Test
public void testSend() throws InterruptedException {
EventBus eventBus = Arc.container().instance(EventBus.class).get();
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("foo", "hello", ar -> {
if (ar.succeeded()) {
Expand All @@ -59,7 +60,6 @@ public void testSend() throws InterruptedException {

@Test
public void testSendAsync() throws InterruptedException {
EventBus eventBus = Arc.container().instance(EventBus.class).get();
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("foo-async", "hello", ar -> {
if (ar.succeeded()) {
Expand All @@ -77,7 +77,6 @@ public void testSendAsync() throws InterruptedException {

@Test
public void testSendAsyncUni() throws InterruptedException {
EventBus eventBus = Arc.container().instance(EventBus.class).get();
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("foo-async-uni", "hello-uni", ar -> {
if (ar.succeeded()) {
Expand All @@ -95,7 +94,6 @@ public void testSendAsyncUni() throws InterruptedException {

@Test
public void testSendDefaultAddress() throws InterruptedException {
EventBus eventBus = Arc.container().instance(EventBus.class).get();
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("io.quarkus.vertx.deployment.MessageConsumerMethodTest$SimpleBean", "Hello", ar -> {
if (ar.succeeded()) {
Expand All @@ -113,7 +111,6 @@ public void testSendDefaultAddress() throws InterruptedException {

@Test
public void testRequestContext() throws InterruptedException {
EventBus eventBus = Arc.container().instance(EventBus.class).get();
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("request", "Martin", ar -> {
if (ar.succeeded()) {
Expand All @@ -129,10 +126,26 @@ public void testRequestContext() throws InterruptedException {
assertEquals("MArtin", synchronizer.poll(2, TimeUnit.SECONDS));
}

@Test
public void testBlockingRequestContext() throws InterruptedException {
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("blocking-request", "Lu", ar -> {
if (ar.succeeded()) {
try {
synchronizer.put(ar.result().body());
} catch (InterruptedException e) {
fail(e);
}
} else {
fail(ar.cause());
}
});
assertEquals("Lu", synchronizer.poll(2, TimeUnit.SECONDS));
}

@Test
public void testPublish() throws InterruptedException {
SimpleBean.MESSAGES.clear();
EventBus eventBus = Arc.container().instance(EventBus.class).get();
SimpleBean.latch = new CountDownLatch(2);
eventBus.publish("pub", "Hello");
SimpleBean.latch.await(2, TimeUnit.SECONDS);
Expand All @@ -143,7 +156,6 @@ public void testPublish() throws InterruptedException {
@Test
public void testBlockingConsumer() throws InterruptedException {
SimpleBean.MESSAGES.clear();
EventBus eventBus = Arc.container().instance(EventBus.class).get();
SimpleBean.latch = new CountDownLatch(1);
eventBus.publish("blocking", "Hello");
SimpleBean.latch.await(2, TimeUnit.SECONDS);
Expand All @@ -155,7 +167,6 @@ public void testBlockingConsumer() throws InterruptedException {
@Test
public void testPublishRx() throws InterruptedException {
SimpleBean.MESSAGES.clear();
EventBus eventBus = Arc.container().instance(EventBus.class).get();
SimpleBean.latch = new CountDownLatch(1);
eventBus.publish("pub-rx", "Hello");
SimpleBean.latch.await(2, TimeUnit.SECONDS);
Expand All @@ -165,7 +176,6 @@ public void testPublishRx() throws InterruptedException {
@Test
public void testPublishAxle() throws InterruptedException {
SimpleBean.MESSAGES.clear();
EventBus eventBus = Arc.container().instance(EventBus.class).get();
SimpleBean.latch = new CountDownLatch(1);
eventBus.publish("pub-axle", "Hello");
SimpleBean.latch.await(2, TimeUnit.SECONDS);
Expand All @@ -175,7 +185,6 @@ public void testPublishAxle() throws InterruptedException {
@Test
public void testPublishMutiny() throws InterruptedException {
SimpleBean.MESSAGES.clear();
EventBus eventBus = Arc.container().instance(EventBus.class).get();
SimpleBean.latch = new CountDownLatch(1);
eventBus.publish("pub-mutiny", "Hello");
SimpleBean.latch.await(2, TimeUnit.SECONDS);
Expand All @@ -185,7 +194,6 @@ public void testPublishMutiny() throws InterruptedException {
@Test
public void testBlockingConsumerUsingSmallRyeBlocking() throws InterruptedException {
SimpleBean.MESSAGES.clear();
EventBus eventBus = Arc.container().instance(EventBus.class).get();
SimpleBean.latch = new CountDownLatch(1);
eventBus.publish("worker", "Hello");
SimpleBean.latch.await(2, TimeUnit.SECONDS);
Expand Down Expand Up @@ -270,6 +278,12 @@ void consumeBlockingUsingRunOnWorkerThread(String message) {
MESSAGES.add(message.toLowerCase() + "::" + Context.isOnWorkerThread());
latch.countDown();
}

@Blocking
@ConsumeEvent("blocking-request")
String blockingRequestContextActive(String message) {
return transformer.transform(message);
}
}

@RequestScoped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@
*/
public interface EventConsumerInvoker extends BeanInvoker<Message<Object>> {

default boolean isBlocking() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.quarkus.vertx.ConsumeEvent;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
Expand Down Expand Up @@ -81,10 +82,24 @@ void registerMessageConsumers(Map<String, ConsumeEvent> messageConsumerConfigura
consumer.handler(new Handler<Message<Object>>() {
@Override
public void handle(Message<Object> m) {
try {
invoker.invoke(m);
} catch (Throwable e) {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
if (invoker.isBlocking()) {
vertx.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> event) {
try {
invoker.invoke(m);
} catch (Throwable e) {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
}
event.complete();
}
}, null);
} else {
try {
invoker.invoke(m);
} catch (Throwable e) {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
}
}
}
});
Expand Down

0 comments on commit aaa4294

Please sign in to comment.