Skip to content

Commit

Permalink
Add support for batch Acknowledgement
Browse files Browse the repository at this point in the history
Based-on: #692
Co-authored-by: Jeremy Grelle <[email protected]>
  • Loading branch information
guillermocalvo and jeremyg484 committed Oct 2, 2023
1 parent 2ee65d1 commit 330d00e
Show file tree
Hide file tree
Showing 13 changed files with 310 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ private synchronized void resumeTopicPartitions() {
}

private void processAsBatch(final ConsumerRecords<?, ?> consumerRecords) {
// Bind Acknowledgement argument
if (info.ackArg != null) {
final Map<TopicPartition, OffsetAndMetadata> batchOffsets = getBatchOffsets(consumerRecords);
boundArguments.put(info.ackArg, (KafkaAcknowledgement) () -> kafkaConsumer.commitSync(batchOffsets));
}
final Object methodResult = kafkaConsumerProcessor.bindAsBatch(info.method, boundArguments, consumerRecords).invoke(consumerBean);
normalizeResult(methodResult).ifPresent(result -> {
final Flux<?> resultFlux = toFlux(result);
Expand All @@ -303,6 +308,16 @@ private void processAsBatch(final ConsumerRecords<?, ?> consumerRecords) {
failed = false;
}

private Map<TopicPartition, OffsetAndMetadata> getBatchOffsets(ConsumerRecords<?, ?> consumerRecords) {
Map<TopicPartition, OffsetAndMetadata> batchOffsets = new HashMap<>();
for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1, null);
batchOffsets.put(topicPartition, offsetAndMetadata);
}
return batchOffsets;
}

@Nullable
private static Optional<Object> normalizeResult(@Nullable Object result) {
return Optional.ofNullable(result).map(x -> x.getClass().isArray() ? Arrays.asList((Object[]) x) : x);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.micronaut.configuration.kafka.offsets

import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.messaging.Acknowledgement
import io.micronaut.serde.annotation.Serdeable
import jakarta.inject.Singleton

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED
import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS

class BatchManualAckSpec extends AbstractKafkaContainerSpec {

public static final String TOPIC_SYNC = "BatchManualAckSpec-products-sync"

protected Map<String, Object> getConfiguration() {
super.configuration +
[(EMBEDDED_TOPICS): [TOPIC_SYNC]]
}

void "test manual ack"() {
given:
ProductClient client = context.getBean(ProductClient)
ProductListener listener = context.getBean(ProductListener)

when:
client.send(new Product(name: "Apple"))
client.send(new Product(name: "Orange"))

then:
conditions.eventually {
listener.products.size() == 2
listener.products.find() { it.name == "Apple"}
}
}

@Requires(property = 'spec.name', value = 'BatchManualAckSpec')
@KafkaClient
static interface ProductClient {
@Topic(BatchManualAckSpec.TOPIC_SYNC)
void send(Product product)
}

@Requires(property = 'spec.name', value = 'BatchManualAckSpec')
@Singleton
static class ProductListener {

List<Product> products = []

@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true)
@Topic(BatchManualAckSpec.TOPIC_SYNC)
void receive(List<Product> products, Acknowledgement acknowledgement) {
for (p in products) {
this.products << p
}
acknowledgement.ack()
}
}

@Serdeable
static class Product {
String name
}
}
23 changes: 18 additions & 5 deletions src/main/docs/guide/kafkaListener/kafkaListenerBatch.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,29 @@ Note in the previous case offsets will automatically be committed for the whole

== Manually Committing Offsets with Batch

As with one by one message processing, if you set the `OffsetStrategy` to api:configuration.kafka.annotation.OffsetStrategy#DISABLED[] it becomes your responsibility to commit offsets.

If you want to commit the entire batch of offsets at once during the course of processing, then the simplest approach is to add an argument of type link:{apimicronaut}messaging/Acknowledgement.html[Acknowledgement] and call the `ack()` method to commit the batch of offsets synchronously:

.Committing a Batch of Offsets Manually with ack()

snippet::io.micronaut.kafka.docs.consumer.batch.ack.BookListener[tags=method, indent=0]

<1> Committing offsets automatically is disabled
<2> The listener method specifies a parameter of type link:{apimicronaut}messaging/Acknowledgement.html[Acknowledgement]
<3> The `ack()` method is called once the records have been processed

You can also take more control of committing offsets when doing batch processing by specifying a method that receives the offsets in addition to the batch:

.Committing Offsets Manually with Batch

snippet::io.micronaut.kafka.docs.consumer.batch.BookListener[tags=manual, indent=0]
snippet::io.micronaut.kafka.docs.consumer.batch.manual.BookListener[tags=method, indent=0]

<1> The method receives the batch of books as a list of consumer records
<2> Each record is processed
<3> The offset, partition and topic is read for the record
<4> Offsets are committed
<1> Committing offsets automatically is disabled
<2> The method receives the batch of books as a list of consumer records
<3> Each record is processed
<4> The offset, partition and topic is read for the record
<5> Offsets are committed

This example is fairly trivial in that it commits offsets after processing each record in a batch, but you can for example commit after processing every 10, or every 100 or whatever makes sense for your application.

Expand Down
4 changes: 2 additions & 2 deletions src/main/docs/guide/kafkaListener/kafkaOffsets.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ If you set the `OffsetStrategy` to api:configuration.kafka.annotation.OffsetStra

There are a couple of ways that can be achieved.

The simplest way is to define an argument of type api:configuration.kafka.Acknowledgement[] and call the `ack()` method to commit offsets synchronously:
The simplest way is to define an argument of type api:configuration.kafka.annotation.KafkaClient.Acknowledge[] and call the `ack()` method to commit offsets synchronously:

.Committing offsets with `ack()`

snippet::io.micronaut.kafka.docs.consumer.offsets.ack.ProductListener[tags="clazz"]

<1> Committing offsets automatically is disabled
<2> The listener method specifies a parameter of type api:configuration.kafka.Acknowledgement[]
<2> The listener method specifies a parameter of type api:configuration.kafka.annotation.KafkaClient.Acknowledge[]
<3> The `ack()` method is called once the record has been processed

Alternatively, you an supply a `KafkaConsumer` method argument and then call `commitSync` (or `commitAsync`) yourself when you are ready to commit offsets:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ import groovy.util.logging.Slf4j
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import reactor.core.publisher.Flux
// end::imports[]

Expand Down Expand Up @@ -37,30 +33,6 @@ class BookListener {
)
}
// end::reactive[]

// tag::manual[]
@Topic("all-the-books")
void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) { // <1>

for (int i = 0; i < records.size(); i++) {
ConsumerRecord<String, Book> record = records.get(i) // <2>

// process the book
Book book = record.value()

// commit offsets
String topic = record.topic()
int partition = record.partition()
long offset = record.offset() // <3>

kafkaConsumer.commitSync(Collections.singletonMap( // <4>
new TopicPartition(topic, partition),
new OffsetAndMetadata(offset + 1, "my metadata")
))

}
}
// end::manual[]
//tag::endclazz[]
}
//end::endclazz[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.micronaut.kafka.docs.consumer.batch.ack

import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.consumer.batch.Book
import io.micronaut.messaging.Acknowledgement

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED

@Requires(property = 'spec.name', value = 'BatchManualAckSpec')
class BookListener {

// tag::method[]
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // <1>
@Topic("all-the-books")
void receive(List<Book> books, Acknowledgement acknowledgement) { // <2>

//process the books

acknowledgement.ack() // <3>
}
// end::method[]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.micronaut.kafka.docs.consumer.batch.manual

import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.consumer.batch.Book
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED

@Requires(property = 'spec.name', value = 'BatchManualAckSpec')
class BookListener {

// tag::method[]
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // <1>
@Topic("all-the-books")
void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) { // <2>

for (int i = 0; i < records.size(); i++) {
ConsumerRecord<String, Book> record = records.get(i) // <3>

// process the book
Book book = record.value()

// commit offsets
String topic = record.topic()
int partition = record.partition()
long offset = record.offset() // <4>

kafkaConsumer.commitSync(Collections.singletonMap( // <5>
new TopicPartition(topic, partition),
new OffsetAndMetadata(offset + 1, "my metadata")
))
}
}
// end::method[]
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ package io.micronaut.kafka.docs.consumer.batch
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.slf4j.LoggerFactory.getLogger
import reactor.core.publisher.Flux
import java.util.*
Expand Down Expand Up @@ -39,29 +35,6 @@ class BookListener {
}
}
// end::reactive[]

// tag::manual[]
@Topic("all-the-books")
fun receive(records: List<ConsumerRecord<String?, Book?>>, kafkaConsumer: Consumer<*, *>) { // <1>
for (i in records.indices) {
val record = records[i] // <2>

// process the book
val book = record.value()

// commit offsets
val topic = record.topic()
val partition = record.partition()
val offset = record.offset() // <3>
kafkaConsumer.commitSync(
Collections.singletonMap( // <4>
TopicPartition(topic, partition),
OffsetAndMetadata(offset + 1, "my metadata")
)
)
}
}
// end::manual[]
//tag::endclazz[]
}
//end::endclazz[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.micronaut.kafka.docs.consumer.batch.ack

import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.consumer.batch.Book
import io.micronaut.messaging.Acknowledgement

@Requires(property = "spec.name", value = "BatchManualAckSpec")
internal class BookListener {

// tag::method[]
@KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true) // <1>
@Topic("all-the-books")
fun receive(books: List<Book?>?, acknowledgement: Acknowledgement) { // <2>

//process the books

acknowledgement.ack() // <3>
}
// end::method[]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.micronaut.kafka.docs.consumer.batch.manual

// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.consumer.batch.Book
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

import java.util.*
// end::imports[]

@Requires(property = "spec.name", value = "BatchManualAckSpec")
internal class BookListener {

// tag::method[]
@KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true) // <1>
@Topic("all-the-books")
fun receive(records: List<ConsumerRecord<String?, Book?>>, kafkaConsumer: Consumer<*, *>) { // <2>
for (i in records.indices) {
val record = records[i] // <3>

// process the book
val book = record.value()

// commit offsets
val topic = record.topic()
val partition = record.partition()
val offset = record.offset() // <4>
kafkaConsumer.commitSync(
Collections.singletonMap( // <5>
TopicPartition(topic, partition),
OffsetAndMetadata(offset + 1, "my metadata")
)
)
}
}
// end::method[]
}

Loading

0 comments on commit 330d00e

Please sign in to comment.