diff --git a/src/main/docs/guide/kafkaClient/kafkaClientTx.adoc b/src/main/docs/guide/kafkaClient/kafkaClientTx.adoc index 5d6e1c024..cdd299fb6 100644 --- a/src/main/docs/guide/kafkaClient/kafkaClientTx.adoc +++ b/src/main/docs/guide/kafkaClient/kafkaClientTx.adoc @@ -1,5 +1,9 @@ Transaction processing can be enabled by defining `transactionalId` on `@KafkaClient`, which will initialize the producer for transactional usage and wrap any send operation with a transaction demarcation. +.Transactional Client + +snippet::io.micronaut.kafka.docs.producer.config.TransactionalClient[tags = 'clazz'] + .Alternative Kafka producer transactional code [source,java] ---- @@ -11,4 +15,13 @@ try { } catch (Exception e) { producer.abortTransaction(); } ----- \ No newline at end of file +---- + +[NOTE] +==== +`@KafkaClient` beans are by default singleton. When using multiple threads, you must either synchronize access to the individual instance or declare the bean as `@Prototype`. Additionally, you can use link:https://docs.micronaut.io/latest/guide/#_using_random_properties[random properties] to your advantage so that each instance of your producer gets a different transactional ID. + +.Random transactional ID + +snippet::io.micronaut.kafka.docs.producer.config.RandomTransactionalIdClient[tags = 'clazz'] +==== diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/config/RandomTransactionalIdClient.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/config/RandomTransactionalIdClient.groovy new file mode 100644 index 000000000..d6f80d72c --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/config/RandomTransactionalIdClient.groovy @@ -0,0 +1,14 @@ +package io.micronaut.kafka.docs.producer.config + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.context.annotation.Prototype +import io.micronaut.context.annotation.Requires + +@Requires(property = "spec.name", value = "RandomTransactionalIdClientTest") +// tag::clazz[] +@Prototype +@KafkaClient(id = "my-client", transactionalId = 'my-tx-id-${random.uuid}') +interface RandomTransactionalIdClient { + // define client API +} +// end::clazz[] diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/config/TransactionalClient.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/config/TransactionalClient.groovy new file mode 100644 index 000000000..8b5f164d7 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/config/TransactionalClient.groovy @@ -0,0 +1,12 @@ +package io.micronaut.kafka.docs.producer.config + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.context.annotation.Requires + +@Requires(property = "spec.name", value = "TransactionalClientTest") +// tag::clazz[] +@KafkaClient(id = "my-client", transactionalId = "my-tx-id") +interface TransactionalClient { + // define client API +} +// end::clazz[] diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/config/RandomTransactionalIdClient.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/config/RandomTransactionalIdClient.kt new file mode 100644 index 000000000..db5e26258 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/config/RandomTransactionalIdClient.kt @@ -0,0 +1,14 @@ +package io.micronaut.kafka.docs.producer.config + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.context.annotation.Prototype +import io.micronaut.context.annotation.Requires + +@Requires(property = "spec.name", value = "RandomTransactionalIdClientTest") +// tag::clazz[] +@Prototype +@KafkaClient(id = "my-client", transactionalId = "my-tx-id-\${random.uuid}") +interface RandomTransactionalIdClient { + // define client API +} +// end::clazz[] diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/config/TransactionalClient.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/config/TransactionalClient.kt new file mode 100644 index 000000000..8b5f164d7 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/config/TransactionalClient.kt @@ -0,0 +1,12 @@ +package io.micronaut.kafka.docs.producer.config + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.context.annotation.Requires + +@Requires(property = "spec.name", value = "TransactionalClientTest") +// tag::clazz[] +@KafkaClient(id = "my-client", transactionalId = "my-tx-id") +interface TransactionalClient { + // define client API +} +// end::clazz[] diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/producer/config/RandomTransactionalIdClient.java b/test-suite/src/test/java/io/micronaut/kafka/docs/producer/config/RandomTransactionalIdClient.java new file mode 100644 index 000000000..dd528994f --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/producer/config/RandomTransactionalIdClient.java @@ -0,0 +1,14 @@ +package io.micronaut.kafka.docs.producer.config; + +import io.micronaut.configuration.kafka.annotation.KafkaClient; +import io.micronaut.context.annotation.Prototype; +import io.micronaut.context.annotation.Requires; + +@Requires(property = "spec.name", value = "RandomTransactionalIdClientTest") +// tag::clazz[] +@Prototype +@KafkaClient(id = "my-client", transactionalId = "my-tx-id-${random.uuid}") +public interface RandomTransactionalIdClient { + // define client API +} +// end::clazz[] diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/producer/config/TransactionalClient.java b/test-suite/src/test/java/io/micronaut/kafka/docs/producer/config/TransactionalClient.java new file mode 100644 index 000000000..1e879138f --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/producer/config/TransactionalClient.java @@ -0,0 +1,12 @@ +package io.micronaut.kafka.docs.producer.config; + +import io.micronaut.configuration.kafka.annotation.KafkaClient; +import io.micronaut.context.annotation.Requires; + +@Requires(property = "spec.name", value = "TransactionalClientTest") +// tag::clazz[] +@KafkaClient(id = "my-client", transactionalId = "my-tx-id") +public interface TransactionalClient { + // define client API +} +// end::clazz[]