From 807de26ba51da5f4225e687e0e3eb18c1567d125 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 14 Jun 2018 14:00:58 -0400 Subject: [PATCH] Fix block on control messages Move the skip for those block to after the point where we've parsed/incremented the offset so we don't get stuck on a response containing *only* control messages. --- consumer.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/consumer.go b/consumer.go index e00aa01a7..33d9d143f 100644 --- a/consumer.go +++ b/consumer.go @@ -579,10 +579,6 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu messages := []*ConsumerMessage{} for _, records := range block.RecordsSet { - if control, err := records.isControl(); err != nil || control { - continue - } - switch records.recordsType { case legacyRecords: messageSetMessages, err := child.parseMessages(records.MsgSet) @@ -596,6 +592,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu if err != nil { return nil, err } + if control, err := records.isControl(); err != nil || control { + continue + } messages = append(messages, recordBatchMessages...) default: