Skip to content

Commit

Permalink
Honor transactionalId when a Kafka producer bean is injected (#865)
Browse files Browse the repository at this point in the history
* Add unit tests

* Honor `transactionalId` when a Kafka producer bean is injected

* Extract constant for annotation member
  • Loading branch information
guillermocalvo authored Sep 12, 2023
1 parent 8c650e9 commit a4a7074
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
@Factory
public class KafkaProducerFactory implements ProducerRegistry, TransactionalProducerRegistry {
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFactory.class);
private static final String TRANSACTIONAL_ID = "transactionalId";
private final Map<ClientKey, Producer> clients = new ConcurrentHashMap<>();
private final BeanContext beanContext;
private final SerdeRegistry serdeRegistry;
Expand Down Expand Up @@ -150,7 +151,12 @@ public <K, V> Producer<K, V> getProducer(
properties.put(ProducerConfig.ACKS_CONFIG, acksValue);
}

return getKafkaProducer(id, null, k, v, false, properties);
final String transactionalId = annotationMetadata.stringValue(KafkaClient.class, TRANSACTIONAL_ID)
.filter(StringUtils::isNotEmpty)
.orElse(null);
final boolean transactional = transactionalId != null;

return getKafkaProducer(id, transactionalId, k, v, transactional, properties);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.core.type.Argument
import io.micronaut.messaging.annotation.SendTo
import jakarta.inject.Singleton
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringSerializer
Expand Down Expand Up @@ -110,6 +112,30 @@ class KafkaTxSpec extends AbstractKafkaContainerSpec {
}
}

void "should inject a transactional producer"() {
given:
Producer<String, String> producer = context.getBean(MyService).transactionalProducer

when:
producer.beginTransaction()
producer.abortTransaction()

then:
noExceptionThrown()
}

void "should inject a non-transactional producer"() {
given:
Producer<String, String> producer = context.getBean(MyService).nonTransactionalProducer

when:
producer.beginTransaction()

then:
IllegalStateException e = thrown()
e.message == 'Transactional method invoked on a non-transactional producer.'
}

@Requires(property = 'spec.name', value = 'KafkaTxSpec')
@KafkaListener(
isolation = READ_COMMITTED,
Expand Down Expand Up @@ -226,4 +252,17 @@ class KafkaTxSpec extends AbstractKafkaContainerSpec {
@Topic("tx-strings")
void send(String strings)
}

@Singleton
@Requires(property = 'spec.name', value = 'KafkaTxSpec')
static class MyService {
final Producer<String, String> transactionalProducer
final Producer<String, String> nonTransactionalProducer
MyService(
@KafkaClient(transactionalId = "tx-string-producer") Producer<String, String> transactionalProducer,
@KafkaClient Producer<String, String> nonTransactionalProducer) {
this.transactionalProducer = transactionalProducer
this.nonTransactionalProducer = nonTransactionalProducer
}
}
}

0 comments on commit a4a7074

Please sign in to comment.