From d3f8616135af395a85abf219eac600e08d0dc206 Mon Sep 17 00:00:00 2001 From: Adam Dratwinski Date: Tue, 24 Apr 2018 16:55:50 +0200 Subject: [PATCH] Unblock consumer when receiving invalid FetchResponse --- consumer.go | 2 +- consumer_test.go | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index be4e6d4a3..adc33a122 100644 --- a/consumer.go +++ b/consumer.go @@ -528,7 +528,7 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes child.offset = offset + 1 } if len(messages) == 0 { - return nil, ErrIncompleteResponse + child.offset += 1 } return messages, nil } diff --git a/consumer_test.go b/consumer_test.go index 881e0fe82..4bd662908 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -448,6 +448,57 @@ func TestConsumerExtraOffsets(t *testing.T) { } } +// In some situations broker may return a block containing only +// messages older then requested, even though there would be +// more messages if higher offset was requested. +func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { + // Given + fetchResponse1 := &FetchResponse{Version: 4} + fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1) + + fetchResponse2 := &FetchResponse{Version: 4} + fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000) + + cfg := NewConfig() + cfg.Consumer.Return.Errors = true + cfg.Version = V1_1_0_0 + + broker0 := NewMockBroker(t, 0) + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(1). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), + }) + + master, err := NewConsumer([]string{broker0.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + + // When + consumer, err := master.ConsumePartition("my_topic", 0, 2) + if err != nil { + t.Fatal(err) + } + + select { + case msg := <-consumer.Messages(): + assertMessageOffset(t, msg, 1000000) + case err := <-consumer.Errors(): + t.Fatal(err) + } + + safeClose(t, consumer) + safeClose(t, master) + broker0.Close() +} + func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { // Given fetchResponse1 := &FetchResponse{Version: 4}