diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java index 909539f16..f766d3a38 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java @@ -61,6 +61,7 @@ @Factory public class KafkaProducerFactory implements ProducerRegistry, TransactionalProducerRegistry { private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFactory.class); + private static final String TRANSACTIONAL_ID = "transactionalId"; private final Map clients = new ConcurrentHashMap<>(); private final BeanContext beanContext; private final SerdeRegistry serdeRegistry; @@ -150,7 +151,12 @@ public Producer getProducer( properties.put(ProducerConfig.ACKS_CONFIG, acksValue); } - return getKafkaProducer(id, null, k, v, false, properties); + final String transactionalId = annotationMetadata.stringValue(KafkaClient.class, TRANSACTIONAL_ID) + .filter(StringUtils::isNotEmpty) + .orElse(null); + final boolean transactional = transactionalId != null; + + return getKafkaProducer(id, transactionalId, k, v, transactional, properties); } @SuppressWarnings("unchecked") diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/KafkaTxSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/KafkaTxSpec.groovy index 988025b35..9ed83ff5e 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/KafkaTxSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/KafkaTxSpec.groovy @@ -8,6 +8,8 @@ import io.micronaut.configuration.kafka.annotation.Topic import io.micronaut.context.annotation.Requires import io.micronaut.core.type.Argument import io.micronaut.messaging.annotation.SendTo +import jakarta.inject.Singleton +import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.IntegerSerializer import org.apache.kafka.common.serialization.StringSerializer @@ -110,6 +112,30 @@ class KafkaTxSpec extends AbstractKafkaContainerSpec { } } + void "should inject a transactional producer"() { + given: + Producer producer = context.getBean(MyService).transactionalProducer + + when: + producer.beginTransaction() + producer.abortTransaction() + + then: + noExceptionThrown() + } + + void "should inject a non-transactional producer"() { + given: + Producer producer = context.getBean(MyService).nonTransactionalProducer + + when: + producer.beginTransaction() + + then: + IllegalStateException e = thrown() + e.message == 'Transactional method invoked on a non-transactional producer.' + } + @Requires(property = 'spec.name', value = 'KafkaTxSpec') @KafkaListener( isolation = READ_COMMITTED, @@ -226,4 +252,17 @@ class KafkaTxSpec extends AbstractKafkaContainerSpec { @Topic("tx-strings") void send(String strings) } + + @Singleton + @Requires(property = 'spec.name', value = 'KafkaTxSpec') + static class MyService { + final Producer transactionalProducer + final Producer nonTransactionalProducer + MyService( + @KafkaClient(transactionalId = "tx-string-producer") Producer transactionalProducer, + @KafkaClient Producer nonTransactionalProducer) { + this.transactionalProducer = transactionalProducer + this.nonTransactionalProducer = nonTransactionalProducer + } + } }