From 3e798fd0a3929f82745ac2a4d5f3471390c65a96 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 9 Jun 2015 09:50:27 -0400 Subject: [PATCH 1/2] Shrink the scope of the partition circuit-breaker It shouldn't wrap the entirety of `assignPartition`, since that also protects things like the call to actually choose the partition. The only thing it is supposed to protect is the call to get the list of partitions, since that is what can block. --- async_producer.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/async_producer.go b/async_producer.go index e4af98888..770e9f1be 100644 --- a/async_producer.go +++ b/async_producer.go @@ -271,10 +271,7 @@ func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *Producer for msg := range input { if msg.retries == 0 { - err := breaker.Run(func() error { - return p.assignPartition(partitioner, msg) - }) - if err != nil { + if err := p.assignPartition(breaker, partitioner, msg); err != nil { p.returnError(msg, err) continue } @@ -636,15 +633,17 @@ func (p *asyncProducer) shutdown() { close(p.successes) } -func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error { +func (p *asyncProducer) assignPartition(breaker *breaker.Breaker, partitioner Partitioner, msg *ProducerMessage) error { var partitions []int32 - var err error - if partitioner.RequiresConsistency() { - partitions, err = p.client.Partitions(msg.Topic) - } else { - partitions, err = p.client.WritablePartitions(msg.Topic) - } + err := breaker.Run(func() (err error) { + if partitioner.RequiresConsistency() { + partitions, err = p.client.Partitions(msg.Topic) + } else { + partitions, err = p.client.WritablePartitions(msg.Topic) + } + return + }) if err != nil { return err From b21ab845b555d1b501fc2b9a3ef47b38833d98f1 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 9 Jun 2015 13:26:22 -0400 Subject: [PATCH 2/2] Add a custom-partitioner test --- async_producer_test.go | 107 +++++++++++++++++++++++++++++++++-------- 1 file changed, 88 insertions(+), 19 deletions(-) diff --git a/async_producer_test.go b/async_producer_test.go index 8f271f6fd..93690b2bf 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1,6 +1,7 @@ package sarama import ( + "errors" "log" "os" "os/signal" @@ -31,22 +32,48 @@ func closeProducer(t *testing.T, p AsyncProducer) { wg.Wait() } -func expectSuccesses(t *testing.T, p AsyncProducer, successes int) { - for i := 0; i < successes; i++ { +func expectResults(t *testing.T, p AsyncProducer, successes, errors int) { + for successes > 0 || errors > 0 { select { case msg := <-p.Errors(): - t.Error(msg.Err) if msg.Msg.flags != 0 { t.Error("Message had flags set") } + errors-- + if errors < 0 { + t.Error(msg.Err) + } case msg := <-p.Successes(): if msg.flags != 0 { t.Error("Message had flags set") } + successes-- + if successes < 0 { + t.Error("Too many successes") + } } } } +type testPartitioner chan *int32 + +func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) { + part := <-p + if part == nil { + return 0, errors.New("BOOM") + } + + return *part, nil +} + +func (p testPartitioner) RequiresConsistency() bool { + return true +} + +func (p testPartitioner) feed(partition int32) { + p <- &partition +} + func TestAsyncProducer(t *testing.T) { seedBroker := newMockBroker(t, 1) leader := newMockBroker(t, 2) @@ -120,7 +147,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) { for i := 0; i < 5; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } - expectSuccesses(t, producer, 5) + expectResults(t, producer, 5, 0) } closeProducer(t, producer) @@ -160,7 +187,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) { for i := 0; i < 10; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) closeProducer(t, producer) leader1.Close() @@ -168,6 +195,48 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) { seedBroker.Close() } +func TestAsyncProducerCustomPartitioner(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataResponse) + + prodResponse := new(ProduceResponse) + prodResponse.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodResponse) + + config := NewConfig() + config.Producer.Flush.Messages = 2 + config.Producer.Return.Successes = true + config.Producer.Partitioner = func(topic string) Partitioner { + p := make(testPartitioner) + go func() { + p.feed(0) + p <- nil + p <- nil + p <- nil + p.feed(0) + }() + return p + } + producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 5; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + expectResults(t, producer, 2, 3) + + closeProducer(t, producer) + leader.Close() + seedBroker.Close() +} + func TestAsyncProducerFailureRetry(t *testing.T) { seedBroker := newMockBroker(t, 1) leader1 := newMockBroker(t, 2) @@ -203,14 +272,14 @@ func TestAsyncProducerFailureRetry(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader2.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) leader1.Close() for i := 0; i < 10; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } leader2.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) leader2.Close() closeProducer(t, producer) @@ -245,7 +314,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) seedBroker.Close() leader.Close() @@ -288,7 +357,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader2.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) seedBroker.Close() leader2.Close() @@ -336,13 +405,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader2.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) for i := 0; i < 10; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } leader2.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) seedBroker.Close() leader1.Close() @@ -400,7 +469,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) { prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) leader.Close() seedBroker.Close() @@ -433,14 +502,14 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 1) + expectResults(t, producer, 1, 0) // prime partition 1 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} prodSuccess = new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 1) + expectResults(t, producer, 1, 0) // reboot the broker (the producer will get EOF on its existing connection) leader.Close() @@ -456,7 +525,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { prodSuccess = new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 1) + expectResults(t, producer, 1, 0) // shutdown closeProducer(t, producer) @@ -493,7 +562,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", p, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 5) + expectResults(t, producer, 5, 0) } // send more messages on partition 0 @@ -511,14 +580,14 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 5) + expectResults(t, producer, 5, 0) // put five more through for i := 0; i < 5; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} } leader.Returns(prodSuccess) - expectSuccesses(t, producer, 5) + expectResults(t, producer, 5, 0) // shutdown closeProducer(t, producer) @@ -564,7 +633,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - expectSuccesses(t, producer, 10) + expectResults(t, producer, 10, 0) seedBroker.Close() leader.Close()