Skip to content

Commit

Permalink
Merge pull request #21889 from cescoffier/incoming-transactional-bloc…
Browse files Browse the repository at this point in the history
…king

Consider blocking reactive messaging methods using @transactional
  • Loading branch information
geoand authored Dec 3, 2021
2 parents 1fe8ec0 + dd0fdaa commit 267941c
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 10 deletions.
7 changes: 6 additions & 1 deletion docs/src/main/asciidoc/amqp-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ import javax.transaction.Transactional;
public class PriceStorage {
@Incoming("prices")
@Blocking
@Transactional
public void store(int priceInUsd) {
Price price = new Price();
Expand All @@ -393,6 +392,12 @@ The first one provides more fine-grained tuning such as the worker pool to use a
The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order.
====

[TIP]
.@Transactional
====
If your method is annotated with `@Transactional`, it will be considered _blocking_ automatically, even if the method is not annotated with `@Blocking`.
====

== Customizing the underlying AMQP client

The connector uses the Vert.x AMQP client underneath.
Expand Down
18 changes: 11 additions & 7 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ import javax.transaction.Transactional;
public class PriceStorage {
@Incoming("prices")
@Blocking
@Transactional
public void store(int priceInUsd) {
Price price = new Price();
Expand Down Expand Up @@ -263,6 +262,12 @@ The second one, used also with other reactive features of Quarkus, uses the defa
Detailed information on the usage of `@Blocking` annotation can be found in https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.1/advanced/blocking.html[SmallRye Reactive Messaging – Handling blocking execution].
====

[TIP]
.@Transactional
====
If your method is annotated with `@Transactional`, it will be considered _blocking_ automatically, even if the method is not annotated with `@Blocking`.
====

=== Acknowledgment Strategies

All messages received by a consumer must be acknowledged.
Expand Down Expand Up @@ -1935,17 +1940,16 @@ public class FruitConsumer {
@Incoming("fruits") // <1>
@Transactional // <2>
@Blocking // <3>
public void persistFruits(Fruit fruit) { // <4>
fruit.persist(); // <5>
public void persistFruits(Fruit fruit) { // <3>
fruit.persist(); // <4>
}
}
----
<1> Configuring the incoming channel. This channel reads from Kafka.
<2> As we are writing in a database, we must be in a transaction. This annotation starts a new transaction and commits it when the method returns.
<3> Writing to a database using classic Hibernate is blocking. So, you must tell to Quarkus that the method must be called on a worker thread you can block (and not an I/O thread).
<4> The method receives each Fruit. Note that you would need a deserializer to reconstruct the Fruit instances from the Kafka records.
<5> Persist the received `fruit` object.
Quarkus automatically considers the method as _blocking_. Indeed, writing to a database using classic Hibernate is blocking. So, Quarkus calls the method on a worker thread you can block (and not an I/O thread).
<3> The method receives each Fruit. Note that you would need a deserializer to reconstruct the Fruit instances from the Kafka records.
<4> Persist the received `fruit` object.

As mentioned in <4>, you need a deserializer that can create a `Fruit` from the record.
This can be done using a Jackson deserializer:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.VOID_CLASS;

import java.util.ArrayList;
Expand Down Expand Up @@ -158,7 +159,8 @@ public Integer get() {

AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING);
AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING);
if (blockingAnnotation != null || smallryeBlockingAnnotation != null) {
AnnotationInstance transactionalAnnotation = methodInfo.annotation(TRANSACTIONAL);
if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null) {
mediatorConfigurationSupport.validateBlocking(validationOutput);
configuration.setBlocking(true);
if (blockingAnnotation != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public final class ReactiveMessagingDotNames {
static final DotName ABSTRACT_SUBSCRIBING_COROUTINE_INVOKER = DotName
.createSimple("io.quarkus.smallrye.reactivemessaging.runtime.kotlin.AbstractSubscribingCoroutineInvoker");

static final DotName TRANSACTIONAL = DotName.createSimple("javax.transaction.Transactional");

private ReactiveMessagingDotNames() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;

import java.lang.reflect.Modifier;
import java.util.ArrayList;
Expand Down Expand Up @@ -218,9 +219,12 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
MethodInfo methodInfo = mediatorMethod.getMethod();
BeanInfo bean = mediatorMethod.getBean();

if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING)) {
if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING)
|| methodInfo.hasAnnotation(TRANSACTIONAL)) {
// Just in case both annotation are used, use @Blocking value.
String poolName = Blocking.DEFAULT_WORKER_POOL;

// If the method is annotated with the SmallRye Reactive Messaging @Blocking, extract the worker pool name if any
if (methodInfo.hasAnnotation(ReactiveMessagingDotNames.BLOCKING)) {
AnnotationInstance blocking = methodInfo.annotation(ReactiveMessagingDotNames.BLOCKING);
poolName = blocking.value() == null ? Blocking.DEFAULT_WORKER_POOL : blocking.value().asString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.quarkus.smallrye.reactivemessaging;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.io.File;
import java.util.List;
import java.util.stream.Collectors;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.reactivestreams.Publisher;

import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingUsingTransactional;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;

public class TransactionalSubscriberTest {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(ProduceIn.class, IncomingUsingTransactional.class)
.addAsResource(
new File("src/test/resources/config/worker-config.properties"),
"application.properties"));

@Inject
IncomingUsingTransactional incoming;

@Test
public void testIncomingUsingRunOnWorkerThread() {
await().until(() -> incoming.list().size() == 6);
assertThat(incoming.list()).contains("a", "b", "c", "d", "e", "f");

List<String> threadNames = incoming.threads().stream().distinct()
.collect(Collectors.toList());
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
for (String name : threadNames) {
assertThat(name.startsWith("executor-thread-")).isTrue();
}
}

@ApplicationScoped
public static class ProduceIn {
@Outgoing("in")
public Publisher<String> produce() {
return Multi.createFrom().items("a", "b", "c", "d", "e", "f");
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.smallrye.reactivemessaging.blocking.beans;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class IncomingUsingTransactional {
private final List<String> list = new CopyOnWriteArrayList<>();
private final List<String> threads = new CopyOnWriteArrayList<>();

@Incoming("in")
@Transactional
public void consume(String s) {
if (s.equals("b") || s.equals("d") || s.equals("f")) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
threads.add(Thread.currentThread().getName());
list.add(s);
}

public List<String> list() {
return list;
}

public List<String> threads() {
return threads;
}
}

0 comments on commit 267941c

Please sign in to comment.