Consumer: reduce ticker allocations #1028
Conversation
When the consumer is idle you should only be processing maybe one or two responses per second, so there should be no noticeable cost here. What is your […]?

The change is still OK, but I'm curious whether it might make things worse for high-volume consumers? My understanding is that […]

I use the sarama-cluster […]. The […]
I don't know a lot about sarama-cluster, but I would think you'd only need one per process?
That is very high for idle. How many brokers are in your Kafka cluster?
In my system, topics have priorities, so I use multiple instances and control when messages are consumed. I'll verify the count of the fetch requests next Monday.
The […]

No, […]

One approach that might be more efficient (since it wouldn't require tracking […]
(force-pushed from d7428ad to 733cd80)
Yeah, it's a good solution. 😁
I added a counter in the […], and I found this: consumer.go#L727. When a broker fetches a response, it dispatches the response to every […], so it looks like there's no problem here.
consumer.go (outdated):
```diff
@@ -441,20 +441,20 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {

 func (child *partitionConsumer) responseFeeder() {
 	var msgs []*ConsumerMessage
-	msgSent := false
+	var firstAttempt bool
```
I believe this needs to be initialized to `true` now.
Actually, while the leak has been fixed, I meant to ask what the purpose of this ticker really is. When it fires, it does nothing apart from a […]
The first time it ticks, it simply sets a flag and repeats the select statement. The second time it ticks, it fully detaches the partition from the consumer in order to unblock the remaining partitions. This is important so that a slow message consumer doesn't drag the entire process down.
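For illustration, here is a minimal, self-contained sketch of that two-tick mechanism. It is a simplification, not the actual Sarama `responseFeeder`; the `feed` function, its parameters, and the toy `main` are made up for this example.

```go
package main

import (
	"fmt"
	"time"
)

// feed delivers msgs to out, guarded by a single reused ticker. The first
// tick that fires while a send is blocked only sets a flag and retries the
// same send; a second consecutive tick gives up (in Sarama this is where
// the partition would be detached from its broker so the other partitions
// are not dragged down). firstAttempt is reset whenever a send succeeds,
// which is also why it has to start out as true.
func feed(msgs []string, out chan<- string, maxProcessingTime time.Duration) bool {
	expiry := time.NewTicker(maxProcessingTime)
	defer expiry.Stop()

	firstAttempt := true
	for _, m := range msgs {
	retry:
		select {
		case out <- m:
			firstAttempt = true // the receiver kept up; reset for the next message
		case <-expiry.C:
			if firstAttempt {
				firstAttempt = false // first tick: remember it and retry the send
				goto retry
			}
			return false // second consecutive tick: the receiver is too slow
		}
	}
	return true
}

func main() {
	out := make(chan string)
	go func() {
		for m := range out {
			fmt.Println("received", m)
			time.Sleep(300 * time.Millisecond) // deliberately slower than the ticker
		}
	}()

	ok := feed([]string{"a", "b", "c"}, out, 100*time.Millisecond)
	fmt.Println("delivered everything:", ok) // expected: false, the feeder gave up
}
```

Because the ticker is never reset on a successful send, the effective deadline lands somewhere between one and two ticker periods, which matches the low-precision bound mentioned in the PR description.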
After this PR is merged, should the line at consumer.go#L464 be deleted?
Yes! I'm not sure how we missed that.
Now that we reuse a single ticker the whole time, this would probably lead to the consumer just kind of hanging and never resuming. Thanks to #1028 (comment) for catching this.
A ticker is created after each response, even when there are no messages in the response, so the CPU usage stays high when the consumer is idle. Ticker creation and destruction is expensive; as the number of fetch requests goes up, both CPU and memory are consumed by the tickers.
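A hypothetical sketch of the per-response pattern described above (not the actual Sarama source; `feedEachResponse` and `deliver` are made-up names for this example):

```go
package consumerexample

import "time"

// feedEachResponse allocates and tears down a fresh ticker for every fetch
// response, even an empty one. This is the churn the change above removes.
func feedEachResponse(responses <-chan []string, deliver func([]string, <-chan time.Time), maxProcessingTime time.Duration) {
	for msgs := range responses {
		expiry := time.NewTicker(maxProcessingTime) // a new runtime timer per response
		deliver(msgs, expiry.C)                     // hand the messages plus the expiry channel to the feeder
		expiry.Stop()                               // and tear the timer down again immediately
	}
}
```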
So I reuse the ticker instead, without changing the existing behavior. The timeout is still low precision (between `MaxProcessingTime` and `2 * MaxProcessingTime`), since detaching requires two consecutive ticks with no successful send in between: the first can arrive at any point within one period, and the second a full period later.

Before:
After:
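As a rough way to see the difference this change targets, a micro-benchmark along these lines (illustrative only, not from this PR) contrasts allocating a ticker per response with reusing a single one:

```go
package tickerbench

import (
	"testing"
	"time"
)

// One ticker created and destroyed per iteration, mimicking the old
// one-ticker-per-response pattern.
func BenchmarkTickerPerResponse(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		t := time.NewTicker(time.Second)
		t.Stop()
	}
}

// A single ticker reused across all iterations, mimicking the reuse
// introduced by this change.
func BenchmarkReusedTicker(b *testing.B) {
	b.ReportAllocs()
	t := time.NewTicker(time.Second)
	defer t.Stop()
	for i := 0; i < b.N; i++ {
		select {
		case <-t.C: // drain any pending tick
		default:
		}
	}
}
```

Run with `go test -bench . -benchmem` to compare allocations per operation between the two patterns.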