From bd75081a09f970a7b24350eec30d010777ef22f1 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Fri, 17 Apr 2015 19:16:55 +0000 Subject: [PATCH] Add a test for retries during shutdow --- async_producer_test.go | 43 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/async_producer_test.go b/async_producer_test.go index 0242dda80..2fbbbd2b7 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -525,6 +525,49 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { leader.Close() } +func TestAsyncProducerRetryShutdown(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + + metadataLeader := new(MetadataResponse) + metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) + metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataLeader) + + config := NewConfig() + config.Producer.Flush.Messages = 10 + config.Producer.Return.Successes = true + config.Producer.Retry.Backoff = 0 + producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + producer.AsyncClose() + + prodNotLeader := new(ProduceResponse) + prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) + leader.Returns(prodNotLeader) + + seedBroker.Returns(metadataLeader) + + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodSuccess) + expectSuccesses(t, producer, 10) + + seedBroker.Close() + leader.Close() + + // wait for the async-closed producer to shut down fully + for err := range producer.Errors() { + t.Error(err) + } +} + // This example shows how to use the producer while simultaneously // reading the Errors channel to know about any failures. func ExampleAsyncProducer_select() {