Problem with producing messages to topic with unavailable partitions #213
It should recover eventually and start sending messages only to the available partitions. It would recover faster if you reduce your … What happens if you reduce the timeouts, or just let it run for longer?
I send about 100k-200k messages per second to my cluster, so even setting retries to 3x250ms doesn't seem good enough. I hope you can just mark partitions as unavailable and either fail all messages assigned to them or reassign those messages to available partitions. Now I see that you are probably doing this in #215. Tell me when it's ready to test :)
You can try the leader-breaker branch now (https://github.com/Shopify/sarama/tree/leader-breaker). #215 is the first part, which tweaks the behaviour so we stop sending new messages to unavailable partitions. The leader-breaker branch is additional work on top of that to fail messages quickly when a partition goes down. Once #215 is merged, the branch will have to be rebased on master and then properly vetted. Right now it hardcodes failing fast for 10 seconds between retries, though I'm open to making that configurable if necessary. P.S. You can also play with …
I tested the current leader-breaker branch. It looks much better, but I think there are still some issues. I get a lot of …
It should stop producing to a partition once Kafka returns …
I get 'kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes.', so it looks like Kafka returns … I looked at the RoundRobinPartitioner code and I'm not sure I understand it correctly. You pick a partition number from 0 to numPartitions-1. What if partition 2 (of 10) is not available?
OK, I see that you select the partition from …
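Choosing from the list of currently writable partitions, rather than from 0 to numPartitions-1, means a round-robin picker can never land on a partition without a leader. A minimal sketch, assuming a hypothetical `roundRobin` type whose `Partition` method receives the writable partition IDs directly (this is not Sarama's `Partitioner` interface):

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoWritablePartitions is a hypothetical error for when every partition is down.
var ErrNoWritablePartitions = errors.New("no writable partitions")

// roundRobin cycles over the partitions that currently have a leader,
// so unavailable partitions are skipped entirely.
type roundRobin struct {
	next int
}

// Partition returns the next ID from writable. The list can shrink or grow
// between calls as metadata changes, so the counter is first reduced modulo
// the current length to stay in range.
func (r *roundRobin) Partition(writable []int32) (int32, error) {
	if len(writable) == 0 {
		return -1, ErrNoWritablePartitions
	}
	r.next %= len(writable)
	p := writable[r.next]
	r.next++
	return p, nil
}

func main() {
	rr := &roundRobin{}
	// 10 partitions with 4 and 8 unavailable, as in this thread.
	writable := []int32{0, 1, 2, 3, 5, 6, 7, 9}
	for i := 0; i < 10; i++ {
		p, err := rr.Partition(writable)
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Print(p, " ")
	}
	fmt.Println() // prints 0 1 2 3 5 6 7 9 0 1
}
```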
It's strange... I have 10 partitions. When I shut down 2 brokers, partitions 4 and 8 become unavailable, but in the error channel I see … @eapache, do you have any idea?
I pushed a potential fix (f1a6b99) to the leader-breaker branch. I suspect what was happening was that partition 9, while still available, was changing leader (i.e. one of the two brokers you shut down was the old leader of 9). When this happened it refreshed its metadata and, because of a case I missed, the errors for the other partitions trickled up and caused its circuit to open.
Some more info. Here is my topic description:
When I kill brokers 1 and 2, I lose partitions 4 and 8 and get … The lost partitions (4, 8 or 3, 7) are correctly recognized and are not in the WriteablePartitions list. The Kafka topic description also correctly marks the unavailable partitions with Leader: -1.
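The writable list is simply the partitions whose metadata reports a leader, with Leader: -1 meaning there is none. A sketch, assuming a hypothetical trimmed-down `partitionMeta` struct rather than the client's real metadata types:

```go
package main

import "fmt"

// partitionMeta is a hypothetical, trimmed-down view of one partition's
// metadata; Leader is -1 when no broker currently leads the partition.
type partitionMeta struct {
	ID     int32
	Leader int32
}

// writablePartitions returns the IDs of partitions that currently have a
// leader, i.e. the ones a producer can send to. It returns nil if none do.
func writablePartitions(parts []partitionMeta) []int32 {
	var ids []int32
	for _, p := range parts {
		if p.Leader != -1 {
			ids = append(ids, p.ID)
		}
	}
	return ids
}

func main() {
	// 10 partitions with made-up leader IDs; 4 and 8 have lost their leader.
	var parts []partitionMeta
	for id := int32(0); id < 10; id++ {
		leader := id%5 + 1
		if id == 4 || id == 8 {
			leader = -1
		}
		parts = append(parts, partitionMeta{ID: id, Leader: leader})
	}
	fmt.Println(writablePartitions(parts)) // [0 1 2 3 5 6 7 9]
}
```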
OK, I can confirm that f1a6b99 fixes the problem. Almost... when I kill a broker that is also the cluster's controller, there are still some problems. If the controller was the last replica for a partition, that partition is not recognized as unavailable. This could be a Kafka bug, because the description for that partition still lists that broker as available, even after several minutes. I am going to test this with Kafka 0.8.2-beta...
Kafka 0.8.2-beta looks good; killing the controller is handled properly there. So I must say that everything now works really nicely. Thanks! :) I have a question. I want to resend failed messages from the error channel, and I have a separate goroutine reading from that channel. Is it safe to just construct a new MessageToSend and send it to the Input() channel? What about deadlocks?
- As I discovered while investigating additional issues from #213, I made a stupid mistake in 1b465e7 and kept returning an error in the case where we ran out of leader-election retries, despite *explicitly stating in the commit message* that the whole point was to stop doing that.
- Simplify and reorganize flow control in several places.
- Clean up and improve log messages.
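var input chan<- *MessageToSend // nil (never ready) while buffer is empty
var next *MessageToSend
if len(buffer) > 0 {
	input, next = Input(), buffer[0]
}
select {
case err := <-Errors():
	buffer = append(buffer, err.Msg) // queue the failed message for resending
case input <- next:
	buffer = buffer[1:] // pop message from buffer
}
(Edit: be sure to handle the case when the buffer is empty.)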
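A complete, runnable version of this buffering loop, as a sketch. `MessageToSend` and `ProduceError` here are minimal stand-ins for the producer's types (only the fields used are modelled), and `resendLoop` is a hypothetical helper that takes the error and input channels as parameters instead of calling `Errors()` and `Input()`. The deadlock concern is a goroutine blocked sending to the input channel while the producer is blocked delivering the next error to it; buffering failures and only enabling the send case when the buffer is non-empty (a nil channel is never ready in a `select`) keeps the error channel drained.

```go
package main

import (
	"errors"
	"fmt"
)

// Minimal stand-ins (assumptions) for the producer's message and error types.
type MessageToSend struct {
	Value string
}

type ProduceError struct {
	Msg *MessageToSend
	Err error
}

// resendLoop feeds failed messages from errs back into input, in order.
// It keeps receiving from errs even while a resend is pending, so it never
// blocks the error channel. It returns once errs is closed and the buffer
// has been fully drained into input.
func resendLoop(errs <-chan *ProduceError, input chan<- *MessageToSend) {
	var buffer []*MessageToSend
	for errs != nil || len(buffer) > 0 {
		// Disable the send case (nil channel) while there is nothing to resend.
		var out chan<- *MessageToSend
		var next *MessageToSend
		if len(buffer) > 0 {
			out, next = input, buffer[0]
		}
		select {
		case err, ok := <-errs:
			if !ok {
				errs = nil // closed: stop receiving, finish draining the buffer
				continue
			}
			buffer = append(buffer, err.Msg)
		case out <- next:
			buffer = buffer[1:] // pop the message that was just resent
		}
	}
}

func main() {
	errs := make(chan *ProduceError, 3)
	input := make(chan *MessageToSend, 3)
	for _, v := range []string{"a", "b", "c"} {
		errs <- &ProduceError{Msg: &MessageToSend{Value: v}, Err: errors.New("leader not available")}
	}
	close(errs)

	resendLoop(errs, input)
	close(input)
	for m := range input {
		fmt.Println("resent", m.Value)
	}
}
```

With a real producer the loop would run in its own goroutine for the producer's lifetime. Note that it retries indefinitely, so a partition that stays down will keep cycling the same messages unless a retry limit is added.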
I have a cluster with 5 brokers and a topic with replication factor 2 and 10 partitions.
When I shut down 2 or 3 brokers, the producer has trouble using only the available partitions (I am using the round-robin partitioner). I am unable to continuously send new messages to the Input() channel; there are only short windows when this is possible (in the attached log, look for the "producing" string).