Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer: check offset before returning ConsumePartition. #418

Merged
merged 2 commits into from
Apr 16, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,22 +313,28 @@ func (child *partitionConsumer) dispatch() error {
return nil
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) (err error) {
var time int64
func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
if err != nil {
return err
}
oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
if err != nil {
return err
}

switch offset {
case OffsetNewest, OffsetOldest:
time = offset
default:
if offset < 0 {
return ConfigurationError("Invalid offset")
}
switch {
case offset == OffsetNewest:
child.offset = newestOffset
case offset == OffsetOldest:
child.offset = oldestOffset
case offset >= oldestOffset && offset <= newestOffset:
child.offset = offset
return nil
default:
return ErrOffsetOutOfRange
}

child.offset, err = child.consumer.client.GetOffset(child.topic, child.partition, time)
return err
return nil
}

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
Expand Down
109 changes: 103 additions & 6 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ func TestConsumerOffsetManual(t *testing.T) {
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 2345)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest)

for i := 0; i <= 10; i++ {
fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
Expand Down Expand Up @@ -60,9 +68,13 @@ func TestConsumerLatestOffset(t *testing.T) {
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

offsetResponse := new(OffsetResponse)
offsetResponse.AddTopicPartition("my_topic", 0, 0x010101)
leader.Returns(offsetResponse)
offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 0x010102)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0x010101)
leader.Returns(offsetResponseOldest)

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
Expand Down Expand Up @@ -101,6 +113,14 @@ func TestConsumerFunnyOffsets(t *testing.T) {
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest)

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
Expand Down Expand Up @@ -152,10 +172,26 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
t.Fatal(err)
}

offsetResponseNewest0 := new(OffsetResponse)
offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
leader0.Returns(offsetResponseNewest0)

offsetResponseOldest0 := new(OffsetResponse)
offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
leader0.Returns(offsetResponseOldest0)

offsetResponseNewest1 := new(OffsetResponse)
offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
leader1.Returns(offsetResponseNewest1)

offsetResponseOldest1 := new(OffsetResponse)
offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
leader1.Returns(offsetResponseOldest1)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can dedupe these by moving them inside the loop

it might even be worth writing a helper function that takes mockbroker/topic/partition/newest/oldest and returns the two responses.

// we expect to end up (eventually) consuming exactly ten messages on each partition
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
consumer, err := master.ConsumePartition("my_topic", int32(i), 0)
for i := int32(0); i < 2; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea

consumer, err := master.ConsumePartition("my_topic", i, 0)
if err != nil {
t.Error(err)
}
Expand All @@ -179,7 +215,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
}
safeClose(t, consumer)
wg.Done()
}(int32(i), consumer)
}(i, consumer)
}

// leader0 provides first four messages on partition 0
Expand Down Expand Up @@ -273,6 +309,14 @@ func TestConsumerInterleavedClose(t *testing.T) {
t.Fatal(err)
}

offsetResponseNewest0 := new(OffsetResponse)
offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
leader.Returns(offsetResponseNewest0)

offsetResponseOldest0 := new(OffsetResponse)
offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest0)

c0, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
t.Fatal(err)
Expand All @@ -282,6 +326,14 @@ func TestConsumerInterleavedClose(t *testing.T) {
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)

offsetResponseNewest1 := new(OffsetResponse)
offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
leader.Returns(offsetResponseNewest1)

offsetResponseOldest1 := new(OffsetResponse)
offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
leader.Returns(offsetResponseOldest1)

c1, err := master.ConsumePartition("my_topic", 1, 0)
if err != nil {
t.Fatal(err)
Expand All @@ -298,6 +350,8 @@ func TestConsumerInterleavedClose(t *testing.T) {
}

func TestConsumerBounceWithReferenceOpen(t *testing.T) {
t.Skip("This is not yet working due to concurrency on the mock broker")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll have to add four offset responses (newest/oldest for two partitions) but I didn't think this test was particularly fragile. It should be ok as long as you don't return the second offset pair until after the first ConsumePartition call has returned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that the two consumers are talking to the same broker. The fetch request/responses that happen right after the offset responses, but we have to ensure we handle the offset requests for both consumers before we start dealing with the fetch requests. AFAICS that's not possible right now.


seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
leaderAddr := leader.Addr()
Expand Down Expand Up @@ -386,6 +440,49 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
safeClose(t, master)
}

func TestConsumerOffsetOutOfRange(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}
seedBroker.Close()

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 2345)

leader.Returns(offsetResponseNewest)
leader.Returns(offsetResponseOldest)
if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
}

leader.Returns(offsetResponseNewest)
leader.Returns(offsetResponseOldest)
if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
}

leader.Returns(offsetResponseNewest)
leader.Returns(offsetResponseOldest)
if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
}

leader.Close()
safeClose(t, master)
}

// This example has the simplest use case of the consumer. It simply
// iterates over the messages channel using a for/range loop. Because
// a producer never stopsunless requested, a signal handler is registered
Expand Down
25 changes: 25 additions & 0 deletions functional_consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package sarama

import (
"math"
"testing"
)

func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
checkKafkaAvailability(t)

consumer, err := NewConsumer(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}

if _, err := consumer.ConsumePartition("single_partition", 0, -10); err != ErrOffsetOutOfRange {
t.Error("Expected ErrOffsetOutOfRange, got:", err)
}

if _, err := consumer.ConsumePartition("single_partition", 0, math.MaxInt64); err != ErrOffsetOutOfRange {
t.Error("Expected ErrOffsetOutOfRange, got:", err)
}

safeClose(t, consumer)
}