
feature: consumer option to balance partition lags #222

Closed
erenboz opened this issue Oct 14, 2022 · 6 comments · Fixed by #251
Labels
enhancement New feature or request help wanted

Comments


erenboz commented Oct 14, 2022

In one of our use cases we had significantly different production/consumption rates on different partitions, and we ended up writing logic that pauses and unpauses fetched partitions to balance lag. It would be quite handy to have a consumer option that influences how many records are pulled from each partition, or one that pulls proportionally to the lag.
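The pause/unpause workaround described above boils down to deciding, from observed lags, which partitions to set aside so the laggy ones can catch up. A minimal sketch of that decision, standalone and with an assumed lag threshold (the helper name and shape are invented for illustration, not any client API):

```go
package main

import "fmt"

// partitionsToPause returns, per topic, the partitions whose observed lag is
// at or below the given threshold. Pausing these low-lag partitions lets the
// high-lag ones catch up; a caller would resume them once lags even out.
// Hypothetical helper for illustration only.
func partitionsToPause(lag map[string]map[int32]int64, threshold int64) map[string][]int32 {
	paused := make(map[string][]int32)
	for topic, parts := range lag {
		for p, l := range parts {
			if l <= threshold {
				paused[topic] = append(paused[topic], p)
			}
		}
	}
	return paused
}

func main() {
	lag := map[string]map[int32]int64{
		"events": {0: 5, 1: 5000, 2: 3}, // partition 1 is far behind
	}
	fmt.Println(partitionsToPause(lag, 100))
}
```

In franz-go terms, the returned map is the shape you would hand to the client's pause/resume calls on each poll cycle.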

@erenboz erenboz changed the title Consumer option to balance partition lags feature: Consumer option to balance partition lags Oct 14, 2022
@erenboz erenboz changed the title feature: Consumer option to balance partition lags feature: consumer option to balance partition lags Oct 14, 2022
Owner

twmb commented Oct 19, 2022

@erenboz that really isn't a thing that belongs in a client. A single client doesn't know anything about lag -- lag is a calculation that is determined from a few individual requests. It isn't up to a client itself to determine its own lag. Manual pausing & unpausing is the best path forward for your problem.

For the "how many records pulled from each partition" problem, I tried designing something like this in the past in #55 (comment). In short, though, it doesn't really make sense and would be pretty difficult to implement. Fetches behind the scenes are per broker. Even if you only drain partition A from the fetch, other partitions will remain in the fetch buffered within the client. The client cannot request another fetch for a broker until the entirety of its current buffered fetch is drained.

There could be some weird hacky option that allows you to control which partitions actually get issued in a request: the client could build candidate partitions, then your function could return partitions to actually issue in the request. I'm not sure this is a great idea ... but it could be feasible if a good API exists. What do you think?
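The "weird hacky option" floated above could look something like the following sketch: the client builds its candidate partitions, and a user callback returns the subset to actually issue. Every name here is invented for illustration; no such API exists in the library.

```go
package main

import "fmt"

// FetchPartitionFilter is a hypothetical callback in the spirit of the
// proposal: the client hands over the candidate partitions it is about to
// request, and the function returns the subset that should actually be
// issued in the fetch request.
type FetchPartitionFilter func(candidates map[string][]int32) map[string][]int32

// onlyTopic keeps candidates for a single topic, starving the rest for this
// request cycle (illustrative policy only).
func onlyTopic(topic string) FetchPartitionFilter {
	return func(candidates map[string][]int32) map[string][]int32 {
		out := make(map[string][]int32)
		if ps, ok := candidates[topic]; ok {
			out[topic] = ps
		}
		return out
	}
}

func main() {
	filter := onlyTopic("hot-topic")
	issued := filter(map[string][]int32{
		"hot-topic":  {0, 1},
		"cold-topic": {0},
	})
	fmt.Println(issued)
}
```

The starvation risk is visible even in this toy: a careless filter can keep a topic out of every request, which is part of why a lag-aware ordering (rather than a raw filter) was ultimately pursued.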

Contributor

yuzhichang commented Oct 19, 2022

I have the opposite need from @erenboz. There are ~1000 Kafka topics, each with several partitions. A few topics have a high produce throughput (~100K records/second); the others do not.
If I create a kgo client for each topic, the clients compete to fetch records: each gets only a small batch to write to the downstream OLAP database (which prefers infrequent, large batch insertions). The total consume throughput is low (~700K records/second).
If I instead create a single kgo client for only five topics, the total consume throughput is better (~1000K records/second).

If a kgo client subscribes to multiple topics, is there a way to prefer a large batch from one topic over proportionally small batches from several topics?

@twmb twmb removed the help wanted label Oct 19, 2022
Contributor

yuzhichang commented Oct 21, 2022

@twmb From the Kafka code here, the client knows the lag of every topic and partition, and so has the chance to adjust the preference (default, proportional, or largest-lag) of a later FetchRequest?
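For context on where that lag number comes from: each FetchResponse partition carries the broker's high watermark, so the client can derive lag as the distance between it and the offset it will fetch next. A sketch of that arithmetic (names here are illustrative, not a client API):

```go
package main

import "fmt"

// lagOf computes a partition's lag from the broker-reported high watermark
// and the offset the consumer will fetch next. A non-positive difference
// means the consumer is fully caught up.
func lagOf(highWatermark, nextOffset int64) int64 {
	if lag := highWatermark - nextOffset; lag > 0 {
		return lag
	}
	return 0
}

func main() {
	fmt.Println(lagOf(1500, 1200)) // 300 records behind
	fmt.Println(lagOf(1500, 1500)) // caught up: 0
}
```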

Owner

twmb commented Oct 23, 2022

My mistake, you're right, the client is given that information. I never historically used that field because the client hasn't needed to know lag. I'll try to think of some method of calculating how to prefer lag while also avoiding partition starvation.

twmb added a commit that referenced this issue Nov 14, 2022
This adds three new APIs:

    func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt
    type PreferLagFn func(lag map[string]map[int32]int64, torderPrior []string, porderPrior map[string][]int32) ([]string, map[string][]int32)
    func PreferLagAt(preferLagAt int64) PreferLagFn

These functions allow an end user to adjust the order of partitions that
are being fetched. Ideally, an end user will only need:

    kgo.ConsumePreferringLagFn(kgo.PreferLagAt(50))

But, PreferLagFn exists to allow for more advanced use cases.

Closes #222
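As a sketch of the "more advanced use cases" path, a custom function matching the PreferLagFn signature from the commit message could reorder topics by total lag. The implementation below is standalone so it compiles without kgo; in real use you would pass such a function to kgo.ConsumePreferringLagFn. The policy itself (highest total lag first, partition order untouched) is an assumption for illustration:

```go
package main

import (
	"fmt"
	"sort"
)

// preferTotalLag matches the PreferLagFn signature: given per-partition lag
// and the client's prior topic/partition orderings, it returns new orderings
// with the highest-total-lag topics first. Partition order within each topic
// is left as the client proposed.
func preferTotalLag(
	lag map[string]map[int32]int64,
	torderPrior []string,
	porderPrior map[string][]int32,
) ([]string, map[string][]int32) {
	total := func(topic string) int64 {
		var sum int64
		for _, l := range lag[topic] {
			sum += l
		}
		return sum
	}
	// Copy before sorting so the client's prior slice is not mutated.
	torder := append([]string(nil), torderPrior...)
	sort.SliceStable(torder, func(i, j int) bool {
		return total(torder[i]) > total(torder[j])
	})
	return torder, porderPrior
}

func main() {
	lag := map[string]map[int32]int64{
		"quiet": {0: 10},
		"busy":  {0: 9000, 1: 4000},
	}
	torder, _ := preferTotalLag(lag, []string{"quiet", "busy"}, nil)
	fmt.Println(torder) // busy sorts first: [busy quiet]
}
```

A stable sort is deliberate here: topics with equal lag keep the client's prior ordering, which limits starvation among ties.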
Owner

twmb commented Nov 14, 2022

PR #251 should close this; feel free to take a look / check it out before it is merged for release 1.10.

@twmb twmb closed this as completed in #251 Nov 15, 2022
Author

erenboz commented Dec 8, 2022

@twmb @yuzhichang Oh, I'm very late to the party because I completely missed the mention notification. The solution looks great and solves many similar issues. Thanks a lot.
