Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Samir/block rebalance on poll #155

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

samirketema
Copy link
Contributor

Description

Experimenting with BlockRebalanceOnPoll franz consumer config

Fixes # (issue)

Quick checks:

  • I have followed the Code Guidelines.
  • There is no other pull request for the same update/change.
  • I have written unit tests.
  • I have made sure that the PR is of reasonable size and can be easily reviewed.

Comment on lines +84 to +87
for c.acker.curBatchIndex != 0 {
sdk.Logger(ctx).Warn().Msgf("partitions revoked or lost, waiting until current batch of records are acked and committed...")
time.Sleep(50 * time.Millisecond)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One concern is that we want to make sure that the current batch of records were acked, so we can go ahead and commit them, then continue processing - this currently loops infinitely, as some records aren't acked. Need to continue debugging.

In general, this is aimed to help prevent duplicates. But if we are okay with at-least-once processing, then, we don't need to implement OnPartitionsRevoked / OnPartitionsLost: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Client.CommitRecords

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant