From dd0fdaa53e2ca76305ac1c845cd3b4dbeacf5766 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Thu, 2 Dec 2021 15:33:49 +0100 Subject: [PATCH] When a reactive messaging method is annotated with @Transactional, consider it blocking. Fix https://github.com/quarkusio/quarkus/issues/21795 --- docs/src/main/asciidoc/amqp-reference.adoc | 7 ++- docs/src/main/asciidoc/kafka.adoc | 18 +++--- .../QuarkusMediatorConfigurationUtil.java | 4 +- .../deployment/ReactiveMessagingDotNames.java | 2 + .../SmallRyeReactiveMessagingProcessor.java | 6 +- .../TransactionalSubscriberTest.java | 55 +++++++++++++++++++ .../beans/IncomingUsingTransactional.java | 37 +++++++++++++ 7 files changed, 119 insertions(+), 10 deletions(-) create mode 100644 extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java create mode 100644 extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/beans/IncomingUsingTransactional.java diff --git a/docs/src/main/asciidoc/amqp-reference.adoc b/docs/src/main/asciidoc/amqp-reference.adoc index e7b40854ee708..384250366a795 100644 --- a/docs/src/main/asciidoc/amqp-reference.adoc +++ b/docs/src/main/asciidoc/amqp-reference.adoc @@ -369,7 +369,6 @@ import javax.transaction.Transactional; public class PriceStorage { @Incoming("prices") - @Blocking @Transactional public void store(int priceInUsd) { Price price = new Price(); @@ -393,6 +392,12 @@ The first one provides more fine-grained tuning such as the worker pool to use a The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order. ==== +[TIP] +.@Transactional +==== +If your method is annotated with `@Transactional`, it will be considered _blocking_ automatically, even if the method is not annotated with `@Blocking`. +==== + == Customizing the underlying AMQP client The connector uses the Vert.x AMQP client underneath. diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index 8f7dcad209804..fddbc20b1e219 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -235,7 +235,6 @@ import javax.transaction.Transactional; public class PriceStorage { @Incoming("prices") - @Blocking @Transactional public void store(int priceInUsd) { Price price = new Price(); @@ -263,6 +262,12 @@ The second one, used also with other reactive features of Quarkus, uses the defa Detailed information on the usage of `@Blocking` annotation can be found in https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.1/advanced/blocking.html[SmallRye Reactive Messaging – Handling blocking execution]. ==== +[TIP] +.@Transactional +==== +If your method is annotated with `@Transactional`, it will be considered _blocking_ automatically, even if the method is not annotated with `@Blocking`. +==== + === Acknowledgment Strategies All messages received by a consumer must be acknowledged. @@ -1935,17 +1940,16 @@ public class FruitConsumer { @Incoming("fruits") // <1> @Transactional // <2> - @Blocking // <3> - public void persistFruits(Fruit fruit) { // <4> - fruit.persist(); // <5> + public void persistFruits(Fruit fruit) { // <3> + fruit.persist(); // <4> } } ---- <1> Configuring the incoming channel. This channel reads from Kafka. <2> As we are writing in a database, we must be in a transaction. This annotation starts a new transaction and commits it when the method returns. -<3> Writing to a database using classic Hibernate is blocking. So, you must tell to Quarkus that the method must be called on a worker thread you can block (and not an I/O thread). -<4> The method receives each Fruit. Note that you would need a deserializer to reconstruct the Fruit instances from the Kafka records. -<5> Persist the received `fruit` object. +Quarkus automatically considers the method as _blocking_. Indeed, writing to a database using classic Hibernate is blocking. So, Quarkus calls the method on a worker thread you can block (and not an I/O thread). +<3> The method receives each Fruit. Note that you would need a deserializer to reconstruct the Fruit instances from the Kafka records. +<4> Persist the received `fruit` object. As mentioned in <4>, you need a deserializer that can create a `Fruit` from the record. This can be done using a Jackson deserializer: diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java index 5d423995c6f77..c265eb4af8c93 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java @@ -10,6 +10,7 @@ import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING; +import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.VOID_CLASS; import java.util.ArrayList; @@ -158,7 +159,8 @@ public Integer get() { AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING); AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING); - if (blockingAnnotation != null || smallryeBlockingAnnotation != null) { + AnnotationInstance transactionalAnnotation = methodInfo.annotation(TRANSACTIONAL); + if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null) { mediatorConfigurationSupport.validateBlocking(validationOutput); configuration.setBlocking(true); if (blockingAnnotation != null) { diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java index ba94e68032adb..6634a6b2bc2bb 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java @@ -72,6 +72,8 @@ public final class ReactiveMessagingDotNames { static final DotName ABSTRACT_SUBSCRIBING_COROUTINE_INVOKER = DotName .createSimple("io.quarkus.smallrye.reactivemessaging.runtime.kotlin.AbstractSubscribingCoroutineInvoker"); + static final DotName TRANSACTIONAL = DotName.createSimple("javax.transaction.Transactional"); + private ReactiveMessagingDotNames() { } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index f12d2ec54e2aa..b7f7afcbfc3f9 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -3,6 +3,7 @@ import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.BLOCKING; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING; +import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL; import java.lang.reflect.Modifier; import java.util.ArrayList; @@ -218,9 +219,12 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re MethodInfo methodInfo = mediatorMethod.getMethod(); BeanInfo bean = mediatorMethod.getBean(); - if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING)) { + if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING) + || methodInfo.hasAnnotation(TRANSACTIONAL)) { // Just in case both annotation are used, use @Blocking value. String poolName = Blocking.DEFAULT_WORKER_POOL; + + // If the method is annotated with the SmallRye Reactive Messaging @Blocking, extract the worker pool name if any if (methodInfo.hasAnnotation(ReactiveMessagingDotNames.BLOCKING)) { AnnotationInstance blocking = methodInfo.annotation(ReactiveMessagingDotNames.BLOCKING); poolName = blocking.value() == null ? Blocking.DEFAULT_WORKER_POOL : blocking.value().asString(); diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java new file mode 100644 index 0000000000000..85db105af0b1c --- /dev/null +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java @@ -0,0 +1,55 @@ +package io.quarkus.smallrye.reactivemessaging; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.io.File; +import java.util.List; +import java.util.stream.Collectors; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.reactivestreams.Publisher; + +import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingUsingTransactional; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.mutiny.Multi; + +public class TransactionalSubscriberTest { + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(ProduceIn.class, IncomingUsingTransactional.class) + .addAsResource( + new File("src/test/resources/config/worker-config.properties"), + "application.properties")); + + @Inject + IncomingUsingTransactional incoming; + + @Test + public void testIncomingUsingRunOnWorkerThread() { + await().until(() -> incoming.list().size() == 6); + assertThat(incoming.list()).contains("a", "b", "c", "d", "e", "f"); + + List threadNames = incoming.threads().stream().distinct() + .collect(Collectors.toList()); + assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse(); + for (String name : threadNames) { + assertThat(name.startsWith("executor-thread-")).isTrue(); + } + } + + @ApplicationScoped + public static class ProduceIn { + @Outgoing("in") + public Publisher produce() { + return Multi.createFrom().items("a", "b", "c", "d", "e", "f"); + } + } + +} diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/beans/IncomingUsingTransactional.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/beans/IncomingUsingTransactional.java new file mode 100644 index 0000000000000..af17e380a7ec8 --- /dev/null +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/beans/IncomingUsingTransactional.java @@ -0,0 +1,37 @@ +package io.quarkus.smallrye.reactivemessaging.blocking.beans; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.enterprise.context.ApplicationScoped; +import javax.transaction.Transactional; + +import org.eclipse.microprofile.reactive.messaging.Incoming; + +@ApplicationScoped +public class IncomingUsingTransactional { + private final List list = new CopyOnWriteArrayList<>(); + private final List threads = new CopyOnWriteArrayList<>(); + + @Incoming("in") + @Transactional + public void consume(String s) { + if (s.equals("b") || s.equals("d") || s.equals("f")) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + threads.add(Thread.currentThread().getName()); + list.add(s); + } + + public List list() { + return list; + } + + public List threads() { + return threads; + } +}