From cbcae911769b05cf892022d863f963d777a35374 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 31 Jul 2024 14:29:00 +0530 Subject: [PATCH 1/3] try reading chunks which have incorrect offset for blocks --- pkg/chunkenc/memchunk.go | 60 +++++++++++++--- pkg/chunkenc/memchunk_test.go | 126 ++++++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+), 10 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 3d87aec705dc5..476125defca76 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -441,13 +441,18 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me metasOffset := uint64(0) metasLen := uint64(0) + expectedBlockOffset := 0 if version >= ChunkFormatV4 { - // version >= 4 starts writing length of sections after their offsets + // version >= 4 starts writing length of sections before their offsets metasLen, metasOffset = readSectionLenAndOffset(chunkMetasSectionIdx) + structuredMetadataLength, structuredMetadataOffset := readSectionLenAndOffset(chunkStructuredMetadataSectionIdx) + expectedBlockOffset = int(structuredMetadataLength + structuredMetadataOffset + 4) } else { // version <= 3 does not store length of metas. metas are followed by metasOffset + hash and then the chunk ends metasOffset = binary.BigEndian.Uint64(b[len(b)-8:]) metasLen = uint64(len(b)-(8+4)) - metasOffset + // version 1 writes blocks after version number while version 2 and 3 write blocks after chunk encoding + expectedBlockOffset = len(b) - len(db.b) } mb := b[metasOffset : metasOffset+metasLen] db = decbuf{b: mb} @@ -476,18 +481,35 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me blk.uncompressedSize = db.uvarint() } l := db.uvarint() - if blk.offset+l > len(b) { - return nil, fmt.Errorf("block %d offset %d + length %d exceeds chunk length %d", i, blk.offset, l, len(b)) - } - blk.b = b[blk.offset : blk.offset+l] - // Verify checksums. - expCRC := binary.BigEndian.Uint32(b[blk.offset+l:]) - if expCRC != crc32.Checksum(blk.b, castagnoliTable) { - _ = level.Error(util_log.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum) - continue + invalidBlockErr := validateBlock(b, blk.offset, l) + if invalidBlockErr != nil { + level.Error(util_log.Logger).Log("msg", "invalid block found", "err", invalidBlockErr) + // if block is expected to have different offset than what is encoded, see if we get a valid block using expected offset + if blk.offset != expectedBlockOffset { + _ = level.Error(util_log.Logger).Log("msg", "block offset does not match expected one, will try reading with expected offset", "actual", blk.offset, "expected", expectedBlockOffset) + blk.offset = expectedBlockOffset + if err := validateBlock(b, blk.offset, l); err != nil { + level.Error(util_log.Logger).Log("msg", "could not find valid block using expected offset", "err", err) + } else { + invalidBlockErr = nil + level.Info(util_log.Logger).Log("msg", "valid block found using expected offset") + } + } + + // if the block read with expected offset is still invalid, do not continue further + if invalidBlockErr != nil { + if errors.Is(invalidBlockErr, ErrInvalidChecksum) { + expectedBlockOffset += l + 4 + continue + } + return nil, invalidBlockErr + } } + // next block starts at current block start + current block length + checksum + expectedBlockOffset = blk.offset + l + 4 + blk.b = b[blk.offset : blk.offset+l] bc.blocks = append(bc.blocks, blk) // Update the counter used to track the size of cut blocks. @@ -1696,3 +1718,21 @@ func (e *sampleBufferedIterator) StreamHash() uint64 { return e.extractor.BaseLa func (e *sampleBufferedIterator) At() logproto.Sample { return e.cur } + +// validateBlock validates block by doing following checks: +// 1. Offset+length do not overrun size of the chunk from which we are reading the block. +// 2. Checksum of the block we will read matches the stored checksum in the chunk. +func validateBlock(chunkBytes []byte, offset, length int) error { + if offset+length > len(chunkBytes) { + return fmt.Errorf("offset %d + length %d exceeds chunk length %d", offset, length, len(chunkBytes)) + } + + blockBytes := chunkBytes[offset : offset+length] + // Verify checksums. + expCRC := binary.BigEndian.Uint32(chunkBytes[offset+length:]) + if expCRC != crc32.Checksum(blockBytes, castagnoliTable) { + return ErrInvalidChecksum + } + + return nil +} diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 6c48a28b0650f..f400d7bd7137d 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "fmt" + "hash" "math" "math/rand" "sort" @@ -2044,3 +2045,128 @@ func TestMemChunk_IteratorWithStructuredMetadata(t *testing.T) { }) } } + +func TestDecodeChunkIncorrectBlockOffset(t *testing.T) { + // use small block size to build multiple blocks in the test chunk + blockSize := 10 + + for _, format := range allPossibleFormats { + t.Run(fmt.Sprintf("chunkFormat:%v headBlockFmt:%v", format.chunkFormat, format.headBlockFmt), func(t *testing.T) { + for incorrectOffsetBlockNum := 0; incorrectOffsetBlockNum < 3; incorrectOffsetBlockNum++ { + t.Run(fmt.Sprintf("inorrect offset block: %d", incorrectOffsetBlockNum), func(t *testing.T) { + chk := NewMemChunk(format.chunkFormat, EncNone, format.headBlockFmt, blockSize, testTargetSize) + ts := time.Now().Unix() + for i := 0; i < 3; i++ { + dup, err := chk.Append(&logproto.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("%d-%d", ts, i), + StructuredMetadata: []logproto.LabelAdapter{ + {Name: "foo", Value: fmt.Sprintf("%d-%d", ts, i)}, + }, + }) + require.NoError(t, err) + require.False(t, dup) + } + + require.Len(t, chk.blocks, 3) + + b, err := chk.Bytes() + require.NoError(t, err) + + metasOffset := binary.BigEndian.Uint64(b[len(b)-8:]) + metasLen := 0 + if chk.format >= ChunkFormatV4 { + metasLen = int(binary.BigEndian.Uint64(b[len(b)-16:])) + } else { + metasLen = len(b) - (8 + 4) - int(metasOffset) + } + + w := bytes.NewBuffer(nil) + eb := EncodeBufferPool.Get().(*encbuf) + defer EncodeBufferPool.Put(eb) + + crc32Hash := crc32HashPool.Get().(hash.Hash32) + defer crc32HashPool.Put(crc32Hash) + + crc32Hash.Reset() + eb.reset() + + // BEGIN - code copied from writeTo func starting from encoding of block metas to change offset of a block + eb.putUvarint(len(chk.blocks)) + offset := int64(metasOffset) + + for i, b := range chk.blocks { + eb.putUvarint(b.numEntries) + eb.putVarint64(b.mint) + eb.putVarint64(b.maxt) + // change offset of one block + blockOffset := b.offset + if i == incorrectOffsetBlockNum { + blockOffset += 5 + } + eb.putUvarint(blockOffset) + if chk.format >= ChunkFormatV3 { + eb.putUvarint(b.uncompressedSize) + } + eb.putUvarint(len(b.b)) + } + metasLen = len(eb.get()) + eb.putHash(crc32Hash) + + n, err := w.Write(eb.get()) + require.NoError(t, err) + offset += int64(n) + + if chk.format >= ChunkFormatV4 { + // Write structured metadata offset and length + eb.reset() + + eb.putBE64int(int(binary.BigEndian.Uint64(b[len(b)-32:]))) + eb.putBE64int(int(binary.BigEndian.Uint64(b[len(b)-24:]))) + n, err = w.Write(eb.get()) + require.NoError(t, err) + offset += int64(n) + } + + // Write the metasOffset. + eb.reset() + if chk.format >= ChunkFormatV4 { + eb.putBE64int(metasLen) + } + eb.putBE64int(int(metasOffset)) + n, err = w.Write(eb.get()) + require.NoError(t, err) + // END - code copied from writeTo func + + // build chunk using pre-block meta section + rewritten remainder of the chunk with incorrect offset for a block + chkWithIncorrectOffset := make([]byte, int(metasOffset)+w.Len()) + copy(chkWithIncorrectOffset, b[:metasOffset]) + copy(chkWithIncorrectOffset[metasOffset:], w.Bytes()) + + // decoding the problematic chunk should succeed + decodedChkWithIncorrectOffset, err := newByteChunk(chkWithIncorrectOffset, blockSize, testTargetSize, false) + require.NoError(t, err) + + require.Len(t, decodedChkWithIncorrectOffset.blocks, len(chk.blocks)) + + // both chunks should have same log lines + origChunkItr, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) + require.NoError(t, err) + + corruptChunkItr, err := decodedChkWithIncorrectOffset.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) + require.NoError(t, err) + + numEntriesFound := 0 + for origChunkItr.Next() { + numEntriesFound++ + require.True(t, corruptChunkItr.Next()) + require.Equal(t, origChunkItr.At(), corruptChunkItr.At()) + } + + require.False(t, corruptChunkItr.Next()) + require.Equal(t, 3, numEntriesFound) + }) + } + }) + } +} From 28a587e6821dd75850c1a80eae4bc93dcefb499e Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 31 Jul 2024 15:05:22 +0530 Subject: [PATCH 2/3] lint --- pkg/chunkenc/memchunk_test.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index f400d7bd7137d..daa97a2616917 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -2074,12 +2074,6 @@ func TestDecodeChunkIncorrectBlockOffset(t *testing.T) { require.NoError(t, err) metasOffset := binary.BigEndian.Uint64(b[len(b)-8:]) - metasLen := 0 - if chk.format >= ChunkFormatV4 { - metasLen = int(binary.BigEndian.Uint64(b[len(b)-16:])) - } else { - metasLen = len(b) - (8 + 4) - int(metasOffset) - } w := bytes.NewBuffer(nil) eb := EncodeBufferPool.Get().(*encbuf) @@ -2093,7 +2087,6 @@ func TestDecodeChunkIncorrectBlockOffset(t *testing.T) { // BEGIN - code copied from writeTo func starting from encoding of block metas to change offset of a block eb.putUvarint(len(chk.blocks)) - offset := int64(metasOffset) for i, b := range chk.blocks { eb.putUvarint(b.numEntries) @@ -2110,12 +2103,11 @@ func TestDecodeChunkIncorrectBlockOffset(t *testing.T) { } eb.putUvarint(len(b.b)) } - metasLen = len(eb.get()) + metasLen := len(eb.get()) eb.putHash(crc32Hash) - n, err := w.Write(eb.get()) + _, err = w.Write(eb.get()) require.NoError(t, err) - offset += int64(n) if chk.format >= ChunkFormatV4 { // Write structured metadata offset and length @@ -2123,9 +2115,8 @@ func TestDecodeChunkIncorrectBlockOffset(t *testing.T) { eb.putBE64int(int(binary.BigEndian.Uint64(b[len(b)-32:]))) eb.putBE64int(int(binary.BigEndian.Uint64(b[len(b)-24:]))) - n, err = w.Write(eb.get()) + _, err = w.Write(eb.get()) require.NoError(t, err) - offset += int64(n) } // Write the metasOffset. @@ -2134,7 +2125,7 @@ func TestDecodeChunkIncorrectBlockOffset(t *testing.T) { eb.putBE64int(metasLen) } eb.putBE64int(int(metasOffset)) - n, err = w.Write(eb.get()) + _, err = w.Write(eb.get()) require.NoError(t, err) // END - code copied from writeTo func From 8386ebec05d058b95560e6bc810318b5a9d649ef Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Sun, 4 Aug 2024 19:07:39 +0530 Subject: [PATCH 3/3] Update memchunk.go --- pkg/chunkenc/memchunk.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 476125defca76..328e91c94deb3 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -441,6 +441,8 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me metasOffset := uint64(0) metasLen := uint64(0) + // There is a rare issue where chunks built by Loki have incorrect offset for some blocks which causes Loki to fail to read those chunks. + // While the root cause is yet to be identified, we will try to read those problematic chunks using the expected offset for blocks calculated using other relative offsets in the chunk. expectedBlockOffset := 0 if version >= ChunkFormatV4 { // version >= 4 starts writing length of sections before their offsets