Skip to content

Commit

Permalink
Merge pull request #329 from Shopify/consumer-test-325
Browse files Browse the repository at this point in the history
Add a test for bug #325
  • Loading branch information
eapache committed Mar 10, 2015
2 parents f732a1b + ea09ec0 commit 3ea0b8f
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ func TestConsumerFunnyOffsets(t *testing.T) {
}

consumer, err := master.ConsumePartition("my_topic", 0, 2)
if err != nil {
t.Fatal(err)
}

message := <-consumer.Messages()
if message.Offset != 3 {
Expand Down Expand Up @@ -245,6 +248,48 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
seedBroker.Close()
}

func TestConsumerInterleavedClose(t *testing.T) {
t.Skip("Enable once bug #325 is fixed.")

seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

config := NewConfig()
config.ChannelBufferSize = 0
master, err := NewConsumer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

c0, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
t.Fatal(err)
}

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)

c1, err := master.ConsumePartition("my_topic", 1, 0)
if err != nil {
t.Fatal(err)
}

fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)

safeClose(t, c1)
safeClose(t, c0)
leader.Close()
seedBroker.Close()
}

// This example shows how to use a consumer with a select statement
// dealing with the different channels.
func ExampleConsumer_select() {
Expand Down

0 comments on commit 3ea0b8f

Please sign in to comment.