Timeout issue in consumer on broker death #546
Can you turn on Sarama's logging and post the output? My best guess is this:
So basically, Sarama will continue to retry using the same broker for a partition until the leader of that partition has been updated in Zookeeper. When a Kafka broker doesn't shut down properly, it doesn't get a chance to hand off leadership to another broker, and the handoff is instead determined by the connection timeout between the Kafka broker and Zookeeper. You can configure that in the Kafka broker configuration file.
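For reference, a minimal sketch (assuming the github.com/Shopify/sarama import path; the broker address and values are purely illustrative) of the client-side metadata retry settings that bound how long Sarama keeps asking the cluster for a new leader:

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// How many times a metadata refresh is retried and how long to wait between
	// attempts; together these bound how long the client keeps asking the
	// cluster for the new partition leader. Values here are illustrative.
	config.Metadata.Retry.Max = 10
	config.Metadata.Retry.Backoff = 5 * time.Second

	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}
```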
Note things recovered around 20:03:25. The following is the Sarama logging output; this part of the log goes from 19:57:03 to 20:07:01.

[Sarama] 2015/09/28 19:59:43 consumer/broker/1 disconnecting due to error processing FetchRequest: read tcp [DEAD BROKER IP]:9092: i/o timeout
I assume you've left the consumer retry backoff at its default of two seconds? I notice you have set Consumer.Return.Errors to true; how are you handling the errors that come back?
Yes, it should still be 2 seconds. For handling Errors, we have a select in a loop that, in the error case, logs the error and does a few other things for certain errors (offset out of range, channel closed) that shouldn't apply here. In any case it should be fast.
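For illustration, a minimal sketch of that kind of select-in-a-loop error handling, assuming a hypothetical topic name, a single partition, and a placeholder broker address (Consumer.Return.Errors must be enabled for fetch errors to show up on Errors()):

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true // deliver fetch errors on Errors()

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("my-topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	// Select in a loop: consume messages and log errors as they arrive, so the
	// Errors() channel never backs up.
	for {
		select {
		case msg := <-pc.Messages():
			log.Printf("partition %d offset %d", msg.Partition, msg.Offset)
		case err := <-pc.Errors():
			log.Printf("consume error: %v", err)
		}
	}
}
```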
This is the broker-consumer disconnecting after the first i/o timeout. The consumer then calls Close on that broker connection.
This is the broker connection finally being closed, three and a half minutes later. This unblocks the consumer, permitting it to check its metadata and reassign the partitions to other brokers. So the question here is: why is closing the broker taking 3 1/2 minutes? The only explanation I can think of is that the close has to wait for requests already queued on that connection to finish or time out.
So potentially there were a lot of other pending requests that needed to time out? They wouldn't be coming from this consumer, but if another consumer or producer or application was sharing the same broker reference then it could be clogging the pipe.
The other things going on are a handful of other calls we make through the client, plus some that are made on the broker directly. For the ones made on the broker directly, I'm not sure what timeouts they would have; would they also just be 30s?
There aren't any producers involved. By blocking the pipe, do you mean hitting the Net.MaxOpenRequests limit?
The read/write/connect timeouts all default to 30 seconds, affect all request/response types, and are set in the Net section of the config. Are the 25 consumers and the other calls all sharing a single client?
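For reference, a sketch of where those timeouts live on the config; the 30-second values are the defaults mentioned above, and the shorter dial timeout is only illustrative:

```go
package main

import (
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// These apply to every request/response type sent over a broker connection;
	// all three default to 30 seconds.
	config.Net.DialTimeout = 10 * time.Second // illustrative: fail faster on connect
	config.Net.ReadTimeout = 30 * time.Second
	config.Net.WriteTimeout = 30 * time.Second
}
```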
Basically. If that is set to 5, then up to 5 requests can be open-waiting-for-a-response. Thus when one of those requests has to wait out a 30-second timeout, everything queued behind it on that connection waits as well.
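As a sketch of the workaround raised in the next comment, dropping that limit to 1 (a judgment call the thread does not settle on):

```go
package main

import "github.com/Shopify/sarama"

func main() {
	config := sarama.NewConfig()
	// Limit how many requests can be awaiting a response on one broker
	// connection. Fewer in-flight requests means fewer 30-second timeouts to
	// drain when a broker dies; the trade-off is less pipelining.
	config.Net.MaxOpenRequests = 1
}
```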
Yes, they all share the same client. Oh, I did not know that they were all serial. So the ones in the pipeline would mostly hit the connect timeout in that case, right? That could explain things. Do you recommend dropping MaxOpenRequests to 1, or just using many clients, or lowering the connect timeout? (We should be able to guarantee much lower connect latency.) Is this something that you might try to improve, or should we just use one of the above workarounds?
Yes, within a single connection to a single broker, requests are strictly serial. You can have multiple connections open to the same broker, but you need multiple clients for that. This actually has implications for any request that can be slow. For example, if you are consuming a low-volume topic with a long max wait time, a single fetch can tie up the connection for the whole wait.
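A minimal sketch of the "multiple clients" option (the broker address is a placeholder): each client keeps its own connection to each broker, so slow requests issued through one client do not queue behind requests issued through the other.

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	brokers := []string{"localhost:9092"}

	// Two independent clients mean two independent connections to each broker.
	// Each client maintains its own config and metadata.
	consumerClient, err := sarama.NewClient(brokers, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer consumerClient.Close()

	offsetClient, err := sarama.NewClient(brokers, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer offsetClient.Close()

	// The consumer uses the first client; offset-tracking calls could go through
	// the second so they never share a connection with the fetch requests.
	consumer, err := sarama.NewConsumerFromClient(consumerClient)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()
}
```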
Exactly.
Because of the point above about requests being serial per connection, lowering the timeouts helps only so much. Which brings me to: why do you have 25 consumers in the same program? Multiple topics/partitions can be handled within a single consumer, so unless you're actually consuming the same partition from 25 different offsets simultaneously, I'm not sure I see the use case.
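For illustration, a sketch of one consumer fanning out over every partition of a single (hypothetical) topic; the consumer multiplexes all the fetches onto one connection per broker:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// One Consumer can serve every partition of every topic it is asked to
	// consume; each ConsumePartition call returns its own PartitionConsumer.
	partitions, err := consumer.Partitions("my-topic")
	if err != nil {
		log.Fatal(err)
	}
	for _, p := range partitions {
		pc, err := consumer.ConsumePartition("my-topic", p, sarama.OffsetNewest)
		if err != nil {
			log.Fatal(err)
		}
		go func(pc sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				log.Printf("partition %d offset %d", msg.Partition, msg.Offset)
			}
		}(pc)
	}
	select {} // block forever; real code would handle shutdown and Close
}
```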
Yes, we tend to run with this lowered a bit from the default. It helps of course but is not a final solution, just an optimization.
Sorry, I mean that we have one consumer with 256 PartitionConsumers attached (of which 25 were talking to the dead broker). It also turns out we have a separate call per partition to fetch offsets for tracking (I did not think that really mattered at the time, oops), and this happens up to every 10 seconds.
The consumer does the right thing in this case, batching requests together, so it is not the problem.
256 calls every 10 seconds could definitely be causing the queueing. They're fast calls, so in normal operation they probably don't have any effect, but when each one takes 30 seconds to time out, that could cause this.
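For illustration, a hedged sketch of the kind of per-partition offset polling described above (the topic name and broker address are placeholders; the ten-second cadence is from the discussion). Each call is quick on a healthy connection, but requests go out one at a time per broker connection, so anything stuck waiting on a 30-second timeout delays every call queued behind it:

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

// pollOffsets issues one GetOffset request per partition. Each call is fast on
// a healthy cluster, but the requests are serial per broker connection, so a
// request waiting out a 30-second timeout delays every call behind it.
func pollOffsets(client sarama.Client, topic string, partitions []int32) {
	for _, p := range partitions {
		offset, err := client.GetOffset(topic, p, sarama.OffsetNewest)
		if err != nil {
			log.Printf("partition %d: %v", p, err)
			continue
		}
		log.Printf("partition %d newest offset %d", p, offset)
	}
}

func main() {
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	partitions, err := client.Partitions("my-topic")
	if err != nil {
		log.Fatal(err)
	}
	for range time.Tick(10 * time.Second) {
		pollOffsets(client, "my-topic", partitions)
	}
}
```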
Added some custom logging to Sarama and got this, which suggests that grabbing the lock takes a long time. Not sure why that would be though.

[Sarama] 2015/09/30 17:32:11 consumer/broker/7 disconnecting due to error processing FetchRequest: read tcp 10.32.87.43:9092: i/o timeout
Oh, of course: the broker holds the lock while it writes into the response channel. So if that channel is full of requests that are still waiting out their timeouts, the write blocks while holding the lock, and closing the broker has to wait for it. If you're able to easily test this, I'd appreciate knowing if #548 makes it any better?
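To illustrate the shape of the problem (a toy model, not Sarama's actual broker code): a send path that holds a mutex while pushing into a bounded channel will block once the channel fills, and a close path that needs the same mutex then has to wait behind it.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// toyBroker is NOT Sarama's broker; it only models the pattern described above:
// send() holds the lock while pushing into a bounded responses channel.
type toyBroker struct {
	mu        sync.Mutex
	responses chan int // pretend each element is a request awaiting a response
}

func (b *toyBroker) send(id int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.responses <- id // blocks while holding the lock once the channel is full
}

func (b *toyBroker) close() {
	b.mu.Lock() // cannot proceed until send() releases the lock
	defer b.mu.Unlock()
	close(b.responses)
	fmt.Println("closed")
}

func main() {
	b := &toyBroker{responses: make(chan int, 2)}
	for i := 0; i < 3; i++ { // the third send overflows the buffer and blocks
		go b.send(i)
	}
	time.Sleep(100 * time.Millisecond)

	done := make(chan struct{})
	go func() { b.close(); close(done) }()

	select {
	case <-done:
	case <-time.After(time.Second):
		fmt.Println("close is stuck behind a send that is holding the lock")
	}
}
```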
After two trials with #548, it looks like the connection gets closed almost immediately (within the same second) and time to recover is under 2 minutes, so the patch is really helpful. Perhaps a 1-2 minute recovery time is to be expected, and it should be good enough for us (though I'll poke around a bit more to see what exactly is going on).
Looking at a specific incident with the new code: the controller logs first mention the failure at 18:33:44:

INFO controller.KafkaController: [Controller 6]: Broker failure callback for 0

Sarama detected the timeout at 18:34:06 and then quickly closed the connection to the dead broker. Later it had these (not the full Sarama logs). It looks like finding a new broker didn't work for a while, even after the controller gave out all the partition assignments, though it's definitely possible that the Kafka brokers are returning stale data and the controller logs are misleading. Anyway, it would be nice to reduce recovery time further, but #548 is really helpful and should be good enough for our use case.
#548 is in master and will be in the next release. When a partition consumer repeatedly fails to find a new broker, that means either it can't fetch new metadata at all, or the metadata it gets back still lists the dead broker as leader. If you want to dig further, I'd look at what data those metadata and leader lookups are getting back from the cluster, and what's actually in Zookeeper during this window.
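A hedged sketch of that kind of check (topic, partition, and broker address are placeholders): refresh the metadata in a loop during the recovery window and log which broker the cluster currently reports as leader.

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// During the recovery window, periodically refresh metadata and log which
	// broker the cluster reports as leader for the affected partition.
	for range time.Tick(5 * time.Second) {
		if err := client.RefreshMetadata("my-topic"); err != nil {
			log.Printf("metadata refresh failed: %v", err)
			continue
		}
		leader, err := client.Leader("my-topic", 0)
		if err != nil {
			log.Printf("no leader for my-topic/0: %v", err)
			continue
		}
		log.Printf("leader for my-topic/0 is broker %d at %s", leader.ID(), leader.Addr())
	}
}
```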
Thanks for all of your help!
We are currently running release 1.6.1. During testing, we killed a broker by ipmi_rebooting it (so that it wouldn't be able to close the connection). This caused consumers that were reading from partitions for which that broker was the leader to get stuck for 3-4 minutes, at the end of which this message was repeated once per partition:
read tcp :9092: i/o timeout
and after that things recovered.
Based on the fact that the read timeout is only 30 seconds, it seems like it shouldn't take 3-4 minutes for things to get unstuck. Here are our settings:
kafkaConfig.ClientID = config.ClientID
kafkaConfig.Consumer.Fetch.Default = 16 * 1024 * 1024
kafkaConfig.Consumer.Fetch.Min = 8 * 1024 * 1024
kafkaConfig.Consumer.Fetch.Max = 64 * 1024 * 1024
kafkaConfig.ChannelBufferSize = 500
kafkaConfig.Consumer.Return.Errors = true
kafkaConfig.Metadata.Retry.Max = 10
kafkaConfig.Metadata.Retry.Backoff = 5 * time.Second
Thanks for any insight you can provide!