Possible record loss on unclean kill of consumer when using automatic offset commit #57

Closed
finncolman opened this issue Aug 6, 2021 · 22 comments

@finncolman

Because offset commits and record consumption happen independently, records can be lost on the consumer side if the consumer process receives a kill -9 mid loop.
You can reproduce this by producing 1000 records, starting a consumer, and sleeping at the 500-record mark. Then issue a kill -9 on the consumer process. When the consumer starts back up, not all 1000 records end up being consumed.
This is presumably because the offsets for these records have already been committed.
With the Confluent Go library this does not happen, because the offset commit occurs inside the Poll call itself, so there is no message loss.
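
The failure mode described above can be sketched as a small, broker-free simulation. This is only a model under stated assumptions: a single partition, one poll that returns the whole backlog, and a background autocommit that commits everything polled before processing finishes. The function name `lostAfterCrash` and its parameters are hypothetical and are not part of kgo.

```go
package main

import "fmt"

// lostAfterCrash models one partition holding offsets [0, total).
// Assumptions (hypothetical model, not the kgo API):
//   - a single poll returns the whole backlog,
//   - a background autocommit commits the full polled range,
//   - the process is killed after processing processedBeforeKill records,
//   - on restart, consumption resumes from the committed offset.
// It returns how many records were never processed.
func lostAfterCrash(total, processedBeforeKill int64) int64 {
	processed := make(map[int64]bool)

	polledEnd := total     // one poll returns everything
	committed := polledEnd // autocommit commits what was polled, not what was processed

	for off := int64(0); off < polledEnd; off++ {
		if off == processedBeforeKill {
			break // kill -9 lands here, mid loop
		}
		processed[off] = true
	}

	// Restart: resume from the committed offset.
	for off := committed; off < total; off++ {
		processed[off] = true
	}
	return total - int64(len(processed))
}

func main() {
	fmt.Println("records lost:", lostAfterCrash(1000, 500)) // prints "records lost: 500"
}
```

In a real run the number lost depends on how much was polled and on whether the autocommit timer fired before the kill, which is why the loss only shows up sometimes.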

@twmb
Owner

twmb commented Aug 6, 2021

This is known, it is in the documentation for ConsumerGroup:

"""
Note that when group consuming, the default is to autocommit every 5s. Autocommitting risks losing data if your application crashes after autocommitting but before you have processed polled records. To ensure that you lose absolutely no data, you can disable autocommitting and manually commit, like so:
"""

Am I reading it correctly that Confluent's library commits offsets that were previously polled on the next commit? This could be an option inside of kgo itself.

@finncolman
Author

Yes, I think what you have described is how both librdkafka and the Java libraries work. As long as you don't start a new goroutine inside the Poll loop, you are guaranteed not to lose messages even with auto commit turned on. The only way this is possible is if the topic offset is committed after the work in the loop body has finished.

@twmb
Owner

twmb commented Aug 9, 2021

Both librdkafka and the Java client have identical autocommit semantics:

  • Article about how Java semantics can lead to loss
  • librdkafka doc about this same behavior

I'm not sure how the confluent Go lib is using librdkafka, because I feel like they'd implement this the same way as their other clients.

I could add an option to issue a blocking commit every poll, but this inherently increases latency by making the commit part of the message consumption path rather than asynchronous. Alternatively, I could add an option such that only what was previously polled is a candidate for autocommitting, which will keep things asynchronous but does mean that users absolutely must close the client correctly on shutdown (and I'd have to have a blocking commit in Close if this option were enabled).

What do you think?

@finncolman
Author

I think that first article is either wrong or out of date.
The Java client definitely commits offsets during the Poll call, not in a background thread.
This Confluent article talks about delivery guarantees:
https://docs.confluent.io/platform/current/clients/consumer.html

By default, the consumer is configured to auto-commit offsets. Using auto-commit gives you “at least once” delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset. When this happens, the last committed position may be as old as the auto-commit interval itself. Any messages which have arrived since the last commit will have to be read again.

I tested the Confluent Go lib by sleeping mid loop for more than 5 seconds, which would have been enough time for a background thread to commit offsets (the default commit interval is 5 seconds), but no messages were lost.

If you look at that librdkafka doc it also says:

Offsets to be committed are kept in a local in-memory offset store, this offset store is updated by consumer_poll() (et.al) to store the offset of the last message passed to the application (per topic+partition).

So the offsets are only actually updated inside the Poll call, not in a completely background way.

I don't know what the best way to implement this is, but I think something that replicates how the Java consumer works would be nice.

@finncolman
Author

I did a bit of research. I think when the Java consumer calls Poll it uses an asynchronous commit. Synchronous commits are only used on shutdown or consumer rebalance events.

@twmb
Owner

twmb commented Aug 10, 2021

Interesting, ok, thanks for looking!

That librdkafka bit somewhat still reads as if what it has just returned is a candidate for committing. But, librdkafka only returns one message at a time from poll? It's been a bit since I looked into it.

I think I'll change the default in kgo to be roughly similar: any Poll will allow the prior return from Poll to be autocommitted. So, if I poll once, nothing can be autocommitted. If I poll twice, the first poll can be autocommitted. The autocommitting will still run on a completely asynchronous background goroutine. This scheme will require a clean shutdown, too, and the clean shutdown will force one last blocking commit.
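
The proposed bookkeeping can be sketched for a single partition as follows. This is a minimal model, not the kgo implementation: the type `offsetTracker` and its methods are hypothetical names, and it assumes that entering a new poll means the application is done with the previous batch, and that the final blocking commit on close covers the last poll.

```go
package main

import "fmt"

// offsetTracker is a hypothetical model of the proposed scheme for one
// partition; it is not the kgo implementation.
type offsetTracker struct {
	head        int64 // end offset (exclusive) of the most recent poll
	committable int64 // end offset the background autocommit may commit
}

// onPoll records that a poll returned records up to newHead (exclusive).
// Polling again implies the previous batch was handled, so the previous
// head becomes committable.
func (t *offsetTracker) onPoll(newHead int64) {
	t.committable = t.head
	t.head = newHead
}

// onClose models a clean shutdown: one last blocking commit that is
// assumed to cover the final poll as well.
func (t *offsetTracker) onClose() int64 {
	t.committable = t.head
	return t.committable
}

func main() {
	var t offsetTracker
	t.onPoll(100)
	fmt.Println(t.committable) // 0: nothing is committable after one poll
	t.onPoll(250)
	fmt.Println(t.committable) // 100: the first poll is now committable
	fmt.Println(t.onClose())   // 250: the final commit covers the last poll
}
```

The background committer would only ever read `committable`, so a crash between polls can cause reprocessing of the latest batch but not loss.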

What do you think of this proposal? I may be able to work it in this week, but likely next week because I'm currently looking into one other quite odd issue.

@finncolman
Author

After much looking at the C source code for librdkafka and running my own experiments with the Go confluent lib I now understand that it works like this:

There are actually two concepts of the offset in librdkafka. One is controlled by an in memory store of the offset. This offset is updated straight after message fetch for each message in the Poll call. The Poll call gets one message at a time.

There is also the offset persisted to the actual __consumer_offsets topic in Kafka. This only gets written to periodically in a background thread, by default every 5 seconds.

I found in my testing that it is indeed possible to lose a single message on the consumer side if the consumer process dies before the message has been fully processed. This is because the in memory offset store gets updated as soon as the next message gets pulled back in the Poll call.

There is a way to prevent this behaviour though. You can set "enable.auto.offset.store" to false and "enable.auto.commit" to true. Then in your Poll loop after the consumer has processed the message it can manually call the StoreOffsets method. This will update the offset stored in memory but will not actually commit the offset to Kafka.

Then after the interval has finished the updated offset will actually get committed to the __consumer_offsets topic in Kafka in a background thread.

With this strategy you are guaranteed at least once semantics because the in memory store that eventually gets saved to Kafka only gets updated after the consumer has processed the message. Also because the store call is just in memory it doesn’t have a big performance impact either.
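
To illustrate why storing the offset only after processing gives at-least-once behaviour, here is a small simulation of the two offsets. It is a sketch only and does not call librdkafka or the Confluent Go client: `storeConsumer`, `storeOffset`, `flush`, and `run` are hypothetical names, and the periodic background commit is replaced by a flush after every `flushEvery` records so the result is deterministic.

```go
package main

import "fmt"

// storeConsumer is a hypothetical model of the two offsets described
// above; it does not call librdkafka or the Confluent Go client.
type storeConsumer struct {
	stored    int64 // in-memory offset store (next offset to commit)
	committed int64 // offset persisted to Kafka
}

// storeOffset is called by the application after it has processed off.
func (c *storeConsumer) storeOffset(off int64) { c.stored = off + 1 }

// flush stands in for the periodic background commit.
func (c *storeConsumer) flush() { c.committed = c.stored }

// run processes offsets [0, total), flushing after every flushEvery
// records, and crashes just before processing crashAt. It then restarts
// from the committed offset and returns how many records were never
// processed (lost) and how many were processed twice (dups).
func run(total, flushEvery, crashAt int64) (lost, dups int64) {
	seen := make([]int, total)
	c := &storeConsumer{}
	for off := int64(0); off < total; off++ {
		if off == crashAt {
			break // process dies; off was fetched but never stored
		}
		seen[off]++
		c.storeOffset(off)
		if (off+1)%flushEvery == 0 {
			c.flush()
		}
	}
	for off := c.committed; off < total; off++ { // restart
		seen[off]++
	}
	for _, n := range seen {
		switch {
		case n == 0:
			lost++
		case n > 1:
			dups++
		}
	}
	return lost, dups
}

func main() {
	lost, dups := run(10, 4, 6)
	fmt.Println("lost:", lost, "dups:", dups) // prints "lost: 0 dups: 2"
}
```

Because the committed offset can never run ahead of what was processed, the model yields duplicates after a crash but no loss.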

This is not documented at all in the Go confluent lib documentation.

I haven’t personally used the Java library so I am not totally sure how that works, but I believe it operates in a single thread so there is more control over when the records get committed.

@twmb
Owner

twmb commented Aug 11, 2021

Good investigating! So it sounds like librdkafka by default has this same problem, but the problem is so scoped (1 record) that it's hard to notice.

I think I'd like to switch to the model that franz-go will allow everything previously polled to be committed, and to have one blocking commit on close. If a user wants to opt into the previous (current) behavior, they can use a new option, GreedyAutoCommit.

Not sure how long this will take to implement when I'm able to fit it in.

@finncolman
Author

finncolman commented Aug 11, 2021

OK, that sounds good. I also looked into Sarama a little, and I think it behaves similarly to the Confluent Go lib: it has an in-memory store (a map, actually) that you can update after each record is processed, and then the commit or auto commit flushes that to Kafka itself.
The one blocking commit on close that you suggested is also how the Confluent Go lib works, so I think people will be expecting that.

@twmb
Owner

twmb commented Aug 12, 2021

I've remembered that one reason I did not pursue this in the past is because if a rebalance happens, nothing with autocommit will block the rebalance. The client then has two options:

  • discard any in-memory uncommitted offsets.
  • allow the client to commit offsets that have potentially moved to another group.

With either of these options, the only real solution from the client is to strongly document that a user should have an OnRebalance and should have it stop their processing loop / commit within it. Otherwise, a user is inevitably going to have dups. What do you think of this?

The current way in the client of having the callbacks be completely independent of the poll loop is more accurate to how heartbeating / rebalancing should work to avoid being booted from a consumer group, but it does punt complexity to the app user.

@finncolman
Author

finncolman commented Aug 16, 2021

I think it should be possible to make the user experience a bit easier than this.
I am not sure exactly how this should work. I did find some interesting code in librdkafka related to this though.
https://github.com/edenhill/librdkafka/blob/d2bc7490e4ff34daf208f5a3c3dc8c03c41572af/src/rdkafka_cgrp.c#L3360

In the timer code to do the auto commit it doesn't run it if the consumer group is in a rebalance state:
    /* Don't attempt auto commit when rebalancing or initializing since
     * the rkcg_generation_id is most likely in flux. */
    if (rkcg->rkcg_subscription &&
        rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_STEADY)
            return;

There also seems to be some concept of pausing while a rebalance is in effect:
https://github.com/edenhill/librdkafka/blob/d2bc7490e4ff34daf208f5a3c3dc8c03c41572af/src/rdkafka_cgrp.c#L1003

I think librdkafka also does a final commit of offsets for the removed partitions:
https://github.com/edenhill/librdkafka/blob/d2bc7490e4ff34daf208f5a3c3dc8c03c41572af/src/rdkafka_assignment.c#L386

The flowchart in https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md also makes it sound like it does a final offsets commit after a revoke on a rebalance.

@twmb twmb mentioned this issue Aug 17, 2021
@vtolstov
Contributor

So the official Kafka client commits offsets for partitions that it is no longer handling? Is it possible that another consumer in the group is processing records from those partitions and committing offsets too?

@twmb
Owner

twmb commented Aug 20, 2021

The kgo client already blocks autocommitting during a rebalance; the main problem is what was uncommitted going into the rebalance. I think librdkafka gets away with this because it has at most one message outstanding at a time: if a partition is moved, at most one message could be reconsumed. With the kgo client, the client returns a huge batch of messages from polling, so everything in this batch could be doubly consumed if all of the partitions move around during the rebalance.

I think per my previous message, I'm going to go with the first option,

  • discard any in-memory uncommitted offsets.

The client can knife out any uncommitted offsets for partitions that were revoked. These partitions will have doubly consumed offsets, but that's effectively fine. I think this also puts things in line with the Java consumer behavior (if it only allows committing for stuff that was previously polled), and puts things in line with librdkafka when manually using the offset store approach.
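
A minimal sketch of "knifing out" uncommitted offsets for revoked partitions might look like the following. The names `topicPartition` and `dropRevoked` are hypothetical and chosen for illustration; this is not how kgo stores its offsets internally.

```go
package main

import "fmt"

// topicPartition and dropRevoked are hypothetical names for illustration;
// they are not part of the kgo API.
type topicPartition struct {
	topic     string
	partition int32
}

// dropRevoked removes the uncommitted offsets of revoked partitions, so a
// later autocommit cannot commit to a partition this member no longer owns.
// Records covered by the dropped offsets may be consumed again elsewhere.
func dropRevoked(uncommitted map[topicPartition]int64, revoked []topicPartition) {
	for _, tp := range revoked {
		delete(uncommitted, tp)
	}
}

func main() {
	uncommitted := map[topicPartition]int64{
		{"orders", 0}: 120,
		{"orders", 1}: 87,
	}
	dropRevoked(uncommitted, []topicPartition{{"orders", 1}})
	fmt.Println(len(uncommitted), uncommitted[topicPartition{"orders", 0}]) // prints "1 120"
}
```

Only the revoked partitions are touched, so partitions the member keeps across the rebalance still commit normally.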

@vtolstov
Contributor

Newer Kafka versions support sticky partitions, so if a client does not die, it keeps all of its previous partitions after a rebalance.
If kgo can check partitions after a rebalance and drop offsets only for lost partitions, I think that would be good.

@vtolstov
Contributor

P.S. I'm writing a github.com/unistack-org/micro broker plugin that uses this package, and after I finish I want to compare its performance with the segmentio/kafka-go package =)

@twmb
Owner

twmb commented Aug 20, 2021

The current OnRevoke is only passed partitions that are lost (kgo supports sticky rebalancing), so that is doable.

@vtolstov
Contributor

cool

@finncolman
Author

Thanks for all your hard work on this

@twmb
Owner

twmb commented Aug 22, 2021

I haven't worked on implementing the fix to this! Above was planning the fix. I need to implement the fix as proposed in #57 (comment)

@twmb twmb reopened this Aug 22, 2021
@twmb twmb closed this as completed in 28bba43 Aug 25, 2021
@twmb
Owner

twmb commented Aug 25, 2021

Theoretically this has been addressed.

@twmb
Owner

twmb commented Aug 26, 2021

The current fix works but could be better. Offsets are only updated for new partitions returned from polling. The behavior needs to change such that all dirty offsets are available to commit at the start of polling.

twmb added a commit that referenced this issue Aug 26, 2021
This addresses the other half of #57. The first half ensured that we
only commit what was previously polled, this second half ensures that
when we *enter* poll, dirty offsets can be committed, not only when we
exit.
@twmb
Owner

twmb commented Aug 26, 2021

The latest commit ensures that when we enter polling, the previous poll is available to be committed.


3 participants