From a381b3667d81dbdfc0aae557b22260eba8e4e1fa Mon Sep 17 00:00:00 2001
From: Evan Huus
Date: Thu, 19 Mar 2015 02:06:53 -0400
Subject: [PATCH] producer: bugfix for aggregators getting stuck

In circumstances where Flush.Messages and/or Flush.Bytes were set but
Flush.Frequency was not, the producer's aggregator could get stuck on a
retry because a metadata-only chaser message would not be enough on its
own to trigger a flush, and so it would sit in limbo forever.

Always trigger a flush in the aggregator when the message is a chaser.
This has the additional benefit of reducing retry latency when
Flush.Frequency *is* set.

Add a test for this case.
---
 CHANGELOG.md           |  2 ++
 async_producer.go      |  1 +
 async_producer_test.go | 62 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 65 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f133a6864..8a5992b47 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,8 @@ Bug Fixes:
 
  - Fix the producer's internal reference counting in certain unusual scenarios
    ([#367](https://github.com/Shopify/sarama/pull/367)).
+ - Fix a condition where the producer's internal control messages could have
+   gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)).
 
 #### Version 1.0.0 (2015-03-17)
 
diff --git a/async_producer.go b/async_producer.go
index a7e5b5e78..f776853f4 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -458,6 +458,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
 			bytesAccumulated += msg.byteSize()
 
 			if defaultFlush ||
+				msg.flags&chaser == chaser ||
 				(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
 				(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
 				doFlush = flusher
diff --git a/async_producer_test.go b/async_producer_test.go
index c6e487937..0242dda80 100644
--- a/async_producer_test.go
+++ b/async_producer_test.go
@@ -463,6 +463,68 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
 	leader.Close()
 }
 
+func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 5
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Retry.Max = 1
+	config.Producer.Partitioner = NewManualPartitioner
+	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// prime partitions
+	for p := int32(0); p < 2; p++ {
+		for i := 0; i < 5; i++ {
+			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
+		}
+		prodSuccess := new(ProduceResponse)
+		prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
+		leader.Returns(prodSuccess)
+		expectSuccesses(t, producer, 5)
+	}
+
+	// send more messages on partition 0
+	for i := 0; i < 5; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
+	}
+	prodNotLeader := new(ProduceResponse)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
+	leader.Returns(prodNotLeader)
+
+	// tell partition 0 to go to that broker again
+	seedBroker.Returns(metadataResponse)
+
+	// succeed this time
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
+	expectSuccesses(t, producer, 5)
+
+	// put five more through
+	for i := 0; i < 5; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
+	}
+	leader.Returns(prodSuccess)
+	expectSuccesses(t, producer, 5)
+
+	// shutdown
+	closeProducer(t, producer)
+	seedBroker.Close()
+	leader.Close()
+}
+
 // This example shows how to use the producer while simultaneously
 // reading the Errors channel to know about any failures.
 func ExampleAsyncProducer_select() {
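
Note (not part of the patch): for readers without the sarama internals in
their head, below is a minimal, self-contained sketch of the aggregator
pattern the one-line fix above addresses. The names here (message, flags,
chaser, aggregate, maxMessages) are simplified stand-ins invented for the
example, not sarama's actual API; the point is only the flush condition --
a chaser-flagged control message must force a flush even when the
message-count threshold is unmet, otherwise it waits on a flush timer that
may not exist.

package main

import "fmt"

// flags holds bit flags on a message; chaser marks the internal control
// message that trails the last retried message for a partition.
type flags int8

const chaser flags = 1 << 0

type message struct {
	value string
	flags flags
}

// aggregate buffers messages and flushes when a threshold is hit. Without
// the chaser check (the analogue of the line this patch adds), a lone
// chaser would sit in the buffer forever whenever no timer is configured.
func aggregate(input <-chan *message, flushes chan<- []*message, maxMessages int) {
	var buffer []*message
	for msg := range input {
		buffer = append(buffer, msg)

		if msg.flags&chaser == chaser || // the fix: chasers always force a flush
			(maxMessages > 0 && len(buffer) >= maxMessages) {
			flushes <- buffer
			buffer = nil
		}
	}
	if len(buffer) > 0 { // drain whatever remains on shutdown
		flushes <- buffer
	}
	close(flushes)
}

func main() {
	input := make(chan *message)
	flushes := make(chan []*message)
	go aggregate(input, flushes, 5)

	go func() {
		// two ordinary messages: below the threshold of five, so they
		// just accumulate
		input <- &message{value: "a"}
		input <- &message{value: "b"}
		// a chaser: flushes the pending batch immediately
		input <- &message{flags: chaser}
		close(input)
	}()

	for batch := range flushes {
		fmt.Printf("flushed %d messages\n", len(batch)) // flushed 3 messages
	}
}

This also illustrates the latency claim in the commit message: with the
chaser check in place, a retrying partition flushes as soon as its chaser
arrives instead of waiting out the Flush.Frequency timer.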