receiveAutoAck commits the batch even in case of exception (or Error) while processing #261

Closed
serejke opened this issue Jan 12, 2022 · 7 comments · Fixed by #359
Labels
type/enhancement A general enhancement

@serejke

serejke commented Jan 12, 2022

receiveAutoAck returns a Flux<Flux<Record>> of batches of received records. Each inner Flux commits its batch when it terminates, even if it terminates with an exception, because the commit happens in a doOnTerminate callback.

I might be missing something, but I don't see a way to avoid committing the batch if a crash occurs (such as an OutOfMemoryError).

I've prepared a test case

receiveAutoAck is said to be suitable for at-least-once delivery of messages, but in fact the events might be committed before they are processed.
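
The behaviour reported above can be modelled without Reactor or Kafka. The following is a minimal, library-free sketch and not reactor-kafka code: `TerminateCommitModel`, `processBatch`, and the `committed` list are hypothetical names, and a `finally` block stands in for a callback that runs on termination.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Library-free model of the reported behaviour; not reactor-kafka code.
final class TerminateCommitModel {

    // Offsets that the (hypothetical) consumer has committed so far.
    final List<Integer> committed = new ArrayList<>();

    // Processes a batch and commits it when processing terminates,
    // whether it terminated normally or with an exception.
    void processBatch(List<Integer> batch, Consumer<Integer> handler) {
        try {
            for (Integer offset : batch) {
                handler.accept(offset);
            }
        } finally {
            // Runs on success and on failure, like a terminate callback.
            committed.addAll(batch);
        }
    }

    public static void main(String[] args) {
        TerminateCommitModel model = new TerminateCommitModel();
        try {
            model.processBatch(Arrays.asList(0, 1, 2), offset -> {
                if (offset == 1) {
                    throw new IllegalStateException("processing failed at offset " + offset);
                }
            });
        } catch (IllegalStateException e) {
            // The failure reaches the caller, but the batch is already committed.
        }
        System.out.println("committed = " + model.committed);
    }
}
```

In this model, `main` prints `committed = [0, 1, 2]` although offsets 1 and 2 were never processed successfully, which is the at-least-once violation the issue describes.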

@serejke changed the title from "receiveAutoAck commits the batch even in case of exception (or even an Error) while processing" to "receiveAutoAck commits the batch even in case of exception (or Error) while processing" on Jan 12, 2022
@rancherz

Seems pretty serious

@rancherz
Copy link

rancherz commented Jan 12, 2022

Is there any fix you can think of?

@serejke
Author

serejke commented Jan 12, 2022

@rancherz the simplest solution would be to replace Flux.doAfterTerminate with Flux.doOnComplete, so that the batch is committed only if the flux has completed successfully.

Existing clients might not expect this change, so a configuration parameter could be added to ReceiverOptions, for example commitBatchOnError: Boolean with a default value of true. The reference documentation and Javadocs should be amended with a warning.
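
The difference between committing on any termination and committing only on successful completion can be sketched in a library-free model. This is an illustration under stated assumptions, not the library's implementation: `CommitPolicyModel` and `runFailingBatch` are hypothetical names, and the `commitBatchOnError` flag only mirrors the option suggested in this comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Library-free model of the proposed option; not reactor-kafka code.
final class CommitPolicyModel {

    // Hypothetical flag mirroring the commitBatchOnError suggestion.
    private final boolean commitBatchOnError;

    // Offsets committed so far.
    final List<Integer> committed = new ArrayList<>();

    CommitPolicyModel(boolean commitBatchOnError) {
        this.commitBatchOnError = commitBatchOnError;
    }

    void processBatch(List<Integer> batch, Consumer<Integer> handler) {
        boolean completed = false;
        try {
            batch.forEach(handler);
            completed = true;
        } finally {
            // true:  commit on any termination (the behaviour reported in this issue)
            // false: commit only when the whole batch completed successfully
            if (completed || commitBatchOnError) {
                committed.addAll(batch);
            }
        }
    }

    // Demo helper: runs a batch whose handler fails at offset 1.
    static List<Integer> runFailingBatch(boolean commitBatchOnError) {
        CommitPolicyModel model = new CommitPolicyModel(commitBatchOnError);
        try {
            model.processBatch(Arrays.asList(0, 1, 2), offset -> {
                if (offset == 1) {
                    throw new IllegalStateException("processing failed");
                }
            });
        } catch (IllegalStateException e) {
            // Expected: the handler failure propagates to the caller.
        }
        return model.committed;
    }

    public static void main(String[] args) {
        System.out.println("commitBatchOnError=true  -> " + runFailingBatch(true));
        System.out.println("commitBatchOnError=false -> " + runFailingBatch(false));
    }
}
```

With the flag set to `false`, a failed batch leaves `committed` empty, so the same records would be delivered again after a restart. Defaulting the flag to `true` keeps the existing behaviour for current clients.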

@garyrussell
Contributor

I see the issue, but the behavior is consistent with the javadocs and reference manual:

All the records in a batch are acknowledged automatically when its Flux terminates.

So I am not sure there is anything to be done here.

This is similar (not identical) to Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, where the Kafka client commits the offsets in the background (when true) regardless of processing activity.

For more control over commits, you can use the receive() method instead.

If you feel a change is necessary, I would concur that it would need to be optional; contributions are welcome.

@garyrussell added the ❓need-triage and for/user-attention labels on Jan 12, 2022
serejke added a commit to rarible/service-core that referenced this issue Jan 12, 2022
@serejke
Author

serejke commented Jan 12, 2022

@garyrussell as a user of this library's API, I found this point unclear enough that I filed this issue. The Javadoc or the reference manual could be improved in this regard.

I don't have spare time at the moment, so for now I am staying with the receive function and manual acknowledgement of each record.

What I suggest as a contribution is to add a new API function
default Flux<Flux<ConsumerRecord<K, V>>> receiveBatch() {} with semantics similar to receive, where the user has to acknowledge each ConsumerRecord after processing it. WDYT?
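
The manual-acknowledgement semantics proposed here can be illustrated with a library-free model. This is a sketch under stated assumptions, not the reactor-kafka API: `ManualAckBatchModel`, `AckRecord`, `batchOf`, and `committable` are hypothetical names, and treating the contiguous acknowledged prefix of a batch as the committable part is a simplification chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Library-free model of batches with manual acknowledgement; not reactor-kafka code.
final class ManualAckBatchModel {

    // A record that carries its own acknowledgement hook (hypothetical type).
    static final class AckRecord {
        final int offset;
        private boolean acknowledged;

        AckRecord(int offset) {
            this.offset = offset;
        }

        void acknowledge() {
            acknowledged = true;
        }

        boolean isAcknowledged() {
            return acknowledged;
        }
    }

    // Builds a batch of unacknowledged records with the given offsets.
    static List<AckRecord> batchOf(int... offsets) {
        List<AckRecord> batch = new ArrayList<>();
        for (int offset : offsets) {
            batch.add(new AckRecord(offset));
        }
        return batch;
    }

    // Offsets that are safe to commit: the contiguous acknowledged prefix of the batch.
    static List<Integer> committable(List<AckRecord> batch) {
        List<Integer> result = new ArrayList<>();
        for (AckRecord record : batch) {
            if (!record.isAcknowledged()) {
                break;
            }
            result.add(record.offset);
        }
        return result;
    }

    public static void main(String[] args) {
        List<AckRecord> batch = batchOf(0, 1, 2);
        try {
            for (AckRecord record : batch) {
                if (record.offset == 1) {
                    throw new IllegalStateException("processing failed");
                }
                // Acknowledge only after the record has been processed.
                record.acknowledge();
            }
        } catch (IllegalStateException e) {
            // Offsets 1 and 2 stay unacknowledged and are not committed.
        }
        System.out.println("committable = " + committable(batch));
    }
}
```

In this model, `main` prints `committable = [0]`: only the record that was processed and acknowledged is eligible for commit, so the failed record and the one after it would be delivered again.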

@garyrussell added the type/enhancement label and removed the ❓need-triage and for/user-attention labels on Jan 12, 2022
@garyrussell added this to the 1.3.x Backlog milestone on Jan 12, 2022
@kharole

kharole commented Jul 27, 2023

but the behavior is consistent with the javadocs and reference manual:

I would say that "can be used for at-least-once delivery of messages" is really misleading. This misunderstanding cost me hours of hunting for lost updates. All of @serejke's solutions look reasonable.

@awattez

awattez commented Jul 27, 2023

I plan to open a PR on this subject in October.

mohamed-gara pushed a commit to mohamed-gara/reactor-kafka that referenced this issue Aug 31, 2023
@artembilan modified the milestones: 1.3.x Backlog, 1.3.21 on Aug 31, 2023
mohamed-gara added a commit to mohamed-gara/reactor-kafka that referenced this issue Aug 31, 2023
mohamed-gara added a commit to mohamed-gara/reactor-kafka that referenced this issue Sep 7, 2023
garyrussell pushed a commit that referenced this issue Sep 11, 2023
* Upgrade confluentinc/cp-kafka to support running tests on arm

* Add receiveBatch in KafkaReceiver (#261)

* Add receiveBatch documentation (#261)

* Fix receiveAutoAck documentation (#261)

6 participants