Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer: check offset before returning ConsumePartition. #418

Merged
merged 2 commits into from
Apr 16, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,22 +313,28 @@ func (child *partitionConsumer) dispatch() error {
return nil
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) (err error) {
var time int64
func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
if err != nil {
return err
}
oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
if err != nil {
return err
}

switch offset {
case OffsetNewest, OffsetOldest:
time = offset
default:
if offset < 0 {
return ConfigurationError("Invalid offset")
}
switch {
case offset == OffsetNewest:
child.offset = newestOffset
case offset == OffsetOldest:
child.offset = oldestOffset
case offset >= oldestOffset && offset <= newestOffset:
child.offset = offset
return nil
default:
return ErrOffsetOutOfRange
}

child.offset, err = child.consumer.client.GetOffset(child.topic, child.partition, time)
return err
return nil
}

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
Expand Down
142 changes: 135 additions & 7 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ func TestConsumerOffsetManual(t *testing.T) {
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 2345)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest)

for i := 0; i <= 10; i++ {
fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
Expand Down Expand Up @@ -60,9 +68,13 @@ func TestConsumerLatestOffset(t *testing.T) {
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

offsetResponse := new(OffsetResponse)
offsetResponse.AddTopicPartition("my_topic", 0, 0x010101)
leader.Returns(offsetResponse)
offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 0x010102)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0x010101)
leader.Returns(offsetResponseOldest)

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
Expand Down Expand Up @@ -101,6 +113,14 @@ func TestConsumerFunnyOffsets(t *testing.T) {
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest)

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
Expand Down Expand Up @@ -152,10 +172,26 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
t.Fatal(err)
}

offsetResponseNewest0 := new(OffsetResponse)
offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
leader0.Returns(offsetResponseNewest0)

offsetResponseOldest0 := new(OffsetResponse)
offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
leader0.Returns(offsetResponseOldest0)

offsetResponseNewest1 := new(OffsetResponse)
offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
leader1.Returns(offsetResponseNewest1)

offsetResponseOldest1 := new(OffsetResponse)
offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
leader1.Returns(offsetResponseOldest1)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can dedupe these by moving them inside the loop

it might even be worth writing a helper function that takes mockbroker/topic/partition/newest/oldest and returns the two responses.

// we expect to end up (eventually) consuming exactly ten messages on each partition
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
consumer, err := master.ConsumePartition("my_topic", int32(i), 0)
for i := int32(0); i < 2; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea

consumer, err := master.ConsumePartition("my_topic", i, 0)
if err != nil {
t.Error(err)
}
Expand All @@ -179,7 +215,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
}
safeClose(t, consumer)
wg.Done()
}(int32(i), consumer)
}(i, consumer)
}

// leader0 provides first four messages on partition 0
Expand Down Expand Up @@ -273,6 +309,14 @@ func TestConsumerInterleavedClose(t *testing.T) {
t.Fatal(err)
}

offsetResponseNewest0 := new(OffsetResponse)
offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
leader.Returns(offsetResponseNewest0)

offsetResponseOldest0 := new(OffsetResponse)
offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest0)

c0, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
t.Fatal(err)
Expand All @@ -282,6 +326,14 @@ func TestConsumerInterleavedClose(t *testing.T) {
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)

offsetResponseNewest1 := new(OffsetResponse)
offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
leader.Returns(offsetResponseNewest1)

offsetResponseOldest1 := new(OffsetResponse)
offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
leader.Returns(offsetResponseOldest1)

c1, err := master.ConsumePartition("my_topic", 1, 0)
if err != nil {
t.Fatal(err)
Expand All @@ -301,11 +353,13 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
leaderAddr := leader.Addr()
tmp := newMockBroker(t, 3)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddBroker(tmp.Addr(), tmp.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, tmp.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

config := NewConfig()
Expand All @@ -317,17 +371,44 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
t.Fatal(err)
}

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest)

c0, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
t.Fatal(err)
}

offsetResponseNewest = new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 1, 1234)
tmp.Returns(offsetResponseNewest)

offsetResponseOldest = new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 1, 0)
tmp.Returns(offsetResponseOldest)

c1, err := master.ConsumePartition("my_topic", 1, 0)
if err != nil {
t.Fatal(err)
}

//redirect partition 1 back to main leader
fetchResponse := new(FetchResponse)
fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
tmp.Returns(fetchResponse)
metadataResponse = new(MetadataResponse)
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)
time.Sleep(5 * time.Millisecond)

// now send one message to each partition to make sure everything is primed
fetchResponse = new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
fetchResponse.AddError("my_topic", 1, ErrNoError)
leader.Returns(fetchResponse)
Expand All @@ -339,6 +420,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
leader.Returns(fetchResponse)
<-c1.Messages()

// bounce the broker
leader.Close()
leader = newMockBrokerAddr(t, 2, leaderAddr)

Expand All @@ -365,6 +447,8 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
// send it back to the same broker
seedBroker.Returns(metadataResponse)

time.Sleep(5 * time.Millisecond)

select {
case <-c0.Messages():
case <-c1.Messages():
Expand All @@ -384,6 +468,50 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
}()
wg.Wait()
safeClose(t, master)
tmp.Close()
}

func TestConsumerOffsetOutOfRange(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}
seedBroker.Close()

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 2345)

leader.Returns(offsetResponseNewest)
leader.Returns(offsetResponseOldest)
if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
}

leader.Returns(offsetResponseNewest)
leader.Returns(offsetResponseOldest)
if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
}

leader.Returns(offsetResponseNewest)
leader.Returns(offsetResponseOldest)
if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
}

leader.Close()
safeClose(t, master)
}

// This example has the simplest use case of the consumer. It simply
Expand Down
25 changes: 25 additions & 0 deletions functional_consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package sarama

import (
"math"
"testing"
)

func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
checkKafkaAvailability(t)

consumer, err := NewConsumer(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}

if _, err := consumer.ConsumePartition("single_partition", 0, -10); err != ErrOffsetOutOfRange {
t.Error("Expected ErrOffsetOutOfRange, got:", err)
}

if _, err := consumer.ConsumePartition("single_partition", 0, math.MaxInt64); err != ErrOffsetOutOfRange {
t.Error("Expected ErrOffsetOutOfRange, got:", err)
}

safeClose(t, consumer)
}
1 change: 1 addition & 0 deletions mockbroker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func newMockBrokerAddr(t *testing.T, brokerID int32, addr string) *mockBroker {
if err != nil {
t.Fatal(err)
}
Logger.Printf("mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
if err != nil {
t.Fatal(err)
Expand Down