Skip to content

Commit

Permalink
Fix bug when seeking offsets in compressed batches
Browse files Browse the repository at this point in the history
The reader was failing to mark the current message as read when
fast forwarding through a compressed message set with more than
one inner messages. Additionally, it was only discarding the
message key instead of both the key and the value.

After fixing this, the reader was able to discard the message
correctly, and start parsing the next message header.
  • Loading branch information
Collin Van Dyck committed Oct 18, 2021
1 parent c5dc58d commit de5fea3
Show file tree
Hide file tree
Showing 2 changed files with 271 additions and 41 deletions.
33 changes: 22 additions & 11 deletions message_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
if err = r.discardN(4); err != nil {
return
}
r.dumpHex("After discarding 4 bytes")
// read and decompress the contained message set.
var decompressed bytes.Buffer
if err = r.readBytesWith(func(r *bufio.Reader, sz int, n int) (remain int, err error) {
Expand Down Expand Up @@ -197,8 +196,6 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
base: offset,
parent: r.readerStack,
}

r.dumpHex("After pushing decompressed")
continue
}

Expand All @@ -209,10 +206,23 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
// When the messages are compressed kafka may return messages at an
// earlier offset than the one that was requested, it's the client's
// responsibility to ignore those.
//
// At this point, the message header has been read, so discarding
// the rest of the message means we have to discard the key, and then
// the value. Each of those are preceeded by a 4-byte length. Discarding
// them is then reading that length variable and then discarding that
// amount.
if offset < min {
if r.remain, err = discardBytes(r.reader, r.remain); err != nil {
// discard the key
if err = r.discardBytes(); err != nil {
return
}
// discard the value
if err = r.discardBytes(); err != nil {
return
}
// since we have fully consumed the message, mark as read
r.markRead()
continue
}
if err = r.readBytesWith(key); err != nil {
Expand Down Expand Up @@ -270,7 +280,6 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
// stack. here we set the parent count to 0 so that when the child set is exhausted, the
// reader will then try to read the header of the next message set
r.readerStack.parent.count = 0
r.dumpHex("After pushing stack")
}
}
var length int64
Expand Down Expand Up @@ -311,6 +320,11 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
return
}

func (r *messageSetReader) discardBytes() (err error) {
r.remain, err = discardBytes(r.reader, r.remain)
return
}

func (r *messageSetReader) discardN(sz int) (err error) {
r.remain, err = discardN(r.reader, r.remain, sz)
return
Expand All @@ -322,12 +336,14 @@ func (r *messageSetReader) markRead() {
}
r.count--
r.unwindStack()
r.log("Mark read remain=%d", r.remain)
}

func (r *messageSetReader) unwindStack() {
for r.count == 0 {
if r.remain == 0 {
if r.parent != nil {
r.log("Popped reader stack")
r.readerStack = r.parent
continue
}
Expand Down Expand Up @@ -370,11 +386,6 @@ func (r *messageSetReader) readHeader() (err error) {
// currently reading a set of messages, no need to read a header until they are exhausted.
return
}

r.dumpHex("Before reading header")
defer r.dumpHex("After reading header")

r.log("Reading header...")
r.header = messagesHeader{}
if err = r.readInt64(&r.header.firstOffset); err != nil {
return
Expand Down Expand Up @@ -406,7 +417,7 @@ func (r *messageSetReader) readHeader() (err error) {
return
}
r.count = 1
r.log("Read v1 header with magic=%d and attributes=%d", r.header.magic, r.header.v1.attributes)
r.log("Read v1 header with remain=%d offset=%d magic=%d and attributes=%d", r.remain, r.header.firstOffset, r.header.magic, r.header.v1.attributes)
case 2:
r.header.v2.leaderEpoch = crcOrLeaderEpoch
if err = r.readInt32(&r.header.crc); err != nil {
Expand Down
Loading

0 comments on commit de5fea3

Please sign in to comment.