Skip to content

Commit

Permalink
consumer: shut down on OffsetOutOfRange
Browse files Browse the repository at this point in the history
The old behaviour was to redispatch it (so it would go to another broker if
necessary) and then retry with the same offset, which was a rather useless thing
to do unless the offset had somehow ended up slightly ahead of the available
messages (which is unlikely - it is far more likely to fall behind).

Instead, simply shut down the PartitionConsumer so the user gets an error (if
they're subscribed) and their messages channel closes. They then get to choose
whether to give up and switch to a different offset, yell for a human, or
whatever.
  • Loading branch information
eapache committed Apr 16, 2015
1 parent e2d70e1 commit 2c3abe1
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
5 changes: 5 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,11 @@ func (w *brokerConsumer) subscriptionConsumer() {
for child := range w.subscriptions {
if err := child.handleResponse(response); err != nil {
switch err {
case ErrOffsetOutOfRange:
// there's no point in retrying this it will just fail the same way again
// so shut it down and force the user to choose what to do
child.AsyncClose()
fallthrough
default:
child.sendError(err)
fallthrough
Expand Down
32 changes: 32 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,38 @@ func TestConsumerLatestOffset(t *testing.T) {
}
}

func TestConsumerShutsDownOutOfRange(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

fetchResponse := new(FetchResponse)
fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
leader.Returns(fetchResponse)

master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}
seedBroker.Close()

consumer, err := master.ConsumePartition("my_topic", 0, 101)
if err != nil {
t.Fatal(err)
}

if _, ok := <-consumer.Messages(); ok {
t.Error("Expected the consumer to shut down")
}

leader.Close()
safeClose(t, master)
}

func TestConsumerFunnyOffsets(t *testing.T) {
// for topics that are compressed and/or compacted (different things!) we have to be
// able to handle receiving offsets that are non-sequential (though still strictly increasing) and
Expand Down

0 comments on commit 2c3abe1

Please sign in to comment.