manual commits only after successful message production #300

Closed

natb1 opened this issue Jan 17, 2018 · 5 comments
natb1 commented Jan 17, 2018

Description

I expect a relatively common use case for Kafka is to consume a message, process it, and publish the results to a different topic (let’s say, one published message for every one message consumed). I am wondering if I am overlooking “the easy way” to use manual commits to do this while guaranteeing at least once processing/publishing of every message consumed.

The default auto-commit strategy doesn’t seem to satisfy this guarantee. The solution also seems more nuanced than manually committing the relevant consumer message offset in the producer’s delivery report callback. Both of those strategies seem like they would violate the at least once processing/publishing guarantee if I:

  • asynchronously publish message a associated with consumer offset x
  • asynchronously publish message b associated with consumer offset x + 1
  • commit offset x + 1 (at some interval, or on successful delivery report for b) which also effectively commits offset x
  • get a failed delivery report for a

Given the assumptions above, the logic required to meet this guarantee seems tricky for what I expect is a typical use case. Currently, I have logic in the delivery report callback that tracks pending commits for each TopicPartition and only finalizes a commit when there are no smaller offsets still outstanding.
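
Roughly, that bookkeeping looks something like this (a simplified sketch, not my exact code; names are illustrative):

from collections import defaultdict
from confluent_kafka import TopicPartition

delivered = defaultdict(set)   # (topic, partition) -> delivered but uncommitted offsets
next_uncommitted = {}          # (topic, partition) -> lowest offset not yet committed

def on_consume(topic, partition, offset):
    # Seed the low-water mark the first time a partition is seen.
    next_uncommitted.setdefault((topic, partition), offset)

def finalize_if_possible(consumer, topic, partition, offset):
    """Record a successful delivery; commit only when no smaller offset is still outstanding."""
    key = (topic, partition)
    delivered[key].add(offset)
    nxt = next_uncommitted[key]
    while nxt in delivered[key]:
        delivered[key].discard(nxt)
        nxt += 1
    if nxt != next_uncommitted[key]:
        next_uncommitted[key] = nxt
        # Commit the first still-pending offset; everything below it has been delivered.
        consumer.commit(offsets=[TopicPartition(topic, partition, nxt)])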

Am I overlooking something? Maybe librdkafka has the necessary intelligence in its implementation of commit() without my help (though I could not find evidence of it after a quick search), or maybe this is handled by Kafka’s offset tracking?

>>> confluent_kafka.version()
('0.11.0', 720896)
>>> confluent_kafka.libversion()
('0.11.0', 721151)

Broker version: 0.11.0
Consumer configuration:

{
    ...
    'enable.auto.commit': False
}

How to reproduce

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

You are right that the default auto commit kicks in before passing the message to the application rather than when the application is done processing the message.
But there is a way around that without having to do manual commits.

There are actually two layers of offset commit, first there is the store_offsets() that stores (in client memory) the offset to be committed, then there's the actual commit which commits the stored offset. store_offsets() is run automatically for each message just prior to passing the message to the application, which breaks your at-least-once guarantee.

So, what you do is disable the automatic offset store by setting enable.auto.offset.store=false and then manually call store_offsets() after full processing and delivery of a message has succeeded.
Since the producer may deliver messages out of order (when using multiple partitions) you might want to make sure you don't store an older offset than what you've already stored, to avoid commits going backwards (you need to keep track of last stored offset per partition yourself).
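
A minimal sketch of that setup (placeholder broker address, group id and names; assumes a client version that exposes Consumer.store_offsets()):

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',   # placeholder
    'group.id': 'example-group',             # placeholder
    'enable.auto.commit': True,              # the auto-committer keeps running...
    'enable.auto.offset.store': False,       # ...but only commits offsets stored manually
})

def mark_fully_handled(msg):
    # Call this only after the message has been processed and its produced
    # result has been confirmed delivered; it stores the offset following
    # msg.offset() in client memory for the next automatic commit.
    consumer.store_offsets(message=msg)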

@natb1
Author

natb1 commented Jan 17, 2018

Thanks for the additional context. I think I understand, but I want to be sure I get how this ensures we don’t store/auto-commit an offset, then later find out that the delivery of a lower offset failed.

Is the basic idea that if producer delivery is going to fail, we can assume it will fail within some bounded amount of time? So, if we use store_offsets() simply to start the auto-commit timer later (after processing, at producer delivery request time), that will give us time to bail out before the auto-commit if we see a delivery error?

@edenhill
Contributor

A produce()d message is guaranteed to succeed or fail within message.timeout.ms (+ a bit of grace time).

The case of reordering only exists if input and output partitions don't use the same partitioning schema, which they should do if the key remains the same.

So I think you want to start out testing with something like:

  • set enable.auto.offset.store=false on the consumer
  • read message from consumer, do processing.
  • pass message to produce(), passing the original consume partition and offset to the on delivery callback.
  • when the on delivery callback fires: if delivery succeeded (no error) then call store_offsets() for the consumed message's partition and offset. But make sure you store offsets sequentially according to the consumed input, not based on delivery report order. So you might need a separate layer that maps consumed & processed message offsets to delivered offsets, and only when you have a consecutive range of at least one delivered message do you store that offset (see the sketch after this list).
  • if delivery fails, do custom logic - ignore, retry produce, ..
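
Putting the steps together, something like this (a rough, untested sketch: broker address, group id, topic names and the process() step are placeholders, and it assumes a client version with Consumer.store_offsets()):

from collections import defaultdict
from confluent_kafka import Consumer, Producer, TopicPartition

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-group',
    'enable.auto.commit': True,            # auto-commit whatever has been stored
    'enable.auto.offset.store': False,     # step 1: store offsets manually
})
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'message.timeout.ms': 30000,           # delivery succeeds or fails within this bound
})
consumer.subscribe(['input-topic'])

delivered = defaultdict(set)   # (topic, partition) -> delivered input offsets
next_to_store = {}             # (topic, partition) -> first offset not yet storable

def make_delivery_cb(in_topic, in_partition, in_offset):
    # Step 3: the callback carries the original consumed partition and offset.
    def on_delivery(err, out_msg):
        if err is not None:
            # Step 5: custom logic - ignore, retry produce, shut down, ...
            print('delivery failed for input offset %d: %s' % (in_offset, err))
            return
        # Step 4: only store a consecutive range of delivered input offsets.
        key = (in_topic, in_partition)
        delivered[key].add(in_offset)
        nxt = next_to_store[key]
        while nxt in delivered[key]:
            delivered[key].discard(nxt)
            nxt += 1
        if nxt != next_to_store[key]:
            next_to_store[key] = nxt
            consumer.store_offsets(offsets=[TopicPartition(in_topic, in_partition, nxt)])
    return on_delivery

def process(payload):
    # Hypothetical processing step.
    return payload

while True:
    producer.poll(0)                       # serve delivery callbacks
    msg = consumer.poll(1.0)               # step 2: read and process
    if msg is None or msg.error():
        continue
    next_to_store.setdefault((msg.topic(), msg.partition()), msg.offset())
    producer.produce('output-topic', process(msg.value()),
                     on_delivery=make_delivery_cb(msg.topic(), msg.partition(), msg.offset()))

The per-partition next_to_store map is what keeps an out-of-order delivery report from storing (and later committing) an offset whose predecessors are still in flight.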

@natb1
Copy link
Author

natb1 commented Jan 17, 2018

got it. thanks a bunch

@ibalashov

@edenhill Thanks for the detailed recipe!
Did anything in this process become any simpler in the past two years, maybe due to some recent developments in Kafka? 🙄
