Skip to content

Commit

Permalink
DRY up a common producer test pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
eapache committed Mar 19, 2015
1 parent 83a294c commit c42b7aa
Showing 1 changed file with 28 additions and 126 deletions.
154 changes: 28 additions & 126 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ func closeProducer(t *testing.T, p AsyncProducer) {
wg.Wait()
}

func expectSuccesses(t *testing.T, p AsyncProducer, successes int) {
for i := 0; i < successes; i++ {
select {
case msg := <-p.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-p.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
}

func TestAsyncProducer(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
Expand Down Expand Up @@ -103,19 +119,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) {
for i := 0; i < 5; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
for i := 0; i < 5; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 5)
}

closeProducer(t, producer)
Expand Down Expand Up @@ -155,19 +159,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)

closeProducer(t, producer)
leader1.Close()
Expand Down Expand Up @@ -210,38 +202,14 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader2.Returns(prodSuccess)
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)
leader1.Close()

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
leader2.Returns(prodSuccess)
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)

leader2.Close()
closeProducer(t, producer)
Expand Down Expand Up @@ -276,19 +244,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)
seedBroker.Close()
leader.Close()

Expand Down Expand Up @@ -331,19 +287,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader2.Returns(prodSuccess)
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)
seedBroker.Close()
leader2.Close()

Expand Down Expand Up @@ -391,37 +335,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader2.Returns(prodSuccess)
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
leader2.Returns(prodSuccess)
for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
case msg := <-producer.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
}
}
expectSuccesses(t, producer, 10)

seedBroker.Close()
leader1.Close()
Expand Down Expand Up @@ -479,13 +399,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)

for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}
}
expectSuccesses(t, producer, 10)

leader.Close()
seedBroker.Close()
Expand Down Expand Up @@ -518,22 +432,14 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}
expectSuccesses(t, producer, 1)

// prime partition 1
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
leader.Returns(prodSuccess)
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}
expectSuccesses(t, producer, 1)

// reboot the broker (the producer will get EOF on its existing connection)
leader.Close()
Expand All @@ -549,11 +455,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}
expectSuccesses(t, producer, 1)

// shutdown
closeProducer(t, producer)
Expand Down

0 comments on commit c42b7aa

Please sign in to comment.