Skip to content

Commit

Permalink
Merge pull request #373 from Shopify/circuitbreak-partitioning
Browse files Browse the repository at this point in the history
producer: wrap partitioning in a circuit-breaker
  • Loading branch information
eapache committed Mar 19, 2015
2 parents 4da772a + 4b2461f commit 5ca5e10
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

#### Unreleased

Improvements:
- Wrap the producer's partitioner call in a circuit-breaker so that repeatedly
broken topics don't choke throughput
([#373](https://github.com/Shopify/sarama/pull/373)).

Bug Fixes:
- Fix the producer's internal reference counting in certain unusual scenarios
([#367](https://github.com/Shopify/sarama/pull/367)).
Expand Down
5 changes: 4 additions & 1 deletion async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,13 @@ func (p *asyncProducer) topicDispatcher() {
func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) {
handlers := make(map[int32]chan *ProducerMessage)
partitioner := p.conf.Producer.Partitioner(topic)
breaker := breaker.New(3, 1, 10*time.Second)

for msg := range input {
if msg.retries == 0 {
err := p.assignPartition(partitioner, msg)
err := breaker.Run(func() error {
return p.assignPartition(partitioner, msg)
})
if err != nil {
p.returnError(msg, err)
continue
Expand Down

0 comments on commit 5ca5e10

Please sign in to comment.