diff --git a/async_producer.go b/async_producer.go index e4af98888..770e9f1be 100644 --- a/async_producer.go +++ b/async_producer.go @@ -271,10 +271,7 @@ func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *Producer for msg := range input { if msg.retries == 0 { - err := breaker.Run(func() error { - return p.assignPartition(partitioner, msg) - }) - if err != nil { + if err := p.assignPartition(breaker, partitioner, msg); err != nil { p.returnError(msg, err) continue } @@ -636,15 +633,17 @@ func (p *asyncProducer) shutdown() { close(p.successes) } -func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error { +func (p *asyncProducer) assignPartition(breaker *breaker.Breaker, partitioner Partitioner, msg *ProducerMessage) error { var partitions []int32 - var err error - if partitioner.RequiresConsistency() { - partitions, err = p.client.Partitions(msg.Topic) - } else { - partitions, err = p.client.WritablePartitions(msg.Topic) - } + err := breaker.Run(func() (err error) { + if partitioner.RequiresConsistency() { + partitions, err = p.client.Partitions(msg.Topic) + } else { + partitions, err = p.client.WritablePartitions(msg.Topic) + } + return + }) if err != nil { return err diff --git a/async_producer_test.go b/async_producer_test.go index 8f271f6fd..93690b2bf 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1,6 +1,7 @@ package sarama import ( + "errors" "log" "os" "os/signal" @@ -31,22 +32,48 @@ func closeProducer(t *testing.T, p AsyncProducer) { wg.Wait() } -func expectSuccesses(t *testing.T, p AsyncProducer, successes int) { - for i := 0; i < successes; i++ { +func expectResults(t *testing.T, p AsyncProducer, successes, errors int) { + for successes > 0 || errors > 0 { select { case msg := <-p.Errors(): - t.Error(msg.Err) if msg.Msg.flags != 0 { t.Error("Message had flags set") } + errors-- + if errors < 0 { + t.Error(msg.Err) + } case msg := <-p.Successes(): if msg.flags != 0 { t.Error("Message had flags set") } + successes-- + if successes < 0 { + t.Error("Too many successes") + } } } } +type testPartitioner chan *int32 + +func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) { + part := <-p + if part == nil { + return 0, errors.New("BOOM") + } + + return *part, nil +} + +func (p testPartitioner) RequiresConsistency() bool { + return true +} + +func (p testPartitioner) feed(partition int32) { + p <- &partition +} + func TestAsyncProducer(t *testing.T) { seedBroker := newMockBroker(t, 1) leader := newMockBroker(t, 2) @@ -120,7 +147,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) { for i := 0; i < 5; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } - expectSuccesses(t, producer, 5) + expectResults(t, producer, 5, 0) } closeProducer(t, producer) @@ -160,7 +187,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) { for i := 0; i < 10; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) closeProducer(t, producer) leader1.Close() @@ -168,6 +195,48 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) { seedBroker.Close() } +func TestAsyncProducerCustomPartitioner(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataResponse) + + prodResponse := new(ProduceResponse) + prodResponse.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodResponse) + + config := NewConfig() + config.Producer.Flush.Messages = 2 + config.Producer.Return.Successes = true + config.Producer.Partitioner = func(topic string) Partitioner { + p := make(testPartitioner) + go func() { + p.feed(0) + p <- nil + p <- nil + p <- nil + p.feed(0) + }() + return p + } + producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 5; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + expectResults(t, producer, 2, 3) + + closeProducer(t, producer) + leader.Close() + seedBroker.Close() +} + func TestAsyncProducerFailureRetry(t *testing.T) { seedBroker := newMockBroker(t, 1) leader1 := newMockBroker(t, 2) @@ -203,14 +272,14 @@ func TestAsyncProducerFailureRetry(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader2.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) leader1.Close() for i := 0; i < 10; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } leader2.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) leader2.Close() closeProducer(t, producer) @@ -245,7 +314,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) seedBroker.Close() leader.Close() @@ -288,7 +357,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader2.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) seedBroker.Close() leader2.Close() @@ -336,13 +405,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader2.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) for i := 0; i < 10; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } leader2.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) seedBroker.Close() leader1.Close() @@ -400,7 +469,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) { prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) leader.Close() seedBroker.Close() @@ -433,14 +502,14 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 1) + expectResults(t, producer, 1, 0) // prime partition 1 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} prodSuccess = new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 1) + expectResults(t, producer, 1, 0) // reboot the broker (the producer will get EOF on its existing connection) leader.Close() @@ -456,7 +525,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { prodSuccess = new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 1) + expectResults(t, producer, 1, 0) // shutdown closeProducer(t, producer) @@ -493,7 +562,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", p, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 5) + expectResults(t, producer, 5, 0) } // send more messages on partition 0 @@ -511,14 +580,14 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 5) + expectResults(t, producer, 5, 0) // put five more through for i := 0; i < 5; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} } leader.Returns(prodSuccess) - expectSuccesses(t, producer, 5) + expectResults(t, producer, 5, 0) // shutdown closeProducer(t, producer) @@ -564,7 +633,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) seedBroker.Close() leader.Close()