diff --git a/message_reader.go b/message_reader.go
index cf9c0c36d..52642e2c6 100644
--- a/message_reader.go
+++ b/message_reader.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"io"
 	"log"
+
+	"github.com/segmentio/kafka-go/compress"
 )
 
 type readBytesFunc func(*bufio.Reader, int, int) (int, error)
@@ -247,82 +249,59 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
 	return
 }
 
-func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readBytesFunc) (
-	offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {
-	if err = r.readHeader(); err != nil {
-		return
-	}
-	if r.count == int(r.header.v2.count) { // first time reading this set, so check for compression headers.
-		var codec CompressionCodec
-		if codec, err = r.header.compression(); err != nil {
-			return
-		}
-		if codec != nil {
-			batchRemain := int(r.header.length - 49) // TODO: document this magic number
-			if batchRemain > r.remain {
-				err = errShortRead
-				return
-			}
-			if batchRemain < 0 {
-				err = fmt.Errorf("batch remain < 0 (%d)", batchRemain)
-				return
-			}
-			r.decompressed.Reset()
-			// x4 as a guess that the average compression ratio is near 75%
-			r.decompressed.Grow(4 * batchRemain)
-			limitReader := io.LimitedReader{R: r.reader, N: int64(batchRemain)}
-			codecReader := codec.NewReader(&limitReader)
-			_, err = r.decompressed.ReadFrom(codecReader)
-			codecReader.Close()
-			if err != nil {
-				return
-			}
-			r.remain -= batchRemain - int(limitReader.N)
-			r.readerStack = &readerStack{
-				reader: bufio.NewReaderSize(r.decompressed, 0), // the new stack reads from the decompressed buffer
-				remain: r.decompressed.Len(),
-				base:   -1, // base is unused here
-				parent: r.readerStack,
-				header: r.header,
-				count:  r.count,
-			}
-			// all of the messages in this set are in the decompressed set just pushed onto the reader
-			// stack. here we set the parent count to 0 so that when the child set is exhausted, the
-			// reader will then try to read the header of the next message set
-			r.readerStack.parent.count = 0
-		}
-	}
-	remainBefore := r.remain
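+// readMessageV2 reads the next record of a v2 record batch. The field-by-field
+// comments below follow https://kafka.apache.org/documentation/#record. Note
+// that record varints are zigzag-encoded per the spec, so e.g. a length of 75
+// is stored as zigzag(75) = 150, which encodes to the bytes 0x96 0x01.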
+func (r *messageSetReader) readMessageV2(_ int64, keyFn readBytesFunc, valFn readBytesFunc) (
+	offset int64,
+	lastOffset int64,
+	timestamp int64,
+	headers []Header,
+	err error,
+) {
+	// start of a record, see https://kafka.apache.org/documentation/#record
+
+	// length: varint
+	remainBeforeLengthHeader := r.remain
 	var length int64
 	if err = r.readVarInt(&length); err != nil {
 		return
 	}
-	lengthOfLength := remainBefore - r.remain
-	var attrs int8
-	if err = r.readInt8(&attrs); err != nil {
+	lengthHeaderSize := remainBeforeLengthHeader - r.remain
+	// attributes: int8
+	// bit 0~7: unused
+	if err = r.discardN(1); err != nil {
 		return
 	}
+	// timestampDelta: varlong
 	var timestampDelta int64
 	if err = r.readVarInt(&timestampDelta); err != nil {
 		return
 	}
 	timestamp = r.header.v2.firstTimestamp + timestampDelta
+	// offsetDelta: varint
 	var offsetDelta int64
 	if err = r.readVarInt(&offsetDelta); err != nil {
 		return
 	}
 	offset = r.header.firstOffset + offsetDelta
-	if err = r.runFunc(key); err != nil {
+	// read the record key
+	// keyLength: varint
+	// key: byte[]
+	if err = r.runFunc(keyFn); err != nil {
 		return
 	}
-	if err = r.runFunc(val); err != nil {
+	// read the record value
+	// valueLen: varint
+	// value: byte[]
+	if err = r.runFunc(valFn); err != nil {
 		return
 	}
+	// start of record headers, see https://kafka.apache.org/documentation/#recordheader
+	// numHeaders: varint (the count of headers that follow)
 	var headerCount int64
 	if err = r.readVarInt(&headerCount); err != nil {
 		return
 	}
 	if headerCount > 0 {
+		// read header key value pairs if there are any
 		headers = make([]Header, headerCount)
 		for i := range headers {
 			if err = r.readMessageHeader(&headers[i]); err != nil {
@@ -331,11 +310,50 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
 		}
 	}
 	lastOffset = r.header.firstOffset + int64(r.header.v2.lastOffsetDelta)
-	r.lengthRemain -= int(length) + lengthOfLength
+	r.lengthRemain -= int(length) + lengthHeaderSize
+	// decrement the number of records remaining
 	r.markRead()
 	return
 }
 
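+// handleCompressionForV2 reads the compressed remainder of a v2 record batch
+// into r.decompressed and pushes a reader over the decompressed bytes onto the
+// reader stack, so the records can then be parsed as if they were uncompressed.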
+func (r *messageSetReader) handleCompressionForV2(codec compress.Codec) error {
+	// 49 is the size of the batch header fields that batchLength counts besides
+	// the records themselves: partitionLeaderEpoch(4) + magic(1) + crc(4) +
+	// attributes(2) + lastOffsetDelta(4) + baseTimestamp(8) + maxTimestamp(8) +
+	// producerId(8) + producerEpoch(2) + baseSequence(4) + recordCount(4) = 49,
+	// so what remains of the batch is the compressed records
+	batchRemain := int(r.header.length - 49)
+	if batchRemain > r.remain {
+		return errShortRead
+	}
+	if batchRemain < 0 {
+		return fmt.Errorf("batch remain < 0 (%d)", batchRemain)
+	}
+	r.decompressed.Reset()
+
+	// x4 as a guess that the average compression ratio is near 75%
+	r.decompressed.Grow(4 * batchRemain)
+	limitReader := io.LimitedReader{R: r.reader, N: int64(batchRemain)}
+
+	codecReader := codec.NewReader(&limitReader)
+	_, err := r.decompressed.ReadFrom(codecReader)
+	codecReader.Close()
+	if err != nil {
+		return err
+	}
+	r.remain -= batchRemain - int(limitReader.N)
+	r.readerStack = &readerStack{
+		reader: bufio.NewReaderSize(r.decompressed, 0), // the new stack reads from the decompressed buffer
+		remain: r.decompressed.Len(),
+		base:   -1, // base is unused here
+		parent: r.readerStack,
+		header: r.header,
+		count:  r.count,
+	}
+
+	// all of the messages in this set are in the decompressed set just pushed onto the reader
+	// stack. here we set the parent count to 0 so that when the child set is exhausted, the
+	// reader will then try to read the header of the next message set
+	r.readerStack.parent.count = 0
+	return nil
+}
+
 func (r *messageSetReader) discardBytes() (err error) {
 	r.remain, err = discardBytes(r.reader, r.remain)
 	return
@@ -347,6 +365,17 @@ func (r *messageSetReader) discardN(sz int) (err error) {
 }
 
 func (r *messageSetReader) markRead() {
+	if r.header.magic >= 2 && r.header.v2.count == 0 {
+		// from https://kafka.apache.org/documentation/#recordbatch
+		//
+		// On compaction: unlike the older message formats, magic v2 and above preserves the first and last
+		// offset/sequence numbers from the original batch when the log is cleaned. [...] As a result, it is possible to
+		// have empty batches in the log when all the records in the batch are cleaned but batch is still
+		// retained in order to preserve a producer's last sequence number.
+		r.log("Got empty message batch")
+		r.unwindStack()
+		return
+	}
 	if r.count == 0 {
 		panic("markRead: negative count")
 	}
@@ -369,6 +398,9 @@ func (r *messageSetReader) unwindStack() {
 }
 
 func (r *messageSetReader) readMessageHeader(header *Header) (err error) {
+	// see https://kafka.apache.org/documentation/#recordheader
+
+	// headerKeyLength: varint, then headerKey: String
 	var keyLen int64
 	if err = r.readVarInt(&keyLen); err != nil {
 		return
@@ -376,10 +408,12 @@ func (r *messageSetReader) readMessageHeader(header *Header) (err error) {
 	if header.Key, err = r.readNewString(int(keyLen)); err != nil {
 		return
 	}
+	// headerValueLength: varint
 	var valLen int64
 	if err = r.readVarInt(&valLen); err != nil {
 		return
 	}
+	// Value: byte[]
 	if header.Value, err = r.readNewBytes(int(valLen)); err != nil {
 		return
 	}
@@ -398,81 +432,65 @@ func (r *messageSetReader) runFunc(rbFunc readBytesFunc) (err error) {
 }
 
 func (r *messageSetReader) readHeader() (err error) {
+	// TODO: figure out a better heuristic for determining if we need to read a new batch header
 	if r.count > 0 {
 		// currently reading a set of messages, no need to read a header until they are exhausted.
 		return
+	} else if r.header.magic >= 2 && r.header.v2.count == 0 {
+		// got an empty message batch and already read the header
+		return
 	}
-	r.header = messagesHeader{}
+
+	// read fields common among record batch (magic v2) and message set (magic v0 and v1)
+	// see https://kafka.apache.org/documentation/#recordbatch and https://kafka.apache.org/documentation/#messageset
+
+	// record batch: baseOffset int64
+	// message set: offset int64
 	if err = r.readInt64(&r.header.firstOffset); err != nil {
 		return
 	}
+	// record batch: batchLength int32
+	// message set: message_size int32
 	if err = r.readInt32(&r.header.length); err != nil {
 		return
 	}
+	// record batch: partitionLeaderEpoch int32
+	// message set: crc int32
 	var crcOrLeaderEpoch int32
 	if err = r.readInt32(&crcOrLeaderEpoch); err != nil {
 		return
 	}
+	// magic int8
 	if err = r.readInt8(&r.header.magic); err != nil {
 		return
 	}
 	switch r.header.magic {
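+	// dispatch on the magic byte: 0 and 1 are the legacy message set formats,
+	// 2 is the record batch format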
 	case 0:
-		r.header.crc = crcOrLeaderEpoch
-		if err = r.readInt8(&r.header.v1.attributes); err != nil {
+		err = r.readMessageSetHeaderV0(crcOrLeaderEpoch)
+		if err != nil {
 			return
 		}
-		r.count = 1
-		// Set arbitrary non-zero length so that we always assume the
-		// message is truncated since bytes remain.
-		r.lengthRemain = 1
-		r.log("Read v0 header with offset=%d len=%d magic=%d attributes=%d", r.header.firstOffset, r.header.length, r.header.magic, r.header.v1.attributes)
 	case 1:
-		r.header.crc = crcOrLeaderEpoch
-		if err = r.readInt8(&r.header.v1.attributes); err != nil {
-			return
-		}
-		if err = r.readInt64(&r.header.v1.timestamp); err != nil {
+		err = r.readMessageSetHeaderV1(crcOrLeaderEpoch)
+		if err != nil {
 			return
 		}
-		r.count = 1
-		// Set arbitrary non-zero length so that we always assume the
-		// message is truncated since bytes remain.
-		r.lengthRemain = 1
-		r.log("Read v1 header with remain=%d offset=%d magic=%d and attributes=%d", r.remain, r.header.firstOffset, r.header.magic, r.header.v1.attributes)
 	case 2:
-		r.header.v2.leaderEpoch = crcOrLeaderEpoch
-		if err = r.readInt32(&r.header.crc); err != nil {
-			return
-		}
-		if err = r.readInt16(&r.header.v2.attributes); err != nil {
-			return
-		}
-		if err = r.readInt32(&r.header.v2.lastOffsetDelta); err != nil {
-			return
-		}
-		if err = r.readInt64(&r.header.v2.firstTimestamp); err != nil {
-			return
-		}
-		if err = r.readInt64(&r.header.v2.lastTimestamp); err != nil {
-			return
-		}
-		if err = r.readInt64(&r.header.v2.producerID); err != nil {
-			return
-		}
-		if err = r.readInt16(&r.header.v2.producerEpoch); err != nil {
+		err = r.readRecordBatchHeader(crcOrLeaderEpoch)
+		if err != nil {
 			return
 		}
-		if err = r.readInt32(&r.header.v2.baseSequence); err != nil {
+		// we just started reading a new record batch, set up decompression if required
+		var codec CompressionCodec
+		if codec, err = r.header.compression(); err != nil {
 			return
 		}
-		if err = r.readInt32(&r.header.v2.count); err != nil {
-			return
+		if codec != nil {
+			err = r.handleCompressionForV2(codec)
+			if err != nil {
+				return
+			}
 		}
-		r.count = int(r.header.v2.count)
-		// Subtracts the header bytes from the length
-		r.lengthRemain = int(r.header.length) - 49
-		r.log("Read v2 header with count=%d offset=%d len=%d magic=%d attributes=%d", r.count, r.header.firstOffset, r.header.length, r.header.magic, r.header.v2.attributes)
 	default:
 		err = r.header.badMagic()
 		return
@@ -480,6 +498,115 @@
 	return
 }
 
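+// readRecordBatchHeader reads the v2-specific batch header fields that follow
+// the magic byte, populating r.header.v2 and the record count.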
+func (r *messageSetReader) readRecordBatchHeader(crcOrLeaderEpoch int32) error {
+	// v2 specific header fields, see https://kafka.apache.org/documentation/#recordbatch
+	r.header.v2.leaderEpoch = crcOrLeaderEpoch
+	// crc: int32
+	if err := r.readInt32(&r.header.crc); err != nil {
+		return err
+	}
+	// attributes: int16
+	// bit 0~2:
+	//     0: no compression
+	//     1: gzip
+	//     2: snappy
+	//     3: lz4
+	//     4: zstd
+	// bit 3: timestampType
+	// bit 4: isTransactional (0 means not transactional)
+	// bit 5: isControlBatch (0 means not a control batch)
+	// bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
+	// bit 7~15: unused
+	if err := r.readInt16(&r.header.v2.attributes); err != nil {
+		return err
+	}
+	// lastOffsetDelta: int32
+	if err := r.readInt32(&r.header.v2.lastOffsetDelta); err != nil {
+		return err
+	}
+	// baseTimestamp: int64
+	if err := r.readInt64(&r.header.v2.firstTimestamp); err != nil {
+		return err
+	}
+	// maxTimestamp: int64
+	if err := r.readInt64(&r.header.v2.lastTimestamp); err != nil {
+		return err
+	}
+	// producerId: int64
+	if err := r.readInt64(&r.header.v2.producerID); err != nil {
+		return err
+	}
+	// producerEpoch: int16
+	if err := r.readInt16(&r.header.v2.producerEpoch); err != nil {
+		return err
+	}
+	// baseSequence: int32
+	if err := r.readInt32(&r.header.v2.baseSequence); err != nil {
+		return err
+	}
+
+	// record count: the int32 length prefix of the records array, listed as
+	// "records: [Record]" in the record batch documentation
+	if err := r.readInt32(&r.header.v2.count); err != nil {
+		return err
+	}
+	r.count = int(r.header.v2.count)
+
+	// subtract the 49 bytes of batch header fields counted by batchLength
+	// (see handleCompressionForV2), leaving just the records
+	r.lengthRemain = int(r.header.length) - 49
+	r.log("Read v2 header with count=%d offset=%d len=%d magic=%d attributes=%d", r.count, r.header.firstOffset, r.header.length, r.header.magic, r.header.v2.attributes)
+	return nil
+}
+
+func (r *messageSetReader) readMessageSetHeaderV1(crcOrLeaderEpoch int32) error {
+	r.header.crc = crcOrLeaderEpoch
+	// v1 specific header fields, see https://kafka.apache.org/documentation/#messageset
+
+	// attributes => INT8
+	// bit 0~2:
+	//     0: no compression
+	//     1: gzip
+	//     2: snappy
+	//     3: lz4
+	// bit 3: timestampType
+	//     0: create time
+	//     1: log append time
+	// bit 4~7: unused
+	if err := r.readInt8(&r.header.v1.attributes); err != nil {
+		return err
+	}
+	// timestamp => INT64
+	if err := r.readInt64(&r.header.v1.timestamp); err != nil {
+		return err
+	}
+	r.count = 1
+	// Set arbitrary non-zero length so that we always assume the
+	// message is truncated since bytes remain.
+	r.lengthRemain = 1
+	r.log("Read v1 header with remain=%d offset=%d magic=%d and attributes=%d", r.remain, r.header.firstOffset, r.header.magic, r.header.v1.attributes)
+	return nil
+}
+
+func (r *messageSetReader) readMessageSetHeaderV0(crcOrLeaderEpoch int32) error {
+	r.header.crc = crcOrLeaderEpoch
+	// v0 specific header fields, see https://kafka.apache.org/documentation/#messageset
+
+	// attributes => INT8
+	// bit 0~2:
+	//     0: no compression
+	//     1: gzip
+	//     2: snappy
+	// bit 3~7: unused
+	if err := r.readInt8(&r.header.v1.attributes); err != nil {
+		return err
+	}
+	r.count = 1
+	// Set arbitrary non-zero length so that we always assume the
+	// message is truncated since bytes remain.
+	r.lengthRemain = 1
+	r.log("Read v0 header with offset=%d len=%d magic=%d attributes=%d", r.header.firstOffset, r.header.length, r.header.magic, r.header.v1.attributes)
+	return nil
+}
+
 func (r *messageSetReader) readNewBytes(len int) (res []byte, err error) {
 	res, r.remain, err = readNewBytes(r.reader, r.remain, len)
 	return