Skip to content

Commit

Permalink
Add a test for retries during shutdow
Browse files Browse the repository at this point in the history
  • Loading branch information
eapache committed Apr 24, 2015
1 parent 00db3e7 commit bd75081
Showing 1 changed file with 43 additions and 0 deletions.
43 changes: 43 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,49 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
leader.Close()
}

func TestAsyncProducerRetryShutdown(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Return.Successes = true
config.Producer.Retry.Backoff = 0
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
producer.AsyncClose()

prodNotLeader := new(ProduceResponse)
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
leader.Returns(prodNotLeader)

seedBroker.Returns(metadataLeader)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 10)

seedBroker.Close()
leader.Close()

// wait for the async-closed producer to shut down fully
for err := range producer.Errors() {
t.Error(err)
}
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
Expand Down

0 comments on commit bd75081

Please sign in to comment.