From 4b2461f87a5d44262687032be19e0f83f1380394 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 19 Mar 2015 20:32:33 +0000 Subject: [PATCH] producer: wrap partitioning in a circuit-breaker This prevents repeatedly broken topics (e.g. ones that don't even exist) from choking throughput for other topics. --- CHANGELOG.md | 5 +++++ async_producer.go | 5 ++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 092623e6f..a34c066ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ #### Unreleased +Improvements: + - Wrap the producer's partitioner call in a circuit-breaker so that repeatedly + broken topics don't choke throughput + ([#373](https://github.com/Shopify/sarama/pull/373)). + Bug Fixes: - Fix the producer's internal reference counting in certain unusual scenarios ([#367](https://github.com/Shopify/sarama/pull/367)). diff --git a/async_producer.go b/async_producer.go index f776853f4..374bce2eb 100644 --- a/async_producer.go +++ b/async_producer.go @@ -265,10 +265,13 @@ func (p *asyncProducer) topicDispatcher() { func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) { handlers := make(map[int32]chan *ProducerMessage) partitioner := p.conf.Producer.Partitioner(topic) + breaker := breaker.New(3, 1, 10*time.Second) for msg := range input { if msg.retries == 0 { - err := p.assignPartition(partitioner, msg) + err := breaker.Run(func() error { + return p.assignPartition(partitioner, msg) + }) if err != nil { p.returnError(msg, err) continue