diff --git a/async_producer.go b/async_producer.go index 7ab1ae7e4..6378ae5c5 100644 --- a/async_producer.go +++ b/async_producer.go @@ -221,6 +221,7 @@ func (p *asyncProducer) topicDispatcher() { if msg.flags&shutdown != 0 { shuttingDown = true + p.inFlight.Done() continue } else if msg.retries == 0 { p.inFlight.Add(1) @@ -256,7 +257,7 @@ func (p *asyncProducer) topicDispatcher() { // one per topic // partitions messages, then dispatches them by partition -func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) { +func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *ProducerMessage) { handlers := make(map[int32]chan *ProducerMessage) partitioner := p.conf.Producer.Partitioner(topic) breaker := breaker.New(3, 1, 10*time.Second) @@ -293,7 +294,7 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe // one per partition per topic // dispatches messages to the appropriate broker // also responsible for maintaining message order during retries -func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) { +func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input <-chan *ProducerMessage) { var leader *Broker var output chan *ProducerMessage @@ -413,7 +414,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch // one per broker // groups messages together into appropriately-sized batches for sending to the broker // based on https://godoc.org/github.com/eapache/channels#BatchingChannel -func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMessage) { +func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) { var ( timer <-chan time.Time buffer []*ProducerMessage @@ -477,7 +478,7 @@ shutdown: // one per broker // takes a batch at a time from the messageAggregator and sends to the broker -func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) { +func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) { var closing error currentRetries := make(map[string]map[int32]error) Logger.Printf("producer/flusher/%d starting up\n", broker.ID()) @@ -610,6 +611,7 @@ func (p *asyncProducer) retryHandler() { func (p *asyncProducer) shutdown() { Logger.Println("Producer shutting down.") + p.inFlight.Add(1) p.input <- &ProducerMessage{flags: shutdown} p.inFlight.Wait()