diff --git a/CHANGELOG.md b/CHANGELOG.md index f133a6864..8a5992b47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ Bug Fixes: - Fix the producer's internal reference counting in certain unusual scenarios ([#367](https://github.com/Shopify/sarama/pull/367)). + - Fix a condition where the producer's internal control messages could have + gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)). #### Version 1.0.0 (2015-03-17) diff --git a/async_producer.go b/async_producer.go index a7e5b5e78..f776853f4 100644 --- a/async_producer.go +++ b/async_producer.go @@ -458,6 +458,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe bytesAccumulated += msg.byteSize() if defaultFlush || + msg.flags&chaser == chaser || (p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) || (p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) { doFlush = flusher diff --git a/async_producer_test.go b/async_producer_test.go index 04ee9576e..0242dda80 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -30,6 +30,22 @@ func closeProducer(t *testing.T, p AsyncProducer) { wg.Wait() } +func expectSuccesses(t *testing.T, p AsyncProducer, successes int) { + for i := 0; i < successes; i++ { + select { + case msg := <-p.Errors(): + t.Error(msg.Err) + if msg.Msg.flags != 0 { + t.Error("Message had flags set") + } + case msg := <-p.Successes(): + if msg.flags != 0 { + t.Error("Message had flags set") + } + } + } +} + func TestAsyncProducer(t *testing.T) { seedBroker := newMockBroker(t, 1) leader := newMockBroker(t, 2) @@ -103,19 +119,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) { for i := 0; i < 5; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } - for i := 0; i < 5; i++ { - select { - case msg := <-producer.Errors(): - t.Error(msg.Err) - if msg.Msg.flags != 0 { - t.Error("Message had flags set") - } - case msg := <-producer.Successes(): - if msg.flags != 0 { - t.Error("Message had flags set") - } - } - } + expectSuccesses(t, producer, 5) } closeProducer(t, producer) @@ -155,19 +159,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) { for i := 0; i < 10; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } - for i := 0; i < 10; i++ { - select { - case msg := <-producer.Errors(): - t.Error(msg.Err) - if msg.Msg.flags != 0 { - t.Error("Message had flags set") - } - case msg := <-producer.Successes(): - if msg.flags != 0 { - t.Error("Message had flags set") - } - } - } + expectSuccesses(t, producer, 10) closeProducer(t, producer) leader1.Close() @@ -210,38 +202,14 @@ func TestAsyncProducerFailureRetry(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader2.Returns(prodSuccess) - for i := 0; i < 10; i++ { - select { - case msg := <-producer.Errors(): - t.Error(msg.Err) - if msg.Msg.flags != 0 { - t.Error("Message had flags set") - } - case msg := <-producer.Successes(): - if msg.flags != 0 { - t.Error("Message had flags set") - } - } - } + expectSuccesses(t, producer, 10) leader1.Close() for i := 0; i < 10; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } leader2.Returns(prodSuccess) - for i := 0; i < 10; i++ { - select { - case msg := <-producer.Errors(): - t.Error(msg.Err) - if msg.Msg.flags != 0 { - t.Error("Message had flags set") - } - case msg := <-producer.Successes(): - if msg.flags != 0 { - t.Error("Message had flags set") - } - } - } + expectSuccesses(t, producer, 10) leader2.Close() closeProducer(t, producer) @@ -276,19 +244,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - for i := 0; i < 10; i++ { - select { - case msg := <-producer.Errors(): - t.Error(msg.Err) - if msg.Msg.flags != 0 { - t.Error("Message had flags set") - } - case msg := <-producer.Successes(): - if msg.flags != 0 { - t.Error("Message had flags set") - } - } - } + expectSuccesses(t, producer, 10) seedBroker.Close() leader.Close() @@ -331,19 +287,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader2.Returns(prodSuccess) - for i := 0; i < 10; i++ { - select { - case msg := <-producer.Errors(): - t.Error(msg.Err) - if msg.Msg.flags != 0 { - t.Error("Message had flags set") - } - case msg := <-producer.Successes(): - if msg.flags != 0 { - t.Error("Message had flags set") - } - } - } + expectSuccesses(t, producer, 10) seedBroker.Close() leader2.Close() @@ -391,37 +335,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader2.Returns(prodSuccess) - for i := 0; i < 10; i++ { - select { - case msg := <-producer.Errors(): - t.Error(msg.Err) - if msg.Msg.flags != 0 { - t.Error("Message had flags set") - } - case msg := <-producer.Successes(): - if msg.flags != 0 { - t.Error("Message had flags set") - } - } - } + expectSuccesses(t, producer, 10) for i := 0; i < 10; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } leader2.Returns(prodSuccess) - for i := 0; i < 10; i++ { - select { - case msg := <-producer.Errors(): - t.Error(msg.Err) - if msg.Msg.flags != 0 { - t.Error("Message had flags set") - } - case msg := <-producer.Successes(): - if msg.flags != 0 { - t.Error("Message had flags set") - } - } - } + expectSuccesses(t, producer, 10) seedBroker.Close() leader1.Close() @@ -479,13 +399,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) { prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - for i := 0; i < 10; i++ { - select { - case msg := <-producer.Errors(): - t.Error(msg.Err) - case <-producer.Successes(): - } - } + expectSuccesses(t, producer, 10) leader.Close() seedBroker.Close() @@ -518,22 +432,14 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - select { - case msg := <-producer.Errors(): - t.Error(msg.Err) - case <-producer.Successes(): - } + expectSuccesses(t, producer, 1) // prime partition 1 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} prodSuccess = new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError) leader.Returns(prodSuccess) - select { - case msg := <-producer.Errors(): - t.Error(msg.Err) - case <-producer.Successes(): - } + expectSuccesses(t, producer, 1) // reboot the broker (the producer will get EOF on its existing connection) leader.Close() @@ -549,11 +455,69 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { prodSuccess = new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) - select { - case msg := <-producer.Errors(): - t.Error(msg.Err) - case <-producer.Successes(): + expectSuccesses(t, producer, 1) + + // shutdown + closeProducer(t, producer) + seedBroker.Close() + leader.Close() +} + +func TestAsyncProducerFlusherRetryCondition(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataResponse) + + config := NewConfig() + config.Producer.Flush.Messages = 5 + config.Producer.Return.Successes = true + config.Producer.Retry.Backoff = 0 + config.Producer.Retry.Max = 1 + config.Producer.Partitioner = NewManualPartitioner + producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + // prime partitions + for p := int32(0); p < 2; p++ { + for i := 0; i < 5; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p} + } + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", p, ErrNoError) + leader.Returns(prodSuccess) + expectSuccesses(t, producer, 5) + } + + // send more messages on partition 0 + for i := 0; i < 5; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} + } + prodNotLeader := new(ProduceResponse) + prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) + leader.Returns(prodNotLeader) + + // tell partition 0 to go to that broker again + seedBroker.Returns(metadataResponse) + + // succeed this time + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodSuccess) + expectSuccesses(t, producer, 5) + + // put five more through + for i := 0; i < 5; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} } + leader.Returns(prodSuccess) + expectSuccesses(t, producer, 5) // shutdown closeProducer(t, producer)