diff --git a/async_producer.go b/async_producer.go index aced886315..4e870a7c65 100644 --- a/async_producer.go +++ b/async_producer.go @@ -271,10 +271,7 @@ func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *Producer for msg := range input { if msg.retries == 0 { - err := breaker.Run(func() error { - return p.assignPartition(partitioner, msg) - }) - if err != nil { + if err := p.assignPartition(breaker, partitioner, msg); err != nil { p.returnError(msg, err) continue } @@ -636,15 +633,17 @@ func (p *asyncProducer) shutdown() { close(p.successes) } -func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error { +func (p *asyncProducer) assignPartition(breaker *breaker.Breaker, partitioner Partitioner, msg *ProducerMessage) error { var partitions []int32 - var err error - if partitioner.RequiresConsistency() { - partitions, err = p.client.Partitions(msg.Topic) - } else { - partitions, err = p.client.WritablePartitions(msg.Topic) - } + err := breaker.Run(func() (err error) { + if partitioner.RequiresConsistency() { + partitions, err = p.client.Partitions(msg.Topic) + } else { + partitions, err = p.client.WritablePartitions(msg.Topic) + } + return + }) if err != nil { return err