diff --git a/CHANGELOG.md b/CHANGELOG.md index 092623e6f..a34c066ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ #### Unreleased +Improvements: + - Wrap the producer's partitioner call in a circuit-breaker so that repeatedly + broken topics don't choke throughput + ([#373](https://github.com/Shopify/sarama/pull/373)). + Bug Fixes: - Fix the producer's internal reference counting in certain unusual scenarios ([#367](https://github.com/Shopify/sarama/pull/367)). diff --git a/async_producer.go b/async_producer.go index f776853f4..374bce2eb 100644 --- a/async_producer.go +++ b/async_producer.go @@ -265,10 +265,13 @@ func (p *asyncProducer) topicDispatcher() { func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) { handlers := make(map[int32]chan *ProducerMessage) partitioner := p.conf.Producer.Partitioner(topic) + breaker := breaker.New(3, 1, 10*time.Second) for msg := range input { if msg.retries == 0 { - err := p.assignPartition(partitioner, msg) + err := breaker.Run(func() error { + return p.assignPartition(partitioner, msg) + }) if err != nil { p.returnError(msg, err) continue