From 00db3e70cdd1c90ede839126b2fde20313bbd19b Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 14 Apr 2015 19:34:37 +0000 Subject: [PATCH] Retry messages on shutdown --- async_producer.go | 102 +++++++++++++++++++--------------------------- 1 file changed, 43 insertions(+), 59 deletions(-) diff --git a/async_producer.go b/async_producer.go index 59c29a219..4cad74335 100644 --- a/async_producer.go +++ b/async_producer.go @@ -54,6 +54,7 @@ type asyncProducer struct { errors chan *ProducerError input, successes, retries chan *ProducerMessage + inFlight sync.WaitGroup brokers map[*Broker]chan *ProducerMessage brokerRefs map[chan *ProducerMessage]int @@ -105,8 +106,6 @@ type flagSet int8 const ( chaser flagSet = 1 << iota // message is last in a group that failed - ref // add a reference to a singleton channel - unref // remove a reference from a singleton channel shutdown // start the shutdown process ) @@ -209,6 +208,7 @@ func (p *asyncProducer) AsyncClose() { // dispatches messages by topic func (p *asyncProducer) topicDispatcher() { handlers := make(map[string]chan *ProducerMessage) + shuttingDown := false for msg := range p.input { if msg == nil { @@ -218,7 +218,16 @@ func (p *asyncProducer) topicDispatcher() { if msg.flags&shutdown != 0 { Logger.Println("Producer shutting down.") - break + shuttingDown = true + go withRecover(p.shutdown) + continue + } else if msg.retries == 0 { + if shuttingDown { + p.returnError(msg, ErrShuttingDown) + continue + } else { + p.inFlight.Add(1) + } } if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) || @@ -230,7 +239,6 @@ func (p *asyncProducer) topicDispatcher() { handler := handlers[msg.Topic] if handler == nil { - p.retries <- &ProducerMessage{flags: ref} newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize) topic := msg.Topic // block local because go's closure semantics suck go withRecover(func() { p.partitionDispatcher(topic, newHandler) }) @@ -244,21 +252,6 @@ func (p *asyncProducer) topicDispatcher() { for _, handler := range handlers { close(handler) } - - p.retries <- &ProducerMessage{flags: shutdown} - - for msg := range p.input { - p.returnError(msg, ErrShuttingDown) - } - - if p.ownClient { - err := p.client.Close() - if err != nil { - Logger.Println("producer/shutdown failed to close the embedded client:", err) - } - } - close(p.errors) - close(p.successes) } // one per topic @@ -281,7 +274,6 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe handler := handlers[msg.Partition] if handler == nil { - p.retries <- &ProducerMessage{flags: ref} newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize) topic := msg.Topic // block local because go's closure semantics suck partition := msg.Partition // block local because go's closure semantics suck @@ -296,7 +288,6 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe for _, handler := range handlers { close(handler) } - p.retries <- &ProducerMessage{flags: unref} } // one per partition per topic @@ -414,7 +405,6 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch if output != nil { p.unrefBrokerProducer(leader, output) } - p.retries <- &ProducerMessage{flags: unref} } // one per broker @@ -543,9 +533,7 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) { if response == nil { // this only happens when RequiredAcks is NoResponse, so we have to assume success - if p.conf.Producer.Return.Successes { - p.returnSuccesses(batch) - } + p.returnSuccesses(batch) continue } @@ -563,12 +551,10 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) { switch block.Err { case ErrNoError: // All the messages for this topic-partition were delivered successfully! - if p.conf.Producer.Return.Successes { - for i := range msgs { - msgs[i].Offset = block.Offset + int64(i) - } - p.returnSuccesses(msgs) + for i := range msgs { + msgs[i].Offset = block.Offset + int64(i) } + p.returnSuccesses(msgs) case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n", @@ -585,19 +571,14 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) { } } Logger.Printf("producer/flusher/%d shut down\n", broker.ID()) - p.retries <- &ProducerMessage{flags: unref} } // singleton // effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { - var ( - msg *ProducerMessage - buf = queue.New() - refs = 0 - shuttingDown = false - ) + var msg *ProducerMessage + buf := queue.New() for { if buf.Length() == 0 { @@ -611,29 +592,12 @@ func (p *asyncProducer) retryHandler() { } } - if msg.flags&ref != 0 { - refs++ - } else if msg.flags&unref != 0 { - refs-- - if refs == 0 && shuttingDown { - break - } - } else if msg.flags&shutdown != 0 { - shuttingDown = true - if refs == 0 { - break - } - } else { - buf.Add(msg) + if msg == nil { + return } - } - close(p.retries) - for buf.Length() != 0 { - p.input <- buf.Peek().(*ProducerMessage) - buf.Remove() + buf.Add(msg) } - close(p.input) } /////////////////////////////////////////// @@ -641,6 +605,22 @@ func (p *asyncProducer) retryHandler() { // utility functions +func (p *asyncProducer) shutdown() { + p.inFlight.Wait() + + if p.ownClient { + err := p.client.Close() + if err != nil { + Logger.Println("producer/shutdown failed to close the embedded client:", err) + } + } + + close(p.errors) + close(p.successes) + close(p.input) + close(p.retries) +} + func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error { var partitions []int32 var err error @@ -745,6 +725,7 @@ func (p *asyncProducer) returnError(msg *ProducerMessage, err error) { } else { Logger.Println(pErr) } + p.inFlight.Done() } func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) { @@ -757,10 +738,14 @@ func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) { func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) { for _, msg := range batch { - if msg != nil { + if msg == nil { + continue + } + if p.conf.Producer.Return.Successes { msg.flags = 0 p.successes <- msg } + p.inFlight.Done() } } @@ -785,7 +770,6 @@ func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage bp := p.brokers[broker] if bp == nil { - p.retries <- &ProducerMessage{flags: ref} bp = make(chan *ProducerMessage) p.brokers[broker] = bp p.brokerRefs[bp] = 0