diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java
index 78aee22ac..18e2bebc0 100644
--- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java
@@ -284,6 +284,11 @@ private synchronized void resumeTopicPartitions() {
     }
 
     private void processAsBatch(final ConsumerRecords<?, ?> consumerRecords) {
+        // Bind Acknowledgement argument
+        if (info.ackArg != null) {
+            final Map<TopicPartition, OffsetAndMetadata> batchOffsets = getBatchOffsets(consumerRecords);
+            boundArguments.put(info.ackArg, (KafkaAcknowledgement) () -> kafkaConsumer.commitSync(batchOffsets));
+        }
         final Object methodResult = kafkaConsumerProcessor.bindAsBatch(info.method, boundArguments, consumerRecords).invoke(consumerBean);
         normalizeResult(methodResult).ifPresent(result -> {
             final Flux<?> resultFlux = toFlux(result);
@@ -303,6 +308,16 @@ private void processAsBatch(final ConsumerRecords<?, ?> consumerRecords) {
         failed = false;
     }
 
+    private Map<TopicPartition, OffsetAndMetadata> getBatchOffsets(ConsumerRecords<?, ?> consumerRecords) {
+        Map<TopicPartition, OffsetAndMetadata> batchOffsets = new HashMap<>();
+        for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
+            final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
+            final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1, null);
+            batchOffsets.put(topicPartition, offsetAndMetadata);
+        }
+        return batchOffsets;
+    }
+
     @Nullable
     private static Optional<Object> normalizeResult(@Nullable Object result) {
         return Optional.ofNullable(result).map(x -> x.getClass().isArray() ? Arrays.asList((Object[]) x) : x);
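A note on the commit map built by getBatchOffsets above: records returned by poll() are ordered within each partition, so repeatedly overwriting the map entry for a partition leaves exactly "last consumed offset + 1" per partition, and the bound ack() commits the whole batch with a single commitSync call. The following standalone sketch (illustrative only, not part of the patch; the topic name, partitions and offsets are made up) demonstrates that collapse:

    // Sketch: building a commit map the same way getBatchOffsets does.
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class BatchOffsetsSketch {
        public static void main(String[] args) {
            // {partition, offset} pairs in the order poll() would return them
            long[][] polled = { {0, 3}, {0, 4}, {1, 7}, {0, 5} };
            Map<TopicPartition, OffsetAndMetadata> batchOffsets = new HashMap<>();
            for (long[] rec : polled) {
                TopicPartition tp = new TopicPartition("all-the-books", (int) rec[0]);
                // Kafka commit semantics: commit the offset of the next record to read
                batchOffsets.put(tp, new OffsetAndMetadata(rec[1] + 1, null));
            }
            // partition 0 ends up at 6, partition 1 at 8: one commitSync covers the batch
            batchOffsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
        }
    }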
diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/BatchManualAckSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/BatchManualAckSpec.groovy
new file mode 100644
index 000000000..8463317b4
--- /dev/null
+++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/BatchManualAckSpec.groovy
@@ -0,0 +1,68 @@
+package io.micronaut.configuration.kafka.offsets
+
+import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
+import io.micronaut.configuration.kafka.annotation.KafkaClient
+import io.micronaut.configuration.kafka.annotation.KafkaListener
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import io.micronaut.messaging.Acknowledgement
+import io.micronaut.serde.annotation.Serdeable
+import jakarta.inject.Singleton
+
+import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
+import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED
+import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS
+
+class BatchManualAckSpec extends AbstractKafkaContainerSpec {
+
+    public static final String TOPIC_SYNC = "BatchManualAckSpec-products-sync"
+
+    protected Map<String, Object> getConfiguration() {
+        super.configuration +
+                [(EMBEDDED_TOPICS): [TOPIC_SYNC]]
+    }
+
+    void "test manual ack"() {
+        given:
+        ProductClient client = context.getBean(ProductClient)
+        ProductListener listener = context.getBean(ProductListener)
+
+        when:
+        client.send(new Product(name: "Apple"))
+        client.send(new Product(name: "Orange"))
+
+        then:
+        conditions.eventually {
+            listener.products.size() == 2
+            listener.products.find { it.name == "Apple" }
+        }
+    }
+
+    @Requires(property = 'spec.name', value = 'BatchManualAckSpec')
+    @KafkaClient
+    static interface ProductClient {
+        @Topic(BatchManualAckSpec.TOPIC_SYNC)
+        void send(Product product)
+    }
+
+    @Requires(property = 'spec.name', value = 'BatchManualAckSpec')
+    @Singleton
+    static class ProductListener {
+
+        List<Product> products = []
+
+        @KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true)
+        @Topic(BatchManualAckSpec.TOPIC_SYNC)
+        void receive(List<Product> products, Acknowledgement acknowledgement) {
+            for (p in products) {
+                this.products << p
+            }
+            acknowledgement.ack()
+        }
+    }
+
+    @Serdeable
+    static class Product {
+        String name
+    }
+}
diff --git a/src/main/docs/guide/kafkaListener/kafkaListenerBatch.adoc b/src/main/docs/guide/kafkaListener/kafkaListenerBatch.adoc
index 286b218d6..c3d83d5a0 100644
--- a/src/main/docs/guide/kafkaListener/kafkaListenerBatch.adoc
+++ b/src/main/docs/guide/kafkaListener/kafkaListenerBatch.adoc
@@ -16,16 +16,29 @@ Note in the previous case offsets will automatically be committed for the whole
 
 == Manually Committing Offsets with Batch
 
+As with one-by-one message processing, if you set the `OffsetStrategy` to api:configuration.kafka.annotation.OffsetStrategy#DISABLED[], it becomes your responsibility to commit offsets.
+
+If you want to commit the entire batch of offsets at once, the simplest approach is to add an argument of type link:{apimicronaut}messaging/Acknowledgement.html[Acknowledgement] and call the `ack()` method to commit the batch of offsets synchronously:
+
+.Committing a Batch of Offsets Manually with ack()
+
+snippet::io.micronaut.kafka.docs.consumer.batch.ack.BookListener[tags=method, indent=0]
+
+<1> Committing offsets automatically is disabled
+<2> The listener method specifies a parameter of type link:{apimicronaut}messaging/Acknowledgement.html[Acknowledgement]
+<3> The `ack()` method is called once the records have been processed
+
 You can also take more control of committing offsets when doing batch processing by specifying a method that receives the offsets in addition to the batch:
 
 .Committing Offsets Manually with Batch
 
-snippet::io.micronaut.kafka.docs.consumer.batch.BookListener[tags=manual, indent=0]
+snippet::io.micronaut.kafka.docs.consumer.batch.manual.BookListener[tags=method, indent=0]
 
-<1> The method receives the batch of books as a list of consumer records
-<2> Each record is processed
-<3> The offset, partition and topic is read for the record
-<4> Offsets are committed
+<1> Committing offsets automatically is disabled
+<2> The method receives the batch of books as a list of consumer records
+<3> Each record is processed
+<4> The offset, partition and topic are read for the record
+<5> Offsets are committed
 
 This example is fairly trivial in that it commits offsets after processing each record in a batch, but you could, for example, commit after processing every 10 records, or every 100, or whatever makes sense for your application.
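The docs above close by noting that you could commit every 10 or 100 records instead of after each one. A possible shape for that variation, sketched against the same `Book` type the docs examples use (the class name `EveryTenBookListener` and the threshold of 10 are illustrative; this file is not part of the change set):

    // Sketch: manual batch commit every N records instead of per record.
    import io.micronaut.configuration.kafka.annotation.KafkaListener;
    import io.micronaut.configuration.kafka.annotation.Topic;
    import io.micronaut.kafka.docs.consumer.batch.Book;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
    import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED;

    class EveryTenBookListener {

        @KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true)
        @Topic("all-the-books")
        public void receive(List<ConsumerRecord<String, Book>> records, Consumer<?, ?> kafkaConsumer) {
            Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();
            int processed = 0;
            for (ConsumerRecord<String, Book> record : records) {
                Book book = record.value(); // process the book
                // remember the next offset to read for this record's partition
                pending.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, null));
                if (++processed % 10 == 0) {
                    kafkaConsumer.commitSync(pending); // commit every 10 records
                    pending.clear();
                }
            }
            if (!pending.isEmpty()) {
                kafkaConsumer.commitSync(pending); // commit the remainder
            }
        }
    }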
diff --git a/src/main/docs/guide/kafkaListener/kafkaOffsets.adoc b/src/main/docs/guide/kafkaListener/kafkaOffsets.adoc
index e69428705..25a18bab6 100644
--- a/src/main/docs/guide/kafkaListener/kafkaOffsets.adoc
+++ b/src/main/docs/guide/kafkaListener/kafkaOffsets.adoc
@@ -39,14 +39,14 @@ If you set the `OffsetStrategy` to api:configuration.kafka.annotation.OffsetStra
 There are a couple of ways that can be achieved.
 
-The simplest way is to define an argument of type api:configuration.kafka.Acknowledgement[] and call the `ack()` method to commit offsets synchronously:
+The simplest way is to define an argument of type link:{apimicronaut}messaging/Acknowledgement.html[Acknowledgement] and call the `ack()` method to commit offsets synchronously:
 
 .Committing offsets with `ack()`
 
 snippet::io.micronaut.kafka.docs.consumer.offsets.ack.ProductListener[tags="clazz"]
 
 <1> Committing offsets automatically is disabled
-<2> The listener method specifies a parameter of type api:configuration.kafka.Acknowledgement[]
+<2> The listener method specifies a parameter of type link:{apimicronaut}messaging/Acknowledgement.html[Acknowledgement]
 <3> The `ack()` method is called once the record has been processed
 
 Alternatively, you can supply a `KafkaConsumer` method argument and then call `commitSync` (or `commitAsync`) yourself when you are ready to commit offsets:
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/BookListener.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/BookListener.groovy
index 77355b9aa..9ea679514 100644
--- a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/BookListener.groovy
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/BookListener.groovy
@@ -6,10 +6,6 @@ import groovy.util.logging.Slf4j
 import io.micronaut.configuration.kafka.annotation.KafkaListener
 import io.micronaut.configuration.kafka.annotation.Topic
 import io.micronaut.context.annotation.Requires
-import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.common.TopicPartition
 import reactor.core.publisher.Flux
 // end::imports[]
 
@@ -37,30 +33,6 @@ class BookListener {
         )
     }
     // end::reactive[]
-
-    // tag::manual[]
-    @Topic("all-the-books")
-    void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) { // <1>
-
-        for (int i = 0; i < records.size(); i++) {
-            ConsumerRecord<String, Book> record = records.get(i) // <2>
-
-            // process the book
-            Book book = record.value()
-
-            // commit offsets
-            String topic = record.topic()
-            int partition = record.partition()
-            long offset = record.offset() // <3>
-
-            kafkaConsumer.commitSync(Collections.singletonMap( // <4>
-                new TopicPartition(topic, partition),
-                new OffsetAndMetadata(offset + 1, "my metadata")
-            ))
-
-        }
-    }
-    // end::manual[]
     //tag::endclazz[]
 }
 //end::endclazz[]
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/ack/BookListener.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/ack/BookListener.groovy
new file mode 100644
index 000000000..159a09418
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/ack/BookListener.groovy
@@ -0,0 +1,25 @@
+package io.micronaut.kafka.docs.consumer.batch.ack
+
+import io.micronaut.configuration.kafka.annotation.KafkaListener
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.consumer.batch.Book
+import io.micronaut.messaging.Acknowledgement
+
+import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
+import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED
+
+@Requires(property = 'spec.name', value = 'BatchManualAckSpec')
+class BookListener {
+
+    // tag::method[]
+    @KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // <1>
+    @Topic("all-the-books")
+    void receive(List<Book> books, Acknowledgement acknowledgement) { // <2>
+
+        // process the books
+
+        acknowledgement.ack() // <3>
+    }
+    // end::method[]
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.groovy
new file mode 100644
index 000000000..8413a8794
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.groovy
@@ -0,0 +1,41 @@
+package io.micronaut.kafka.docs.consumer.batch.manual
+
+import io.micronaut.configuration.kafka.annotation.KafkaListener
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.consumer.batch.Book
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import org.apache.kafka.common.TopicPartition
+
+import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
+import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED
+
+@Requires(property = 'spec.name', value = 'BatchManualAckSpec')
+class BookListener {
+
+    // tag::method[]
+    @KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // <1>
+    @Topic("all-the-books")
+    void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) { // <2>
+
+        for (int i = 0; i < records.size(); i++) {
+            ConsumerRecord<String, Book> record = records.get(i) // <3>
+
+            // process the book
+            Book book = record.value()
+
+            // commit offsets
+            String topic = record.topic()
+            int partition = record.partition()
+            long offset = record.offset() // <4>
+
+            kafkaConsumer.commitSync(Collections.singletonMap( // <5>
+                new TopicPartition(topic, partition),
+                new OffsetAndMetadata(offset + 1, "my metadata")
+            ))
+        }
+    }
+    // end::method[]
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/BookListener.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/BookListener.kt
index 267340c04..17c2b5886 100644
--- a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/BookListener.kt
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/BookListener.kt
@@ -4,10 +4,6 @@ package io.micronaut.kafka.docs.consumer.batch
 import io.micronaut.configuration.kafka.annotation.KafkaListener
 import io.micronaut.configuration.kafka.annotation.Topic
 import io.micronaut.context.annotation.Requires
-import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.common.TopicPartition
 import org.slf4j.LoggerFactory.getLogger
 import reactor.core.publisher.Flux
 import java.util.*
@@ -39,29 +35,6 @@ class BookListener {
         }
     }
     // end::reactive[]
-
-    // tag::manual[]
-    @Topic("all-the-books")
-    fun receive(records: List<ConsumerRecord<String, Book>>, kafkaConsumer: Consumer<*, *>) { // <1>
-        for (i in records.indices) {
-            val record = records[i] // <2>
-
-            // process the book
-            val book = record.value()
-
-            // commit offsets
-            val topic = record.topic()
-            val partition = record.partition()
-            val offset = record.offset() // <3>
-            kafkaConsumer.commitSync(
-                Collections.singletonMap( // <4>
-                    TopicPartition(topic, partition),
-                    OffsetAndMetadata(offset + 1, "my metadata")
-                )
-            )
-        }
-    }
-    // end::manual[]
     //tag::endclazz[]
 }
 //end::endclazz[]
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/ack/BookListener.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/ack/BookListener.kt
new file mode 100644
index 000000000..8834cc982
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/ack/BookListener.kt
@@ -0,0 +1,24 @@
+package io.micronaut.kafka.docs.consumer.batch.ack
+
+import io.micronaut.configuration.kafka.annotation.KafkaListener
+import io.micronaut.configuration.kafka.annotation.OffsetReset
+import io.micronaut.configuration.kafka.annotation.OffsetStrategy
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.consumer.batch.Book
+import io.micronaut.messaging.Acknowledgement
+
+@Requires(property = "spec.name", value = "BatchManualAckSpec")
+internal class BookListener {
+
+    // tag::method[]
+    @KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true) // <1>
+    @Topic("all-the-books")
+    fun receive(books: List<Book>?, acknowledgement: Acknowledgement) { // <2>
+
+        // process the books
+
+        acknowledgement.ack() // <3>
+    }
+    // end::method[]
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.kt
new file mode 100644
index 000000000..14e0d2982
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.kt
@@ -0,0 +1,45 @@
+package io.micronaut.kafka.docs.consumer.batch.manual
+
+// tag::imports[]
+import io.micronaut.configuration.kafka.annotation.KafkaListener
+import io.micronaut.configuration.kafka.annotation.OffsetReset
+import io.micronaut.configuration.kafka.annotation.OffsetStrategy
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.consumer.batch.Book
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import org.apache.kafka.common.TopicPartition
+
+import java.util.*
+// end::imports[]
+
+@Requires(property = "spec.name", value = "BatchManualAckSpec")
+internal class BookListener {
+
+    // tag::method[]
+    @KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true) // <1>
+    @Topic("all-the-books")
+    fun receive(records: List<ConsumerRecord<String, Book>>, kafkaConsumer: Consumer<*, *>) { // <2>
+        for (i in records.indices) {
+            val record = records[i] // <3>
+
+            // process the book
+            val book = record.value()
+
+            // commit offsets
+            val topic = record.topic()
+            val partition = record.partition()
+            val offset = record.offset() // <4>
+            kafkaConsumer.commitSync(
+                Collections.singletonMap( // <5>
+                    TopicPartition(topic, partition),
+                    OffsetAndMetadata(offset + 1, "my metadata")
+                )
+            )
+        }
+    }
+    // end::method[]
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/BookListener.java b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/BookListener.java
index 67042bd42..9adef6f7f 100644
--- a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/BookListener.java
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/BookListener.java
@@ -4,14 +4,9 @@
 import io.micronaut.configuration.kafka.annotation.KafkaListener;
 import io.micronaut.configuration.kafka.annotation.Topic;
 import io.micronaut.context.annotation.Requires;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import reactor.core.publisher.Flux;
 
-import java.util.Collections;
 import java.util.List;
 
 import static org.slf4j.LoggerFactory.getLogger;
@@ -41,30 +36,6 @@ public Flux<Book> receiveFlux(Flux<Book> books) {
         );
     }
     // end::reactive[]
-
-    // tag::manual[]
-    @Topic("all-the-books")
-    public void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) { // <1>
-
-        for (int i = 0; i < records.size(); i++) {
-            ConsumerRecord<String, Book> record = records.get(i); // <2>
-
-            // process the book
-            Book book = record.value();
-
-            // commit offsets
-            String topic = record.topic();
-            int partition = record.partition();
-            long offset = record.offset(); // <3>
-
-            kafkaConsumer.commitSync(Collections.singletonMap( // <4>
-                new TopicPartition(topic, partition),
-                new OffsetAndMetadata(offset + 1, "my metadata")
-            ));
-
-        }
-    }
-    // end::manual[]
     //tag::endclazz[]
 }
 //end::endclazz[]
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/ack/BookListener.java b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/ack/BookListener.java
new file mode 100644
index 000000000..f827a4a25
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/ack/BookListener.java
@@ -0,0 +1,27 @@
+package io.micronaut.kafka.docs.consumer.batch.ack;
+
+import io.micronaut.configuration.kafka.annotation.KafkaListener;
+import io.micronaut.configuration.kafka.annotation.Topic;
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.kafka.docs.consumer.batch.Book;
+import io.micronaut.messaging.Acknowledgement;
+
+import java.util.List;
+
+import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
+import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED;
+
+@Requires(property = "spec.name", value = "BatchManualAckSpec")
+class BookListener {
+
+    // tag::method[]
+    @KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // <1>
+    @Topic("all-the-books")
+    public void receive(List<Book> books, Acknowledgement acknowledgement) { // <2>
+
+        // process the books
+
+        acknowledgement.ack(); // <3>
+    }
+    // end::method[]
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.java b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.java
new file mode 100644
index 000000000..73dae51e6
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.java
@@ -0,0 +1,45 @@
+package io.micronaut.kafka.docs.consumer.batch.manual;
+
+import io.micronaut.configuration.kafka.annotation.KafkaListener;
+import io.micronaut.configuration.kafka.annotation.Topic;
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.kafka.docs.consumer.batch.Book;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.List;
+
+import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
+import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED;
+
+@Requires(property = "spec.name", value = "BatchManualAckSpec")
+class BookListener {
+
+    // tag::method[]
+    @KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // <1>
+    @Topic("all-the-books")
+    public void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) { // <2>
+
+        for (int i = 0; i < records.size(); i++) {
+            ConsumerRecord<String, Book> record = records.get(i); // <3>
+
+            // process the book
+            Book book = record.value();
+
+            // commit offsets
+            String topic = record.topic();
+            int partition = record.partition();
+            long offset = record.offset(); // <4>
+
+            kafkaConsumer.commitSync(Collections.singletonMap( // <5>
+                new TopicPartition(topic, partition),
+                new OffsetAndMetadata(offset + 1, "my metadata")
+            ));
+
+        }
+    }
+    // end::method[]
+}
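The offsets guide touched above also mentions `commitAsync` as an alternative to `commitSync`. For completeness, a hedged sketch of an asynchronous batch commit in the same style as the listeners in this change (the class name, logger and callback handling are illustrative only, not part of the PR):

    // Sketch: commit the whole batch asynchronously and log commit failures.
    import io.micronaut.configuration.kafka.annotation.KafkaListener;
    import io.micronaut.configuration.kafka.annotation.Topic;
    import io.micronaut.kafka.docs.consumer.batch.Book;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
    import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED;

    class AsyncCommitBookListener {

        private static final Logger LOG = LoggerFactory.getLogger(AsyncCommitBookListener.class);

        @KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true)
        @Topic("all-the-books")
        public void receive(List<ConsumerRecord<String, Book>> records, Consumer<?, ?> kafkaConsumer) {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, Book> record : records) {
                Book book = record.value(); // process the book
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, null));
            }
            // non-blocking commit of the whole batch; a failure is only logged here,
            // so records may be redelivered, which suits at-least-once processing
            kafkaConsumer.commitAsync(offsets, (committed, ex) -> {
                if (ex != null) {
                    LOG.error("Failed to commit offsets {}", committed, ex);
                }
            });
        }
    }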