diff --git a/async_producer_test.go b/async_producer_test.go index 7febbb7de..7306256aa 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -33,13 +33,15 @@ func closeProducer(t *testing.T, p AsyncProducer) { } func expectResults(t *testing.T, p AsyncProducer, successes, errors int) { - for successes > 0 || errors > 0 { + expect := successes + errors + for expect > 0 { select { case msg := <-p.Errors(): if msg.Msg.flags != 0 { t.Error("Message had flags set") } errors-- + expect-- if errors < 0 { t.Error(msg.Err) } @@ -48,11 +50,15 @@ func expectResults(t *testing.T, p AsyncProducer, successes, errors int) { t.Error("Message had flags set") } successes-- + expect-- if successes < 0 { t.Error("Too many successes") } } } + if successes != 0 || errors != 0 { + t.Error("Unexpected successes", successes, "or errors", errors) + } } type testPartitioner chan *int32 @@ -74,6 +80,19 @@ func (p testPartitioner) feed(partition int32) { p <- &partition } +type flakyEncoder bool + +func (f flakyEncoder) Length() int { + return len(TestMessage) +} + +func (f flakyEncoder) Encode() ([]byte, error) { + if !bool(f) { + return nil, errors.New("flaky encoding error") + } + return []byte(TestMessage), nil +} + func TestAsyncProducer(t *testing.T) { seedBroker := newMockBroker(t, 1) leader := newMockBroker(t, 2) @@ -285,6 +304,42 @@ func TestAsyncProducerFailureRetry(t *testing.T) { closeProducer(t, producer) } +func TestAsyncProducerEncoderFailures(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataResponse) + + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodSuccess) + leader.Returns(prodSuccess) + leader.Returns(prodSuccess) + + config := NewConfig() + config.Producer.Flush.Messages = 3 + config.Producer.Return.Successes = true + config.Producer.Partitioner = NewManualPartitioner + producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for flush := 0; flush < 3; flush++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)} + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)} + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)} + expectResults(t, producer, 1, 2) + } + + closeProducer(t, producer) + leader.Close() + seedBroker.Close() +} + // If a Kafka broker becomes unavailable and then returns back in service, then // producer reconnects to it and continues sending messages. func TestAsyncProducerBrokerBounce(t *testing.T) {