Add receiveBatch in KafkaReceiver (#261)
mohamed committed Aug 31, 2023
1 parent 0e4b298 commit 16ae4fd
Showing 3 changed files with 86 additions and 1 deletion.
31 changes: 30 additions & 1 deletion src/main/java/reactor/kafka/receiver/KafkaReceiver.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -97,6 +97,35 @@ default Flux<ReceiverRecord<K, V>> receive() {
return receive(null);
}

/**
* Returns a {@link Flux} containing each batch of consumer records returned by {@link Consumer#poll(long)}.
* The maximum number of records returned in each batch can be configured on {@link ReceiverOptions} by setting
* the consumer property {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG}. Each batch is returned as one Flux.
* Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order to commit the offset
* corresponding to the record. Acknowledged records are committed based on the configured commit interval
* and commit batch size in {@link ReceiverOptions}. Records may also be committed manually using {@link ReceiverOffset#commit()}.
*
* @param prefetch the number of batches to prefetch
* @return Flux of consumer record batches from Kafka that are committed only after acknowledgement
* @since 1.3.21
*/
Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(Integer prefetch);

/**
* Returns a {@link Flux} containing each batch of consumer records returned by {@link Consumer#poll(long)}.
* The maximum number of records returned in each batch can be configured on {@link ReceiverOptions} by setting
* the consumer property {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG}. Each batch is returned as one Flux.
* Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order to commit the offset
* corresponding to the record. Acknowledged records are committed based on the configured commit interval
* and commit batch size in {@link ReceiverOptions}. Records may also be committed manually using {@link ReceiverOffset#commit()}.
*
* @return Flux of consumer record batches from Kafka that are committed only after acknowledgement
* @since 1.3.21
*/
default Flux<Flux<ReceiverRecord<K, V>>> receiveBatch() {
return receiveBatch(null);
}

/**
* Returns a {@link Flux} containing each batch of consumer records returned by {@link Consumer#poll(long)}.
* The maximum number of records returned in each batch can be configured on {@link ReceiverOptions} by setting
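For orientation, here is a minimal sketch of consuming with the new receiveBatch() API. The broker address, group id, topic name, and deserializer choices are illustrative assumptions, not part of this commit:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class ReceiveBatchExample {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");            // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // caps the size of each batch

        ReceiverOptions<Integer, String> options = ReceiverOptions.<Integer, String>create(props)
                .subscription(Collections.singleton("sample-topic")); // assumed topic

        // Each inner Flux is one poll() batch; acknowledge every record so its
        // offset becomes eligible for the periodic commit.
        KafkaReceiver.create(options)
                .receiveBatch()
                .concatMap(batch -> batch) // flatten while preserving batch order
                .doOnNext(record -> {
                    System.out.printf("key=%s value=%s%n", record.key(), record.value());
                    record.receiverOffset().acknowledge();
                })
                .blockLast(); // keeps this demo alive; a real app would manage the subscription
    }
}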
17 changes: 17 additions & 0 deletions src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java
@@ -69,6 +69,23 @@ public Flux<ReceiverRecord<K, V>> receive(Integer prefetch) {
});
}

@Override
public Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(Integer prefetch) {
return withHandler(AckMode.MANUAL_ACK, (scheduler, handler) -> {
int prefetchCalculated = preparePublishOnQueueSize(prefetch);
return handler
.receive()
.filter(it -> !it.isEmpty())
.publishOn(scheduler, prefetchCalculated)
.map(records -> Flux.fromIterable(records)
.map(record -> new ReceiverRecord<>(
record,
handler.toCommittableOffset(record)
))
);
});
}

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
return withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler
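The implementation reuses the MANUAL_ACK handler path: empty polls are filtered out, and the prefetch argument (via preparePublishOnQueueSize) bounds how many whole batches are buffered across the publishOn boundary, in units of batches rather than records. Reusing the options from the sketch above, a usage sketch of the prefetch overload; the value 4 and the handle(...) callback are illustrative assumptions:

// Buffer at most 4 polled batches ahead of downstream processing; acknowledge
// each record so its offset becomes eligible for the next commit cycle.
KafkaReceiver.create(options)
        .receiveBatch(4)
        .concatMap(batch -> batch.doOnNext(record -> {
            handle(record);                       // hypothetical per-record handler
            record.receiverOffset().acknowledge();
        }))
        .subscribe();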
39 changes: 39 additions & 0 deletions src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java
@@ -517,6 +517,45 @@ public void manualCommitSync() throws Exception {
checkCommitCallbacks(commitLatch, committedOffsets);
}

@Test
public void manualCommitBatchSync() throws Exception {
int count = 10;
CountDownLatch commitLatch = new CountDownLatch(count);
long[] committedOffsets = new long[partitions];
for (int i = 0; i < committedOffsets.length; i++)
committedOffsets[i] = 0;
receiverOptions = receiverOptions.commitInterval(Duration.ZERO).commitBatchSize(0);
KafkaReceiver<Integer, String> receiver = createReceiver();
Flux<? extends ConsumerRecord<Integer, String>> kafkaFlux = receiver.receiveBatch()
.flatMap(v -> v)
.delayUntil(record -> {
assertEquals(committedOffsets[record.partition()], record.offset());
return record.receiverOffset().commit()
.doOnSuccess(i -> onCommit(record, commitLatch, committedOffsets));
})
.doOnError(e -> log.error("KafkaFlux exception", e));

sendAndWaitForMessages(kafkaFlux, count);
checkCommitCallbacks(commitLatch, committedOffsets);
}

@Test
public void batchRecordsShouldNotBeAutoCommitted() throws Exception {
receiverOptions = receiverOptions.closeTimeout(Duration.ofMillis(1000))
.commitBatchSize(10)
.commitInterval(Duration.ofMillis(10))
.consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaReceiver<Integer, String> receiver = createReceiver();
Flux<ReceiverRecord<Integer, String>> firstReceiveBatch = receiver.receiveBatch().flatMap(v -> v);
sendReceive(firstReceiveBatch, 0, 100, 0, 100);

// Check that close commits ack'ed records but does not commit un-ack'ed records
cancelSubscriptions(true);
clearReceivedMessages();
Flux<? extends ConsumerRecord<Integer, String>> secondReceiveBatch = createReceiver().receiveBatch().flatMap(r -> r);
sendReceive(secondReceiveBatch, 100, 100, 0, 200);
}

@Test
public void manualCommitSyncNoPoll() throws Exception {
CountDownLatch commitLatch = new CountDownLatch(1);
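The manualCommitBatchSync test disables both commit-batching knobs so that each ReceiverOffset.commit() takes effect immediately instead of being coalesced with later commits. A standalone sketch of that configuration; the class and factory method names are illustrative assumptions:

import java.time.Duration;
import java.util.Map;

import reactor.kafka.receiver.ReceiverOptions;

class ImmediateCommitOptions {
    // With a zero commit interval and a zero commit batch size, manual
    // commits are sent as they happen rather than being batched.
    static <K, V> ReceiverOptions<K, V> of(Map<String, Object> consumerProps) {
        return ReceiverOptions.<K, V>create(consumerProps)
                .commitInterval(Duration.ZERO)
                .commitBatchSize(0);
    }
}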
