From 42123dd0dd3326bb1b86bfb334a3b93edb71808a Mon Sep 17 00:00:00 2001
From: Evan Huus
Date: Tue, 14 Apr 2015 19:34:37 +0000
Subject: [PATCH 1/2] Retry messages on shutdown

---
 async_producer.go | 111 ++++++++++++++++++++--------------------------
 1 file changed, 48 insertions(+), 63 deletions(-)

diff --git a/async_producer.go b/async_producer.go
index d8f2b06c9..9efbee525 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -54,6 +54,7 @@ type asyncProducer struct {
 	errors                    chan *ProducerError
 	input, successes, retries chan *ProducerMessage
+	inFlight                  sync.WaitGroup
 
 	brokers    map[*Broker]chan *ProducerMessage
 	brokerRefs map[chan *ProducerMessage]int
@@ -105,8 +106,6 @@ type flagSet int8
 
 const (
 	chaser   flagSet = 1 << iota // message is last in a group that failed
-	ref                          // add a reference to a singleton channel
-	unref                        // remove a reference from a singleton channel
 	shutdown                     // start the shutdown process
 )
@@ -193,9 +192,7 @@ func (p *asyncProducer) Close() error {
 }
 
 func (p *asyncProducer) AsyncClose() {
-	go withRecover(func() {
-		p.input <- &ProducerMessage{flags: shutdown}
-	})
+	go withRecover(p.shutdown)
 }
 
 ///////////////////////////////////////////
@@ -209,6 +206,7 @@ func (p *asyncProducer) AsyncClose() {
 // dispatches messages by topic
 func (p *asyncProducer) topicDispatcher() {
 	handlers := make(map[string]chan *ProducerMessage)
+	shuttingDown := false
 
 	for msg := range p.input {
 		if msg == nil {
@@ -217,8 +215,14 @@
 		if msg.flags&shutdown != 0 {
-			Logger.Println("Producer shutting down.")
-			break
+			shuttingDown = true
+			continue
+		} else if msg.retries == 0 {
+			p.inFlight.Add(1)
+			if shuttingDown {
+				p.returnError(msg, ErrShuttingDown)
+				continue
+			}
 		}
 
 		if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||
@@ -230,7 +234,6 @@
 		handler := handlers[msg.Topic]
 		if handler == nil {
-			p.retries <- &ProducerMessage{flags: ref}
 			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
 			topic := msg.Topic // block local because go's closure semantics suck
 			go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
@@ -244,21 +247,6 @@
 	for _, handler := range handlers {
 		close(handler)
 	}
-
-	p.retries <- &ProducerMessage{flags: shutdown}
-
-	for msg := range p.input {
-		p.returnError(msg, ErrShuttingDown)
-	}
-
-	if p.ownClient {
-		err := p.client.Close()
-		if err != nil {
-			Logger.Println("producer/shutdown failed to close the embedded client:", err)
-		}
-	}
-	close(p.errors)
-	close(p.successes)
 }
 
 // one per topic
@@ -281,7 +269,6 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe
 		handler := handlers[msg.Partition]
 		if handler == nil {
-			p.retries <- &ProducerMessage{flags: ref}
 			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
 			topic := msg.Topic         // block local because go's closure semantics suck
 			partition := msg.Partition // block local because go's closure semantics suck
@@ -296,7 +283,6 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe
 	for _, handler := range handlers {
 		close(handler)
 	}
-	p.retries <- &ProducerMessage{flags: unref}
 }
 
 // one per partition per topic
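The dispatcher changes above are the heart of the patch: rather than breaking out of the input loop when the shutdown flag arrives, topicDispatcher keeps draining, counts every first-attempt message into the new inFlight WaitGroup, and fails fast with ErrShuttingDown once the flag has been seen. Below is a rough standalone sketch of that accounting scheme; miniProducer, message, and the helper names are illustrative stand-ins, not Sarama's actual types.

```go
package main

import (
	"fmt"
	"sync"
)

// message stands in for *ProducerMessage; the shutdown field mirrors
// the patch's `flags: shutdown` sentinel.
type message struct {
	value    string
	shutdown bool
}

type miniProducer struct {
	input    chan *message
	inFlight sync.WaitGroup
}

func (p *miniProducer) dispatcher() {
	shuttingDown := false
	for msg := range p.input {
		if msg.shutdown {
			shuttingDown = true // keep draining; don't break out of the loop
			continue
		}
		p.inFlight.Add(1) // count the message exactly once, on first entry
		if shuttingDown {
			// mirrors returnError(msg, ErrShuttingDown)
			fmt.Println("rejected (shutting down):", msg.value)
			p.inFlight.Done()
			continue
		}
		// mirrors the success path through returnSuccesses
		fmt.Println("delivered:", msg.value)
		p.inFlight.Done()
	}
}

func (p *miniProducer) shutdown() {
	p.input <- &message{shutdown: true}
	p.inFlight.Wait() // every counted message has been returned
	close(p.input)    // now safe: nothing else will be counted
}

func main() {
	p := &miniProducer{input: make(chan *message)}
	done := make(chan struct{})
	go func() { p.dispatcher(); close(done) }()

	p.input <- &message{value: "a"}
	p.input <- &message{value: "b"}
	p.shutdown()
	<-done
}
```

Because the same goroutine that calls Add also observes the shutdown flag, every message accepted before the flag is counted before Wait can possibly return, which is what makes closing the channels afterwards safe.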
@@ -344,6 +330,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 			highWatermark = msg.retries
 			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n",
 				topic, partition, highWatermark)
 			retryState[msg.retries].expectChaser = true
+			p.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still inflight
 			output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
 			Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", topic, partition, leader.ID())
 			p.unrefBrokerProducer(leader, output)
@@ -355,6 +342,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 			// in fact this message is not even the current retry level, so buffer it for now (unless it's just a chaser)
 			if msg.flags&chaser == chaser {
 				retryState[msg.retries].expectChaser = false
+				p.inFlight.Done() // this chaser is now handled and will be garbage collected
 			} else {
 				retryState[msg.retries].buf = append(retryState[msg.retries].buf, msg)
 			}
@@ -392,6 +380,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 				}
 			}
 
+			p.inFlight.Done() // this chaser is now handled and will be garbage collected
 			continue
 		}
 	}
@@ -414,7 +403,6 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 	if output != nil {
 		p.unrefBrokerProducer(leader, output)
 	}
-	p.retries <- &ProducerMessage{flags: unref}
 }
 
 // one per broker
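The hunks above tighten the chaser bookkeeping: the chaser message that leaderDispatcher emits when it abandons a broker is itself added to the inFlight count, and is marked Done on the two paths where it is finally consumed. The sketch below only illustrates the underlying idea of an end-of-stream sentinel on a failed path; item and its chaser flag are toy stand-ins for *ProducerMessage and the real flag.

```go
package main

import "fmt"

// item stands in for *ProducerMessage; chaser marks the last message
// that could have been in the old (failed) delivery path.
type item struct {
	payload string
	chaser  bool
}

func main() {
	pipe := make(chan item, 16)

	// Failing path: two messages were still queued when the broker was
	// abandoned; the chaser is sent behind them as a high-water mark.
	pipe <- item{payload: "m1"}
	pipe <- item{payload: "m2"}
	pipe <- item{chaser: true}
	close(pipe)

	// Receiving side: buffer everything until the chaser proves the old
	// path has fully drained, then replay in the original order.
	var buffered []item
	for it := range pipe {
		if it.chaser {
			fmt.Println("chaser seen; old path is drained")
			break
		}
		buffered = append(buffered, it)
	}
	for _, it := range buffered {
		fmt.Println("replaying:", it.payload)
	}
}
```

Since the chaser is a message the producer itself generates, forgetting to count it would let inFlight.Wait return while a retry cycle is still unwinding; that is exactly the hole the Add(1)/Done() pair plugs.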
@@ -543,9 +531,7 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
 
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			if p.conf.Producer.Return.Successes {
-				p.returnSuccesses(batch)
-			}
+			p.returnSuccesses(batch)
 			continue
 		}
@@ -563,12 +549,10 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
 			switch block.Err {
 			case ErrNoError:
 				// All the messages for this topic-partition were delivered successfully!
-				if p.conf.Producer.Return.Successes {
-					for i := range msgs {
-						msgs[i].Offset = block.Offset + int64(i)
-					}
-					p.returnSuccesses(msgs)
+				for i := range msgs {
+					msgs[i].Offset = block.Offset + int64(i)
 				}
+				p.returnSuccesses(msgs)
 			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
 				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
 				Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
@@ -585,19 +569,14 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
 		}
 	}
 	Logger.Printf("producer/flusher/%d shut down\n", broker.ID())
-	p.retries <- &ProducerMessage{flags: unref}
 }
 
 // singleton
 // effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
 // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
 func (p *asyncProducer) retryHandler() {
-	var (
-		msg          *ProducerMessage
-		buf          = queue.New()
-		refs         = 0
-		shuttingDown = false
-	)
+	var msg *ProducerMessage
+	buf := queue.New()
 
 	for {
 		if buf.Length() == 0 {
@@ -611,29 +590,12 @@
 			}
 		}
 
-		if msg.flags&ref != 0 {
-			refs++
-		} else if msg.flags&unref != 0 {
-			refs--
-			if refs == 0 && shuttingDown {
-				break
-			}
-		} else if msg.flags&shutdown != 0 {
-			shuttingDown = true
-			if refs == 0 {
-				break
-			}
-		} else {
-			buf.Add(msg)
+		if msg == nil {
+			return
 		}
-	}
 
-	close(p.retries)
-	for buf.Length() != 0 {
-		p.input <- buf.Peek().(*ProducerMessage)
-		buf.Remove()
+		buf.Add(msg)
 	}
-	close(p.input)
 }
 
 ///////////////////////////////////////////
@@ -641,6 +603,25 @@
 // utility functions
 
+func (p *asyncProducer) shutdown() {
+	Logger.Println("Producer shutting down.")
+	p.input <- &ProducerMessage{flags: shutdown}
+
+	p.inFlight.Wait()
+
+	if p.ownClient {
+		err := p.client.Close()
+		if err != nil {
+			Logger.Println("producer/shutdown failed to close the embedded client:", err)
+		}
+	}
+
+	close(p.input)
+	close(p.retries)
+	close(p.errors)
+	close(p.successes)
+}
+
 func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
 	var partitions []int32
 	var err error
@@ -745,6 +726,7 @@ func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
 	} else {
 		Logger.Println(pErr)
 	}
+	p.inFlight.Done()
 }
 
 func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
@@ -757,10 +739,14 @@
 
 func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
 	for _, msg := range batch {
-		if msg != nil {
+		if msg == nil {
+			continue
+		}
+		if p.conf.Producer.Return.Successes {
 			msg.flags = 0
 			p.successes <- msg
 		}
+		p.inFlight.Done()
 	}
 }
@@ -785,7 +771,6 @@ func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage
 	bp := p.brokers[broker]
 
 	if bp == nil {
-		p.retries <- &ProducerMessage{flags: ref}
 		bp = make(chan *ProducerMessage)
 		p.brokers[broker] = bp
 		p.brokerRefs[bp] = 0
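With the ref/unref flags gone, retryHandler is reduced to its one real job: an unbounded buffer between the flushers and the input channel, so the retry feedback loop (flusher -> retries -> input) can never deadlock. Below is a minimal sketch of that bridge pattern, using a plain slice where the real code uses github.com/eapache/queue; the function and channel names are illustrative, not Sarama's.

```go
package main

import "fmt"

// bridge forwards values from in to out through an unbounded buffer, so
// a writer to in can always make progress even if out has no reader yet.
func bridge(in <-chan int, out chan<- int) {
	var buf []int
	for {
		// The nil-channel trick: with an empty buffer, send stays nil and
		// its select case can never fire, so we only wait on in.
		var send chan<- int
		var head int
		if len(buf) > 0 {
			send = out
			head = buf[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				for _, x := range buf { // drain what's left, then stop
					out <- x
				}
				close(out)
				return
			}
			buf = append(buf, v)
		case send <- head:
			buf = buf[1:]
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go bridge(in, out)

	// The writer never blocks, even though nobody is reading out yet...
	for i := 1; i <= 3; i++ {
		in <- i
	}
	close(in)

	// ...and the reader still sees everything, in order.
	for v := range out {
		fmt.Println("forwarded:", v)
	}
}
```

The shutdown ordering in the new shutdown() method leans on this: close(p.input) happens only after inFlight.Wait(), and close(p.retries) only after that, so neither side of the bridge can be written to once it is torn down.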
From 08ccf5e1b5a761b33a7f0d5a0fdbf0b5fc322e2d Mon Sep 17 00:00:00 2001
From: Evan Huus
Date: Fri, 17 Apr 2015 19:16:55 +0000
Subject: [PATCH 2/2] Add a test for retries during shutdown

---
 async_producer_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 50 insertions(+)

diff --git a/async_producer_test.go b/async_producer_test.go
index 0242dda80..8f271f6fd 100644
--- a/async_producer_test.go
+++ b/async_producer_test.go
@@ -6,6 +6,7 @@ import (
 	"os/signal"
 	"sync"
 	"testing"
+	"time"
 )
 
 const TestMessage = "ABC THE MESSAGE"
@@ -525,6 +526,55 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
 	leader.Close()
 }
 
+func TestAsyncProducerRetryShutdown(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataLeader := new(MetadataResponse)
+	metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataLeader)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Backoff = 0
+	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+	producer.AsyncClose()
+	time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
+
+	producer.Input() <- &ProducerMessage{Topic: "FOO"}
+	if err := <-producer.Errors(); err.Err != ErrShuttingDown {
+		t.Error(err)
+	}
+
+	prodNotLeader := new(ProduceResponse)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
+	leader.Returns(prodNotLeader)
+
+	seedBroker.Returns(metadataLeader)
+
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
+	expectSuccesses(t, producer, 10)
+
+	seedBroker.Close()
+	leader.Close()
+
+	// wait for the async-closed producer to shut down fully
+	for err := range producer.Errors() {
+		t.Error(err)
+	}
+}
+
 // This example shows how to use the producer while simultaneously
 // reading the Errors channel to know about any failures.
 func ExampleAsyncProducer_select() {
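Taken together, the two patches give AsyncClose a clean observable contract: it returns immediately, every message already accepted comes back on exactly one of Successes or Errors (retried as needed, even mid-shutdown), and both channels are closed once the in-flight count drains. The test above exercises this against mock brokers; the sketch below shows the intended caller-side usage against the public API, with the broker address and topic as placeholders.

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // we want acks on the Successes channel

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}

	for i := 0; i < 100; i++ {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "my_topic",
			Value: sarama.StringEncoder("test message"),
		}
	}

	// Returns immediately; the producer keeps retrying what it already has.
	producer.AsyncClose()

	// Drain both channels until they are closed; every accepted message
	// arrives on exactly one of them.
	var delivered, failed int
	done := make(chan struct{})
	go func() {
		for range producer.Successes() {
			delivered++
		}
		close(done)
	}()
	for pErr := range producer.Errors() {
		failed++
		log.Println("delivery failed:", pErr)
	}
	<-done

	log.Printf("delivered=%d failed=%d", delivered, failed)
}
```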