diff --git a/src/main/java/reactor/kafka/receiver/KafkaReceiver.java b/src/main/java/reactor/kafka/receiver/KafkaReceiver.java
index 447a4d82..6b9d51cb 100644
--- a/src/main/java/reactor/kafka/receiver/KafkaReceiver.java
+++ b/src/main/java/reactor/kafka/receiver/KafkaReceiver.java
@@ -97,6 +97,35 @@ default Flux<ReceiverRecord<K, V>> receive() {
         return receive(null);
     }
 
+    /**
+     * Returns a {@link Flux} containing each batch of consumer records returned by {@link Consumer#poll(long)}.
+     * The maximum number of records returned in each batch can be configured on {@link ReceiverOptions} by setting
+     * the consumer property {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG}. Each batch is returned as a Flux.
+     * Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order to commit the offset
+     * corresponding to the record. Acknowledged records are committed based on the configured commit interval
+     * and commit batch size in {@link ReceiverOptions}. Records may also be committed manually using {@link ReceiverOffset#commit()}.
+     *
+     * @param prefetch the number of batches to prefetch
+     * @return Flux of consumer record batches from Kafka that are committed only after acknowledgement
+     * @since 1.3.?
+     */
+    Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(Integer prefetch);
+
+    /**
+     * Returns a {@link Flux} containing each batch of consumer records returned by {@link Consumer#poll(long)}.
+     * The maximum number of records returned in each batch can be configured on {@link ReceiverOptions} by setting
+     * the consumer property {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG}. Each batch is returned as a Flux.
+     * Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order to commit the offset
+     * corresponding to the record. Acknowledged records are committed based on the configured commit interval
+     * and commit batch size in {@link ReceiverOptions}. Records may also be committed manually using {@link ReceiverOffset#commit()}.
+     *
+     * @return Flux of consumer record batches from Kafka that are committed only after acknowledgement
+     * @since 1.3.?
+     */
+    default Flux<Flux<ReceiverRecord<K, V>>> receiveBatch() {
+        return receiveBatch(null);
+    }
+
     /**
      * Returns a {@link Flux} containing each batch of consumer records returned by {@link Consumer#poll(long)}.
      * The maximum number of records returned in each batch can be configured on {@link ReceiverOptions} by setting
diff --git a/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java b/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java
index 891be8d1..3f370608 100644
--- a/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java
+++ b/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java
@@ -69,6 +69,23 @@ public Flux<ReceiverRecord<K, V>> receive(Integer prefetch) {
         });
     }
 
+    @Override
+    public Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(Integer prefetch) {
+        return withHandler(AckMode.MANUAL_ACK, (scheduler, handler) -> {
+            int prefetchCalculated = preparePublishOnQueueSize(prefetch);
+            return handler
+                .receive()
+                .filter(it -> !it.isEmpty())
+                .publishOn(scheduler, prefetchCalculated)
+                .map(records -> Flux.fromIterable(records)
+                    .map(record -> new ReceiverRecord<>(
+                        record,
+                        handler.toCommittableOffset(record)
+                    ))
+                );
+        });
+    }
+
     @Override
     public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
         return withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler
diff --git a/src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java b/src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java
index 1a334047..efa4e1fd 100644
--- a/src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java
+++ b/src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java
@@ -517,6 +517,45 @@ public void manualCommitSync() throws Exception {
         checkCommitCallbacks(commitLatch, committedOffsets);
     }
 
+    @Test
+    public void manualCommitBatchSync() throws Exception {
+        int count = 10;
+        CountDownLatch commitLatch = new CountDownLatch(count);
+        long[] committedOffsets = new long[partitions];
+        for (int i = 0; i < committedOffsets.length; i++)
+            committedOffsets[i] = 0;
+        receiverOptions = receiverOptions.commitInterval(Duration.ZERO).commitBatchSize(0);
+        KafkaReceiver<Integer, String> receiver = createReceiver();
+        Flux<ReceiverRecord<Integer, String>> kafkaFlux = receiver.receiveBatch()
+            .flatMap(v -> v)
+            .delayUntil(record -> {
+                assertEquals(committedOffsets[record.partition()], record.offset());
+                return record.receiverOffset().commit()
+                    .doOnSuccess(i -> onCommit(record, commitLatch, committedOffsets));
+            })
+            .doOnError(e -> log.error("KafkaFlux exception", e));
+
+        sendAndWaitForMessages(kafkaFlux, count);
+        checkCommitCallbacks(commitLatch, committedOffsets);
+    }
+
+    @Test
+    public void batchRecordsShouldNotBeAutoCommitted() throws Exception {
+        receiverOptions = receiverOptions.closeTimeout(Duration.ofMillis(1000))
+            .commitBatchSize(10)
+            .commitInterval(Duration.ofMillis(10))
+            .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        KafkaReceiver<Integer, String> receiver = createReceiver();
+        Flux<ReceiverRecord<Integer, String>> firstReceiveBatch = receiver.receiveBatch().flatMap(v -> v);
+        sendReceive(firstReceiveBatch, 0, 100, 0, 100);
+
+        // Check that close commits ack'ed records, does not commit un-ack'ed records
+        cancelSubscriptions(true);
+        clearReceivedMessages();
+        Flux<ReceiverRecord<Integer, String>> secondReceiveBatch = createReceiver().receiveBatch().flatMap(r -> r);
+        sendReceive(secondReceiveBatch, 100, 100, 0, 200);
+    }
+
     @Test
     public void manualCommitSyncNoPoll() throws Exception {
         CountDownLatch commitLatch = new CountDownLatch(1);
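
Reviewer note: a minimal usage sketch of the new receiveBatch() operator, not part of the diff. The broker address, group id, and topic name below are placeholders, and the key/value types (Integer/String) are just an example; the receiver options API and acknowledge/commit semantics are those described in the Javadoc above.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;

    public class ReceiveBatchSketch {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // caps the size of each emitted batch

            ReceiverOptions<Integer, String> options = ReceiverOptions.<Integer, String>create(props)
                .commitInterval(Duration.ofSeconds(1))               // acknowledged offsets are committed on this cadence
                .subscription(Collections.singleton("demo-topic"));  // placeholder topic

            KafkaReceiver.create(options)
                .receiveBatch()
                // Each inner Flux is one poll() batch; concatMap preserves batch order.
                .concatMap(batch -> batch.doOnNext(record ->
                    record.receiverOffset().acknowledge()))          // mark the record's offset for commit
                .doOnNext(record -> System.out.println(record.value()))
                .blockLast();                                        // block the demo thread; real code would subscribe()
        }
    }

Unlike receiveAutoAck(), which acknowledges a whole batch once its inner Flux completes, this keeps per-record acknowledgement (MANUAL_ACK) while still exposing poll() batch boundaries to the consumer.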