From d629146c3e5f32dde47a23766309123404a2199b Mon Sep 17 00:00:00 2001 From: Willem van Bergen Date: Sat, 11 Apr 2015 14:58:36 -0400 Subject: [PATCH 1/2] Consumer: check offset before returning ConsumePartition. When calling ConsumePartition, always check whether the offset is within the offset range. This means we now have to do two OffsetRequests for every ConsumePartition call, even if the offset is provided. The good news is that the method will immediately return an error and never start a goroutine, instead of starting the goroutine and returning an error in the Errors() channel which you can easily ignore. --- consumer.go | 30 ++++++---- consumer_test.go | 109 ++++++++++++++++++++++++++++++++++-- functional_consumer_test.go | 25 +++++++++ 3 files changed, 146 insertions(+), 18 deletions(-) create mode 100644 functional_consumer_test.go diff --git a/consumer.go b/consumer.go index 4c2b1d7b5..25635c026 100644 --- a/consumer.go +++ b/consumer.go @@ -313,22 +313,28 @@ func (child *partitionConsumer) dispatch() error { return nil } -func (child *partitionConsumer) chooseStartingOffset(offset int64) (err error) { - var time int64 +func (child *partitionConsumer) chooseStartingOffset(offset int64) error { + newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest) + if err != nil { + return err + } + oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest) + if err != nil { + return err + } - switch offset { - case OffsetNewest, OffsetOldest: - time = offset - default: - if offset < 0 { - return ConfigurationError("Invalid offset") - } + switch { + case offset == OffsetNewest: + child.offset = newestOffset + case offset == OffsetOldest: + child.offset = oldestOffset + case offset >= oldestOffset && offset <= newestOffset: child.offset = offset - return nil + default: + return ErrOffsetOutOfRange } - child.offset, err = child.consumer.client.GetOffset(child.topic, child.partition, time) - return err + return nil } func (child *partitionConsumer) Messages() <-chan *ConsumerMessage { diff --git a/consumer_test.go b/consumer_test.go index bfae552de..85a2b5888 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -18,6 +18,14 @@ func TestConsumerOffsetManual(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) + offsetResponseNewest := new(OffsetResponse) + offsetResponseNewest.AddTopicPartition("my_topic", 0, 2345) + leader.Returns(offsetResponseNewest) + + offsetResponseOldest := new(OffsetResponse) + offsetResponseOldest.AddTopicPartition("my_topic", 0, 0) + leader.Returns(offsetResponseOldest) + for i := 0; i <= 10; i++ { fetchResponse := new(FetchResponse) fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234)) @@ -60,9 +68,13 @@ func TestConsumerLatestOffset(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) - offsetResponse := new(OffsetResponse) - offsetResponse.AddTopicPartition("my_topic", 0, 0x010101) - leader.Returns(offsetResponse) + offsetResponseNewest := new(OffsetResponse) + offsetResponseNewest.AddTopicPartition("my_topic", 0, 0x010102) + leader.Returns(offsetResponseNewest) + + offsetResponseOldest := new(OffsetResponse) + offsetResponseOldest.AddTopicPartition("my_topic", 0, 0x010101) + leader.Returns(offsetResponseOldest) fetchResponse := new(FetchResponse) fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101) @@ -101,6 +113,14 @@ func TestConsumerFunnyOffsets(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) + offsetResponseNewest := new(OffsetResponse) + offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234) + leader.Returns(offsetResponseNewest) + + offsetResponseOldest := new(OffsetResponse) + offsetResponseOldest.AddTopicPartition("my_topic", 0, 0) + leader.Returns(offsetResponseOldest) + fetchResponse := new(FetchResponse) fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1)) fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3)) @@ -152,10 +172,26 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { t.Fatal(err) } + offsetResponseNewest0 := new(OffsetResponse) + offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234) + leader0.Returns(offsetResponseNewest0) + + offsetResponseOldest0 := new(OffsetResponse) + offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0) + leader0.Returns(offsetResponseOldest0) + + offsetResponseNewest1 := new(OffsetResponse) + offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234) + leader1.Returns(offsetResponseNewest1) + + offsetResponseOldest1 := new(OffsetResponse) + offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0) + leader1.Returns(offsetResponseOldest1) + // we expect to end up (eventually) consuming exactly ten messages on each partition var wg sync.WaitGroup - for i := 0; i < 2; i++ { - consumer, err := master.ConsumePartition("my_topic", int32(i), 0) + for i := int32(0); i < 2; i++ { + consumer, err := master.ConsumePartition("my_topic", i, 0) if err != nil { t.Error(err) } @@ -179,7 +215,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { } safeClose(t, consumer) wg.Done() - }(int32(i), consumer) + }(i, consumer) } // leader0 provides first four messages on partition 0 @@ -273,6 +309,14 @@ func TestConsumerInterleavedClose(t *testing.T) { t.Fatal(err) } + offsetResponseNewest0 := new(OffsetResponse) + offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234) + leader.Returns(offsetResponseNewest0) + + offsetResponseOldest0 := new(OffsetResponse) + offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0) + leader.Returns(offsetResponseOldest0) + c0, err := master.ConsumePartition("my_topic", 0, 0) if err != nil { t.Fatal(err) @@ -282,6 +326,14 @@ func TestConsumerInterleavedClose(t *testing.T) { fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) leader.Returns(fetchResponse) + offsetResponseNewest1 := new(OffsetResponse) + offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234) + leader.Returns(offsetResponseNewest1) + + offsetResponseOldest1 := new(OffsetResponse) + offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0) + leader.Returns(offsetResponseOldest1) + c1, err := master.ConsumePartition("my_topic", 1, 0) if err != nil { t.Fatal(err) @@ -298,6 +350,8 @@ func TestConsumerInterleavedClose(t *testing.T) { } func TestConsumerBounceWithReferenceOpen(t *testing.T) { + t.Skip("This is not yet working due to concurrency on the mock broker") + seedBroker := newMockBroker(t, 1) leader := newMockBroker(t, 2) leaderAddr := leader.Addr() @@ -386,6 +440,49 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { safeClose(t, master) } +func TestConsumerOffsetOutOfRange(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataResponse) + + master, err := NewConsumer([]string{seedBroker.Addr()}, nil) + if err != nil { + t.Fatal(err) + } + seedBroker.Close() + + offsetResponseNewest := new(OffsetResponse) + offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234) + + offsetResponseOldest := new(OffsetResponse) + offsetResponseOldest.AddTopicPartition("my_topic", 0, 2345) + + leader.Returns(offsetResponseNewest) + leader.Returns(offsetResponseOldest) + if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange { + t.Fatal("Should return ErrOffsetOutOfRange, got:", err) + } + + leader.Returns(offsetResponseNewest) + leader.Returns(offsetResponseOldest) + if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange { + t.Fatal("Should return ErrOffsetOutOfRange, got:", err) + } + + leader.Returns(offsetResponseNewest) + leader.Returns(offsetResponseOldest) + if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange { + t.Fatal("Should return ErrOffsetOutOfRange, got:", err) + } + + leader.Close() + safeClose(t, master) +} + // This example has the simplest use case of the consumer. It simply // iterates over the messages channel using a for/range loop. Because // a producer never stopsunless requested, a signal handler is registered diff --git a/functional_consumer_test.go b/functional_consumer_test.go new file mode 100644 index 000000000..28d049504 --- /dev/null +++ b/functional_consumer_test.go @@ -0,0 +1,25 @@ +package sarama + +import ( + "math" + "testing" +) + +func TestFuncConsumerOffsetOutOfRange(t *testing.T) { + checkKafkaAvailability(t) + + consumer, err := NewConsumer(kafkaBrokers, nil) + if err != nil { + t.Fatal(err) + } + + if _, err := consumer.ConsumePartition("single_partition", 0, -10); err != ErrOffsetOutOfRange { + t.Error("Expected ErrOffsetOutOfRange, got:", err) + } + + if _, err := consumer.ConsumePartition("single_partition", 0, math.MaxInt64); err != ErrOffsetOutOfRange { + t.Error("Expected ErrOffsetOutOfRange, got:", err) + } + + safeClose(t, consumer) +} From 83721053d33c63c7dba2d86330fb2a20806feaa7 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 16 Apr 2015 15:13:00 +0000 Subject: [PATCH 2/2] Fix TestConsumerBounceWithReferenceOpen --- consumer_test.go | 37 ++++++++++++++++++++++++++++++++++--- mockbroker_test.go | 1 + 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/consumer_test.go b/consumer_test.go index 85a2b5888..3b899a801 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -350,16 +350,16 @@ func TestConsumerInterleavedClose(t *testing.T) { } func TestConsumerBounceWithReferenceOpen(t *testing.T) { - t.Skip("This is not yet working due to concurrency on the mock broker") - seedBroker := newMockBroker(t, 1) leader := newMockBroker(t, 2) leaderAddr := leader.Addr() + tmp := newMockBroker(t, 3) metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddBroker(tmp.Addr(), tmp.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) - metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) + metadataResponse.AddTopicPartition("my_topic", 1, tmp.BrokerID(), nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) config := NewConfig() @@ -371,17 +371,44 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { t.Fatal(err) } + offsetResponseNewest := new(OffsetResponse) + offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234) + leader.Returns(offsetResponseNewest) + + offsetResponseOldest := new(OffsetResponse) + offsetResponseOldest.AddTopicPartition("my_topic", 0, 0) + leader.Returns(offsetResponseOldest) + c0, err := master.ConsumePartition("my_topic", 0, 0) if err != nil { t.Fatal(err) } + offsetResponseNewest = new(OffsetResponse) + offsetResponseNewest.AddTopicPartition("my_topic", 1, 1234) + tmp.Returns(offsetResponseNewest) + + offsetResponseOldest = new(OffsetResponse) + offsetResponseOldest.AddTopicPartition("my_topic", 1, 0) + tmp.Returns(offsetResponseOldest) + c1, err := master.ConsumePartition("my_topic", 1, 0) if err != nil { t.Fatal(err) } + //redirect partition 1 back to main leader fetchResponse := new(FetchResponse) + fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition) + tmp.Returns(fetchResponse) + metadataResponse = new(MetadataResponse) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataResponse) + time.Sleep(5 * time.Millisecond) + + // now send one message to each partition to make sure everything is primed + fetchResponse = new(FetchResponse) fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) fetchResponse.AddError("my_topic", 1, ErrNoError) leader.Returns(fetchResponse) @@ -393,6 +420,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { leader.Returns(fetchResponse) <-c1.Messages() + // bounce the broker leader.Close() leader = newMockBrokerAddr(t, 2, leaderAddr) @@ -419,6 +447,8 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { // send it back to the same broker seedBroker.Returns(metadataResponse) + time.Sleep(5 * time.Millisecond) + select { case <-c0.Messages(): case <-c1.Messages(): @@ -438,6 +468,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { }() wg.Wait() safeClose(t, master) + tmp.Close() } func TestConsumerOffsetOutOfRange(t *testing.T) { diff --git a/mockbroker_test.go b/mockbroker_test.go index b99c18c88..d131cb0c7 100644 --- a/mockbroker_test.go +++ b/mockbroker_test.go @@ -146,6 +146,7 @@ func newMockBrokerAddr(t *testing.T, brokerID int32, addr string) *mockBroker { if err != nil { t.Fatal(err) } + Logger.Printf("mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String()) _, portStr, err := net.SplitHostPort(broker.listener.Addr().String()) if err != nil { t.Fatal(err)