Skip to content

Commit

Permalink
Merge pull request #450 from Shopify/producer_cleanup
Browse files Browse the repository at this point in the history
Ensure we always have called Add() on the inflight counter before we Wait() for it
  • Loading branch information
eapache committed May 18, 2015
2 parents 23d5233 + 831d561 commit 6468e16
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func (p *asyncProducer) topicDispatcher() {

if msg.flags&shutdown != 0 {
shuttingDown = true
p.inFlight.Done()
continue
} else if msg.retries == 0 {
p.inFlight.Add(1)
Expand Down Expand Up @@ -256,7 +257,7 @@ func (p *asyncProducer) topicDispatcher() {

// one per topic
// partitions messages, then dispatches them by partition
func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) {
func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *ProducerMessage) {
handlers := make(map[int32]chan *ProducerMessage)
partitioner := p.conf.Producer.Partitioner(topic)
breaker := breaker.New(3, 1, 10*time.Second)
Expand Down Expand Up @@ -293,7 +294,7 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input <-chan *ProducerMessage) {
var leader *Broker
var output chan *ProducerMessage

Expand Down Expand Up @@ -413,7 +414,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
// one per broker
// groups messages together into appropriately-sized batches for sending to the broker
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) {
var (
timer <-chan time.Time
buffer []*ProducerMessage
Expand Down Expand Up @@ -477,7 +478,7 @@ shutdown:

// one per broker
// takes a batch at a time from the messageAggregator and sends to the broker
func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {
var closing error
currentRetries := make(map[string]map[int32]error)
Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
Expand Down Expand Up @@ -610,6 +611,7 @@ func (p *asyncProducer) retryHandler() {

func (p *asyncProducer) shutdown() {
Logger.Println("Producer shutting down.")
p.inFlight.Add(1)
p.input <- &ProducerMessage{flags: shutdown}

p.inFlight.Wait()
Expand Down

0 comments on commit 6468e16

Please sign in to comment.