Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consider blocking reactive messaging methods using @Transactional #21889

Merged
merged 1 commit into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/src/main/asciidoc/amqp-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ import javax.transaction.Transactional;
public class PriceStorage {

@Incoming("prices")
@Blocking
@Transactional
public void store(int priceInUsd) {
Price price = new Price();
Expand All @@ -393,6 +392,12 @@ The first one provides more fine-grained tuning such as the worker pool to use a
The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order.
====

[TIP]
.@Transactional
====
If your method is annotated with `@Transactional`, it will be considered _blocking_ automatically, even if the method is not annotated with `@Blocking`.
====

== Customizing the underlying AMQP client

The connector uses the Vert.x AMQP client underneath.
Expand Down
18 changes: 11 additions & 7 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ import javax.transaction.Transactional;
public class PriceStorage {

@Incoming("prices")
@Blocking
@Transactional
public void store(int priceInUsd) {
Price price = new Price();
Expand Down Expand Up @@ -263,6 +262,12 @@ The second one, used also with other reactive features of Quarkus, uses the defa
Detailed information on the usage of `@Blocking` annotation can be found in https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.1/advanced/blocking.html[SmallRye Reactive Messaging – Handling blocking execution].
====

[TIP]
.@Transactional
====
If your method is annotated with `@Transactional`, it will be considered _blocking_ automatically, even if the method is not annotated with `@Blocking`.
====

=== Acknowledgment Strategies

All messages received by a consumer must be acknowledged.
Expand Down Expand Up @@ -1935,17 +1940,16 @@ public class FruitConsumer {

@Incoming("fruits") // <1>
@Transactional // <2>
@Blocking // <3>
public void persistFruits(Fruit fruit) { // <4>
fruit.persist(); // <5>
public void persistFruits(Fruit fruit) { // <3>
fruit.persist(); // <4>
}
}
----
<1> Configuring the incoming channel. This channel reads from Kafka.
<2> As we are writing in a database, we must be in a transaction. This annotation starts a new transaction and commits it when the method returns.
<3> Writing to a database using classic Hibernate is blocking. So, you must tell to Quarkus that the method must be called on a worker thread you can block (and not an I/O thread).
<4> The method receives each Fruit. Note that you would need a deserializer to reconstruct the Fruit instances from the Kafka records.
<5> Persist the received `fruit` object.
Quarkus automatically considers the method as _blocking_. Indeed, writing to a database using classic Hibernate is blocking. So, Quarkus calls the method on a worker thread you can block (and not an I/O thread).
<3> The method receives each Fruit. Note that you would need a deserializer to reconstruct the Fruit instances from the Kafka records.
<4> Persist the received `fruit` object.

As mentioned in <4>, you need a deserializer that can create a `Fruit` from the record.
This can be done using a Jackson deserializer:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.VOID_CLASS;

import java.util.ArrayList;
Expand Down Expand Up @@ -158,7 +159,8 @@ public Integer get() {

AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING);
AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING);
if (blockingAnnotation != null || smallryeBlockingAnnotation != null) {
AnnotationInstance transactionalAnnotation = methodInfo.annotation(TRANSACTIONAL);
if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null) {
mediatorConfigurationSupport.validateBlocking(validationOutput);
configuration.setBlocking(true);
if (blockingAnnotation != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public final class ReactiveMessagingDotNames {
static final DotName ABSTRACT_SUBSCRIBING_COROUTINE_INVOKER = DotName
.createSimple("io.quarkus.smallrye.reactivemessaging.runtime.kotlin.AbstractSubscribingCoroutineInvoker");

static final DotName TRANSACTIONAL = DotName.createSimple("javax.transaction.Transactional");

private ReactiveMessagingDotNames() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;

import java.lang.reflect.Modifier;
import java.util.ArrayList;
Expand Down Expand Up @@ -218,9 +219,12 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
MethodInfo methodInfo = mediatorMethod.getMethod();
BeanInfo bean = mediatorMethod.getBean();

if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING)) {
if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING)
|| methodInfo.hasAnnotation(TRANSACTIONAL)) {
// Just in case both annotation are used, use @Blocking value.
String poolName = Blocking.DEFAULT_WORKER_POOL;

// If the method is annotated with the SmallRye Reactive Messaging @Blocking, extract the worker pool name if any
if (methodInfo.hasAnnotation(ReactiveMessagingDotNames.BLOCKING)) {
AnnotationInstance blocking = methodInfo.annotation(ReactiveMessagingDotNames.BLOCKING);
poolName = blocking.value() == null ? Blocking.DEFAULT_WORKER_POOL : blocking.value().asString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.quarkus.smallrye.reactivemessaging;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.io.File;
import java.util.List;
import java.util.stream.Collectors;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.reactivestreams.Publisher;

import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingUsingTransactional;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;

public class TransactionalSubscriberTest {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(ProduceIn.class, IncomingUsingTransactional.class)
.addAsResource(
new File("src/test/resources/config/worker-config.properties"),
"application.properties"));

@Inject
IncomingUsingTransactional incoming;

@Test
public void testIncomingUsingRunOnWorkerThread() {
await().until(() -> incoming.list().size() == 6);
assertThat(incoming.list()).contains("a", "b", "c", "d", "e", "f");

List<String> threadNames = incoming.threads().stream().distinct()
.collect(Collectors.toList());
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
for (String name : threadNames) {
assertThat(name.startsWith("executor-thread-")).isTrue();
}
}

@ApplicationScoped
public static class ProduceIn {
@Outgoing("in")
public Publisher<String> produce() {
return Multi.createFrom().items("a", "b", "c", "d", "e", "f");
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.smallrye.reactivemessaging.blocking.beans;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class IncomingUsingTransactional {
private final List<String> list = new CopyOnWriteArrayList<>();
private final List<String> threads = new CopyOnWriteArrayList<>();

@Incoming("in")
@Transactional
public void consume(String s) {
if (s.equals("b") || s.equals("d") || s.equals("f")) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
threads.add(Thread.currentThread().getName());
list.add(s);
}

public List<String> list() {
return list;
}

public List<String> threads() {
return threads;
}
}