From de5fea31e139b8dda4e5ab2f773796e5ccf45502 Mon Sep 17 00:00:00 2001 From: Collin Van Dyck Date: Mon, 18 Oct 2021 14:17:05 -0400 Subject: [PATCH] Fix bug when seeking offsets in compressed batches The reader was failing to mark the current message as read when fast forwarding through a compressed message set with more than one inner messages. Additionally, it was only discarding the message key instead of both the key and the value. After fixing this, the reader was able to discard the message correctly, and start parsing the next message header. --- message_reader.go | 33 ++++-- message_test.go | 279 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 271 insertions(+), 41 deletions(-) diff --git a/message_reader.go b/message_reader.go index 1775df6c0..7ab6a08b7 100644 --- a/message_reader.go +++ b/message_reader.go @@ -157,7 +157,6 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB if err = r.discardN(4); err != nil { return } - r.dumpHex("After discarding 4 bytes") // read and decompress the contained message set. var decompressed bytes.Buffer if err = r.readBytesWith(func(r *bufio.Reader, sz int, n int) (remain int, err error) { @@ -197,8 +196,6 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB base: offset, parent: r.readerStack, } - - r.dumpHex("After pushing decompressed") continue } @@ -209,10 +206,23 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB // When the messages are compressed kafka may return messages at an // earlier offset than the one that was requested, it's the client's // responsibility to ignore those. + // + // At this point, the message header has been read, so discarding + // the rest of the message means we have to discard the key, and then + // the value. Each of those are preceeded by a 4-byte length. Discarding + // them is then reading that length variable and then discarding that + // amount. if offset < min { - if r.remain, err = discardBytes(r.reader, r.remain); err != nil { + // discard the key + if err = r.discardBytes(); err != nil { + return + } + // discard the value + if err = r.discardBytes(); err != nil { return } + // since we have fully consumed the message, mark as read + r.markRead() continue } if err = r.readBytesWith(key); err != nil { @@ -270,7 +280,6 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt // stack. here we set the parent count to 0 so that when the child set is exhausted, the // reader will then try to read the header of the next message set r.readerStack.parent.count = 0 - r.dumpHex("After pushing stack") } } var length int64 @@ -311,6 +320,11 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt return } +func (r *messageSetReader) discardBytes() (err error) { + r.remain, err = discardBytes(r.reader, r.remain) + return +} + func (r *messageSetReader) discardN(sz int) (err error) { r.remain, err = discardN(r.reader, r.remain, sz) return @@ -322,12 +336,14 @@ func (r *messageSetReader) markRead() { } r.count-- r.unwindStack() + r.log("Mark read remain=%d", r.remain) } func (r *messageSetReader) unwindStack() { for r.count == 0 { if r.remain == 0 { if r.parent != nil { + r.log("Popped reader stack") r.readerStack = r.parent continue } @@ -370,11 +386,6 @@ func (r *messageSetReader) readHeader() (err error) { // currently reading a set of messages, no need to read a header until they are exhausted. return } - - r.dumpHex("Before reading header") - defer r.dumpHex("After reading header") - - r.log("Reading header...") r.header = messagesHeader{} if err = r.readInt64(&r.header.firstOffset); err != nil { return @@ -406,7 +417,7 @@ func (r *messageSetReader) readHeader() (err error) { return } r.count = 1 - r.log("Read v1 header with magic=%d and attributes=%d", r.header.magic, r.header.v1.attributes) + r.log("Read v1 header with remain=%d offset=%d magic=%d and attributes=%d", r.remain, r.header.firstOffset, r.header.magic, r.header.v1.attributes) case 2: r.header.v2.leaderEpoch = crcOrLeaderEpoch if err = r.readInt32(&r.header.crc); err != nil { diff --git a/message_test.go b/message_test.go index b417edbe2..390495252 100644 --- a/message_test.go +++ b/message_test.go @@ -18,6 +18,225 @@ import ( "github.com/stretchr/testify/require" ) +// This regression test covers reading messages using offsets that +// are at the beginning and in the middle of compressed and uncompressed +// v1 message sets +func TestV1BatchOffsets(t *testing.T) { + const highWatermark = 5000 + const topic = "test-topic" + var ( + msg0 = Message{ + Offset: 0, + Key: []byte("msg-0"), + Value: []byte("key-0"), + } + msg1 = Message{ + Offset: 1, + Key: []byte("msg-1"), + Value: []byte("key-1"), + } + msg2 = Message{ + Offset: 2, + Key: []byte("msg-2"), + Value: []byte("key-2"), + } + ) + + for _, tc := range []struct { + name string + builder fetchResponseBuilder + offset int64 + expected []Message + debug bool + }{ + { + name: "num=1 off=0", + offset: 0, + builder: fetchResponseBuilder{ + header: fetchResponseHeader{ + highWatermarkOffset: highWatermark, + lastStableOffset: highWatermark, + topic: topic, + }, + msgSets: []messageSetBuilder{ + v1MessageSetBuilder{ + msgs: []Message{msg0}, + }, + }, + }, + expected: []Message{msg0}, + }, + { + name: "num=1 off=0 compressed", + offset: 0, + builder: fetchResponseBuilder{ + header: fetchResponseHeader{ + highWatermarkOffset: highWatermark, + lastStableOffset: highWatermark, + topic: topic, + }, + msgSets: []messageSetBuilder{ + v1MessageSetBuilder{ + codec: new(gzip.Codec), + msgs: []Message{msg0}, + }, + }, + }, + expected: []Message{msg0}, + }, + { + name: "num=1 off=1", + offset: 1, + builder: fetchResponseBuilder{ + header: fetchResponseHeader{ + highWatermarkOffset: highWatermark, + lastStableOffset: highWatermark, + topic: topic, + }, + msgSets: []messageSetBuilder{ + v1MessageSetBuilder{ + msgs: []Message{msg1}, + }, + }, + }, + expected: []Message{msg1}, + }, + { + name: "num=1 off=1 compressed", + offset: 1, + builder: fetchResponseBuilder{ + header: fetchResponseHeader{ + highWatermarkOffset: highWatermark, + lastStableOffset: highWatermark, + topic: topic, + }, + msgSets: []messageSetBuilder{ + v1MessageSetBuilder{ + codec: new(gzip.Codec), + msgs: []Message{msg1}, + }, + }, + }, + expected: []Message{msg1}, + }, + { + name: "num=3 off=0", + offset: 0, + builder: fetchResponseBuilder{ + header: fetchResponseHeader{ + highWatermarkOffset: highWatermark, + lastStableOffset: highWatermark, + topic: topic, + }, + msgSets: []messageSetBuilder{ + v1MessageSetBuilder{ + msgs: []Message{msg0, msg1, msg2}, + }, + }, + }, + expected: []Message{msg0, msg1, msg2}, + }, + { + name: "num=3 off=0 compressed", + offset: 0, + builder: fetchResponseBuilder{ + header: fetchResponseHeader{ + highWatermarkOffset: highWatermark, + lastStableOffset: highWatermark, + topic: topic, + }, + msgSets: []messageSetBuilder{ + v1MessageSetBuilder{ + codec: new(gzip.Codec), + msgs: []Message{msg0, msg1, msg2}, + }, + }, + }, + expected: []Message{msg0, msg1, msg2}, + }, + { + name: "num=3 off=1", + offset: 1, + debug: true, + builder: fetchResponseBuilder{ + header: fetchResponseHeader{ + highWatermarkOffset: highWatermark, + lastStableOffset: highWatermark, + topic: topic, + }, + msgSets: []messageSetBuilder{ + v1MessageSetBuilder{ + msgs: []Message{msg0, msg1, msg2}, + }, + }, + }, + expected: []Message{msg1, msg2}, + }, + { + name: "num=3 off=1 compressed", + offset: 1, + debug: true, + builder: fetchResponseBuilder{ + header: fetchResponseHeader{ + highWatermarkOffset: highWatermark, + lastStableOffset: highWatermark, + topic: topic, + }, + msgSets: []messageSetBuilder{ + v1MessageSetBuilder{ + codec: new(gzip.Codec), + msgs: []Message{msg0, msg1, msg2}, + }, + }, + }, + expected: []Message{msg1, msg2}, + }, + { + name: "num=3 off=2 compressed", + offset: 2, + debug: true, + builder: fetchResponseBuilder{ + header: fetchResponseHeader{ + highWatermarkOffset: highWatermark, + lastStableOffset: highWatermark, + topic: topic, + }, + msgSets: []messageSetBuilder{ + v1MessageSetBuilder{ + codec: new(gzip.Codec), + msgs: []Message{msg0, msg1, msg2}, + }, + }, + }, + expected: []Message{msg2}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + bs := tc.builder.bytes() + r, err := newReaderHelper(t, bs) + require.NoError(t, err) + r.offset = tc.offset + r.debug = tc.debug + filter := func(msg Message) (res Message) { + res.Offset = msg.Offset + res.Key = msg.Key + res.Value = msg.Value + return res + } + for _, expected := range tc.expected { + t.Logf("Want [%d] %s:%s", expected.Offset, expected.Key, expected.Value) + msg := filter(r.readMessage()) + t.Logf("Read [%d] %s:%s", msg.Offset, msg.Key, msg.Value) + require.EqualValues(t, expected, msg) + } + // finally, verify no more bytes remain + require.EqualValues(t, 0, r.remain) + _, err = r.readMessageErr() + require.EqualError(t, err, errShortRead.Error()) + }) + } +} + func TestMessageSetReader(t *testing.T) { const startOffset = 1000 const highWatermark = 5000 @@ -61,7 +280,7 @@ func TestMessageSetReader(t *testing.T) { header: defaultHeader, msgSets: []messageSetBuilder{ v0MessageSetBuilder{ - Message: msgs[0], + msgs: []Message{msgs[0]}, }, }, }, @@ -72,8 +291,8 @@ func TestMessageSetReader(t *testing.T) { header: defaultHeader, msgSets: []messageSetBuilder{ v0MessageSetBuilder{ - codec: new(gzip.Codec), - Message: msgs[0], + codec: new(gzip.Codec), + msgs: []Message{msgs[0]}, }, }, }, @@ -84,7 +303,7 @@ func TestMessageSetReader(t *testing.T) { header: defaultHeader, msgSets: []messageSetBuilder{ v1MessageSetBuilder{ - Message: msgs[0], + msgs: []Message{msgs[0]}, }, }, }, @@ -95,8 +314,8 @@ func TestMessageSetReader(t *testing.T) { header: defaultHeader, msgSets: []messageSetBuilder{ v1MessageSetBuilder{ - codec: new(gzip.Codec), - Message: msgs[0], + codec: new(gzip.Codec), + msgs: []Message{msgs[0]}, }, }, }, @@ -175,25 +394,25 @@ func TestMessageSetReader(t *testing.T) { header: defaultHeader, msgSets: []messageSetBuilder{ v0MessageSetBuilder{ - Message: msgs[0], + msgs: []Message{msgs[0]}, }, v2MessageSetBuilder{ msgs: []Message{msgs[1], msgs[2]}, }, v1MessageSetBuilder{ - Message: msgs[3], + msgs: []Message{msgs[3]}, }, v2MessageSetBuilder{ msgs: []Message{msgs[4], msgs[5]}, }, v1MessageSetBuilder{ - Message: msgs[6], + msgs: []Message{msgs[6]}, }, v1MessageSetBuilder{ - Message: msgs[7], + msgs: []Message{msgs[7]}, }, v0MessageSetBuilder{ - Message: msgs[8], + msgs: []Message{msgs[8]}, }, v2MessageSetBuilder{ msgs: []Message{msgs[9], msgs[10]}, @@ -207,32 +426,32 @@ func TestMessageSetReader(t *testing.T) { header: defaultHeader, msgSets: []messageSetBuilder{ v0MessageSetBuilder{ - codec: new(gzip.Codec), - Message: msgs[0], + codec: new(gzip.Codec), + msgs: []Message{msgs[0]}, }, v2MessageSetBuilder{ codec: new(zstd.Codec), msgs: []Message{msgs[1], msgs[2]}, }, v1MessageSetBuilder{ - codec: new(snappy.Codec), - Message: msgs[3], + codec: new(snappy.Codec), + msgs: []Message{msgs[3]}, }, v2MessageSetBuilder{ codec: new(lz4.Codec), msgs: []Message{msgs[4], msgs[5]}, }, v1MessageSetBuilder{ - codec: new(gzip.Codec), - Message: msgs[6], + codec: new(gzip.Codec), + msgs: []Message{msgs[6]}, }, v1MessageSetBuilder{ - codec: new(zstd.Codec), - Message: msgs[7], + codec: new(zstd.Codec), + msgs: []Message{msgs[7]}, }, v0MessageSetBuilder{ - codec: new(snappy.Codec), - Message: msgs[8], + codec: new(snappy.Codec), + msgs: []Message{msgs[8]}, }, v2MessageSetBuilder{ codec: new(lz4.Codec), @@ -247,28 +466,28 @@ func TestMessageSetReader(t *testing.T) { header: defaultHeader, msgSets: []messageSetBuilder{ v0MessageSetBuilder{ - codec: new(gzip.Codec), - Message: msgs[0], + codec: new(gzip.Codec), + msgs: []Message{msgs[0]}, }, v2MessageSetBuilder{ msgs: []Message{msgs[1], msgs[2]}, }, v1MessageSetBuilder{ - codec: new(snappy.Codec), - Message: msgs[3], + codec: new(snappy.Codec), + msgs: []Message{msgs[3]}, }, v2MessageSetBuilder{ msgs: []Message{msgs[4], msgs[5]}, }, v1MessageSetBuilder{ - Message: msgs[6], + msgs: []Message{msgs[6]}, }, v1MessageSetBuilder{ - codec: new(zstd.Codec), - Message: msgs[7], + codec: new(zstd.Codec), + msgs: []Message{msgs[7]}, }, v0MessageSetBuilder{ - Message: msgs[8], + msgs: []Message{msgs[8]}, }, v2MessageSetBuilder{ codec: new(lz4.Codec), @@ -284,11 +503,11 @@ func TestMessageSetReader(t *testing.T) { if tc.err != nil { return } - rh.offset = tc.builder.messages()[0].Offset rh.debug = tc.debug for _, messageSet := range tc.builder.msgSets { for _, expected := range messageSet.messages() { msg := rh.readMessage() + require.Equal(t, expected.Offset, msg.Offset) require.Equal(t, string(expected.Key), string(msg.Key)) require.Equal(t, string(expected.Value), string(msg.Value)) switch messageSet.(type) {