Skip to content

Commit

Permalink
Transactional Kafka Producer per thread (#882)
Browse files Browse the repository at this point in the history
* Update documentation

* Add code snippets
  • Loading branch information
guillermocalvo authored Oct 3, 2023
1 parent 3bafac8 commit b5b2944
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 1 deletion.
15 changes: 14 additions & 1 deletion src/main/docs/guide/kafkaClient/kafkaClientTx.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
Transaction processing can be enabled by defining `transactionalId` on `@KafkaClient`, which will initialize the producer for transactional usage and wrap any send operation with a transaction demarcation.

.Transactional Client

snippet::io.micronaut.kafka.docs.producer.config.TransactionalClient[tags = 'clazz']

.Alternative Kafka producer transactional code
[source,java]
----
Expand All @@ -11,4 +15,13 @@ try {
} catch (Exception e) {
producer.abortTransaction();
}
----
----

[NOTE]
====
`@KafkaClient` beans are by default singleton. When using multiple threads, you must either synchronize access to the individual instance or declare the bean as `@Prototype`. Additionally, you can use link:https://docs.micronaut.io/latest/guide/#_using_random_properties[random properties] to your advantage so that each instance of your producer gets a different transactional ID.
.Random transactional ID
snippet::io.micronaut.kafka.docs.producer.config.RandomTransactionalIdClient[tags = 'clazz']
====
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.micronaut.kafka.docs.producer.config

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.context.annotation.Prototype
import io.micronaut.context.annotation.Requires

@Requires(property = "spec.name", value = "RandomTransactionalIdClientTest")
// tag::clazz[]
@Prototype
@KafkaClient(id = "my-client", transactionalId = 'my-tx-id-${random.uuid}')
interface RandomTransactionalIdClient {
// define client API
}
// end::clazz[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.micronaut.kafka.docs.producer.config

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.context.annotation.Requires

@Requires(property = "spec.name", value = "TransactionalClientTest")
// tag::clazz[]
@KafkaClient(id = "my-client", transactionalId = "my-tx-id")
interface TransactionalClient {
// define client API
}
// end::clazz[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.micronaut.kafka.docs.producer.config

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.context.annotation.Prototype
import io.micronaut.context.annotation.Requires

@Requires(property = "spec.name", value = "RandomTransactionalIdClientTest")
// tag::clazz[]
@Prototype
@KafkaClient(id = "my-client", transactionalId = "my-tx-id-\${random.uuid}")
interface RandomTransactionalIdClient {
// define client API
}
// end::clazz[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.micronaut.kafka.docs.producer.config

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.context.annotation.Requires

@Requires(property = "spec.name", value = "TransactionalClientTest")
// tag::clazz[]
@KafkaClient(id = "my-client", transactionalId = "my-tx-id")
interface TransactionalClient {
// define client API
}
// end::clazz[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.micronaut.kafka.docs.producer.config;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.context.annotation.Prototype;
import io.micronaut.context.annotation.Requires;

@Requires(property = "spec.name", value = "RandomTransactionalIdClientTest")
// tag::clazz[]
@Prototype
@KafkaClient(id = "my-client", transactionalId = "my-tx-id-${random.uuid}")
public interface RandomTransactionalIdClient {
// define client API
}
// end::clazz[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.micronaut.kafka.docs.producer.config;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.context.annotation.Requires;

@Requires(property = "spec.name", value = "TransactionalClientTest")
// tag::clazz[]
@KafkaClient(id = "my-client", transactionalId = "my-tx-id")
public interface TransactionalClient {
// define client API
}
// end::clazz[]

0 comments on commit b5b2944

Please sign in to comment.