Skip to content

Commit

Permalink
Add a custom-partitioner test
Browse files Browse the repository at this point in the history
  • Loading branch information
eapache committed Jun 9, 2015
1 parent 3e798fd commit b21ab84
Showing 1 changed file with 88 additions and 19 deletions.
107 changes: 88 additions & 19 deletions async_producer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"errors"
"log"
"os"
"os/signal"
Expand Down Expand Up @@ -31,22 +32,48 @@ func closeProducer(t *testing.T, p AsyncProducer) {
wg.Wait()
}

func expectSuccesses(t *testing.T, p AsyncProducer, successes int) {
for i := 0; i < successes; i++ {
func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
for successes > 0 || errors > 0 {
select {
case msg := <-p.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
errors--
if errors < 0 {
t.Error(msg.Err)
}
case msg := <-p.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
successes--
if successes < 0 {
t.Error("Too many successes")
}
}
}
}

type testPartitioner chan *int32

func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
part := <-p
if part == nil {
return 0, errors.New("BOOM")
}

return *part, nil
}

func (p testPartitioner) RequiresConsistency() bool {
return true
}

func (p testPartitioner) feed(partition int32) {
p <- &partition
}

func TestAsyncProducer(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
Expand Down Expand Up @@ -120,7 +147,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) {
for i := 0; i < 5; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
expectSuccesses(t, producer, 5)
expectResults(t, producer, 5, 0)
}

closeProducer(t, producer)
Expand Down Expand Up @@ -160,14 +187,56 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
expectSuccesses(t, producer, 10)
expectResults(t, producer, 10, 0)

closeProducer(t, producer)
leader1.Close()
leader0.Close()
seedBroker.Close()
}

func TestAsyncProducerCustomPartitioner(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

prodResponse := new(ProduceResponse)
prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodResponse)

config := NewConfig()
config.Producer.Flush.Messages = 2
config.Producer.Return.Successes = true
config.Producer.Partitioner = func(topic string) Partitioner {
p := make(testPartitioner)
go func() {
p.feed(0)
p <- nil
p <- nil
p <- nil
p.feed(0)
}()
return p
}
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 5; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
expectResults(t, producer, 2, 3)

closeProducer(t, producer)
leader.Close()
seedBroker.Close()
}

func TestAsyncProducerFailureRetry(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader1 := newMockBroker(t, 2)
Expand Down Expand Up @@ -203,14 +272,14 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader2.Returns(prodSuccess)
expectSuccesses(t, producer, 10)
expectResults(t, producer, 10, 0)
leader1.Close()

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
leader2.Returns(prodSuccess)
expectSuccesses(t, producer, 10)
expectResults(t, producer, 10, 0)

leader2.Close()
closeProducer(t, producer)
Expand Down Expand Up @@ -245,7 +314,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 10)
expectResults(t, producer, 10, 0)
seedBroker.Close()
leader.Close()

Expand Down Expand Up @@ -288,7 +357,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader2.Returns(prodSuccess)
expectSuccesses(t, producer, 10)
expectResults(t, producer, 10, 0)
seedBroker.Close()
leader2.Close()

Expand Down Expand Up @@ -336,13 +405,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader2.Returns(prodSuccess)
expectSuccesses(t, producer, 10)
expectResults(t, producer, 10, 0)

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
leader2.Returns(prodSuccess)
expectSuccesses(t, producer, 10)
expectResults(t, producer, 10, 0)

seedBroker.Close()
leader1.Close()
Expand Down Expand Up @@ -400,7 +469,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)

expectSuccesses(t, producer, 10)
expectResults(t, producer, 10, 0)

leader.Close()
seedBroker.Close()
Expand Down Expand Up @@ -433,14 +502,14 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 1)
expectResults(t, producer, 1, 0)

// prime partition 1
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 1)
expectResults(t, producer, 1, 0)

// reboot the broker (the producer will get EOF on its existing connection)
leader.Close()
Expand All @@ -456,7 +525,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 1)
expectResults(t, producer, 1, 0)

// shutdown
closeProducer(t, producer)
Expand Down Expand Up @@ -493,7 +562,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 5)
expectResults(t, producer, 5, 0)
}

// send more messages on partition 0
Expand All @@ -511,14 +580,14 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 5)
expectResults(t, producer, 5, 0)

// put five more through
for i := 0; i < 5; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
}
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 5)
expectResults(t, producer, 5, 0)

// shutdown
closeProducer(t, producer)
Expand Down Expand Up @@ -564,7 +633,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 10)
expectResults(t, producer, 10, 0)

seedBroker.Close()
leader.Close()
Expand Down

0 comments on commit b21ab84

Please sign in to comment.