diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 3d87aec705dc5..328e91c94deb3 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -441,13 +441,20 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me metasOffset := uint64(0) metasLen := uint64(0) + // There is a rare issue where chunks built by Loki have incorrect offset for some blocks which causes Loki to fail to read those chunks. + // While the root cause is yet to be identified, we will try to read those problematic chunks using the expected offset for blocks calculated using other relative offsets in the chunk. + expectedBlockOffset := 0 if version >= ChunkFormatV4 { - // version >= 4 starts writing length of sections after their offsets + // version >= 4 starts writing length of sections before their offsets metasLen, metasOffset = readSectionLenAndOffset(chunkMetasSectionIdx) + structuredMetadataLength, structuredMetadataOffset := readSectionLenAndOffset(chunkStructuredMetadataSectionIdx) + expectedBlockOffset = int(structuredMetadataLength + structuredMetadataOffset + 4) } else { // version <= 3 does not store length of metas. metas are followed by metasOffset + hash and then the chunk ends metasOffset = binary.BigEndian.Uint64(b[len(b)-8:]) metasLen = uint64(len(b)-(8+4)) - metasOffset + // version 1 writes blocks after version number while version 2 and 3 write blocks after chunk encoding + expectedBlockOffset = len(b) - len(db.b) } mb := b[metasOffset : metasOffset+metasLen] db = decbuf{b: mb} @@ -476,18 +483,35 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me blk.uncompressedSize = db.uvarint() } l := db.uvarint() - if blk.offset+l > len(b) { - return nil, fmt.Errorf("block %d offset %d + length %d exceeds chunk length %d", i, blk.offset, l, len(b)) - } - blk.b = b[blk.offset : blk.offset+l] - // Verify checksums. - expCRC := binary.BigEndian.Uint32(b[blk.offset+l:]) - if expCRC != crc32.Checksum(blk.b, castagnoliTable) { - _ = level.Error(util_log.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum) - continue + invalidBlockErr := validateBlock(b, blk.offset, l) + if invalidBlockErr != nil { + level.Error(util_log.Logger).Log("msg", "invalid block found", "err", invalidBlockErr) + // if block is expected to have different offset than what is encoded, see if we get a valid block using expected offset + if blk.offset != expectedBlockOffset { + _ = level.Error(util_log.Logger).Log("msg", "block offset does not match expected one, will try reading with expected offset", "actual", blk.offset, "expected", expectedBlockOffset) + blk.offset = expectedBlockOffset + if err := validateBlock(b, blk.offset, l); err != nil { + level.Error(util_log.Logger).Log("msg", "could not find valid block using expected offset", "err", err) + } else { + invalidBlockErr = nil + level.Info(util_log.Logger).Log("msg", "valid block found using expected offset") + } + } + + // if the block read with expected offset is still invalid, do not continue further + if invalidBlockErr != nil { + if errors.Is(invalidBlockErr, ErrInvalidChecksum) { + expectedBlockOffset += l + 4 + continue + } + return nil, invalidBlockErr + } } + // next block starts at current block start + current block length + checksum + expectedBlockOffset = blk.offset + l + 4 + blk.b = b[blk.offset : blk.offset+l] bc.blocks = append(bc.blocks, blk) // Update the counter used to track the size of cut blocks. @@ -1696,3 +1720,21 @@ func (e *sampleBufferedIterator) StreamHash() uint64 { return e.extractor.BaseLa func (e *sampleBufferedIterator) At() logproto.Sample { return e.cur } + +// validateBlock validates block by doing following checks: +// 1. Offset+length do not overrun size of the chunk from which we are reading the block. +// 2. Checksum of the block we will read matches the stored checksum in the chunk. +func validateBlock(chunkBytes []byte, offset, length int) error { + if offset+length > len(chunkBytes) { + return fmt.Errorf("offset %d + length %d exceeds chunk length %d", offset, length, len(chunkBytes)) + } + + blockBytes := chunkBytes[offset : offset+length] + // Verify checksums. + expCRC := binary.BigEndian.Uint32(chunkBytes[offset+length:]) + if expCRC != crc32.Checksum(blockBytes, castagnoliTable) { + return ErrInvalidChecksum + } + + return nil +} diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 6c48a28b0650f..daa97a2616917 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "fmt" + "hash" "math" "math/rand" "sort" @@ -2044,3 +2045,119 @@ func TestMemChunk_IteratorWithStructuredMetadata(t *testing.T) { }) } } + +func TestDecodeChunkIncorrectBlockOffset(t *testing.T) { + // use small block size to build multiple blocks in the test chunk + blockSize := 10 + + for _, format := range allPossibleFormats { + t.Run(fmt.Sprintf("chunkFormat:%v headBlockFmt:%v", format.chunkFormat, format.headBlockFmt), func(t *testing.T) { + for incorrectOffsetBlockNum := 0; incorrectOffsetBlockNum < 3; incorrectOffsetBlockNum++ { + t.Run(fmt.Sprintf("inorrect offset block: %d", incorrectOffsetBlockNum), func(t *testing.T) { + chk := NewMemChunk(format.chunkFormat, EncNone, format.headBlockFmt, blockSize, testTargetSize) + ts := time.Now().Unix() + for i := 0; i < 3; i++ { + dup, err := chk.Append(&logproto.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("%d-%d", ts, i), + StructuredMetadata: []logproto.LabelAdapter{ + {Name: "foo", Value: fmt.Sprintf("%d-%d", ts, i)}, + }, + }) + require.NoError(t, err) + require.False(t, dup) + } + + require.Len(t, chk.blocks, 3) + + b, err := chk.Bytes() + require.NoError(t, err) + + metasOffset := binary.BigEndian.Uint64(b[len(b)-8:]) + + w := bytes.NewBuffer(nil) + eb := EncodeBufferPool.Get().(*encbuf) + defer EncodeBufferPool.Put(eb) + + crc32Hash := crc32HashPool.Get().(hash.Hash32) + defer crc32HashPool.Put(crc32Hash) + + crc32Hash.Reset() + eb.reset() + + // BEGIN - code copied from writeTo func starting from encoding of block metas to change offset of a block + eb.putUvarint(len(chk.blocks)) + + for i, b := range chk.blocks { + eb.putUvarint(b.numEntries) + eb.putVarint64(b.mint) + eb.putVarint64(b.maxt) + // change offset of one block + blockOffset := b.offset + if i == incorrectOffsetBlockNum { + blockOffset += 5 + } + eb.putUvarint(blockOffset) + if chk.format >= ChunkFormatV3 { + eb.putUvarint(b.uncompressedSize) + } + eb.putUvarint(len(b.b)) + } + metasLen := len(eb.get()) + eb.putHash(crc32Hash) + + _, err = w.Write(eb.get()) + require.NoError(t, err) + + if chk.format >= ChunkFormatV4 { + // Write structured metadata offset and length + eb.reset() + + eb.putBE64int(int(binary.BigEndian.Uint64(b[len(b)-32:]))) + eb.putBE64int(int(binary.BigEndian.Uint64(b[len(b)-24:]))) + _, err = w.Write(eb.get()) + require.NoError(t, err) + } + + // Write the metasOffset. + eb.reset() + if chk.format >= ChunkFormatV4 { + eb.putBE64int(metasLen) + } + eb.putBE64int(int(metasOffset)) + _, err = w.Write(eb.get()) + require.NoError(t, err) + // END - code copied from writeTo func + + // build chunk using pre-block meta section + rewritten remainder of the chunk with incorrect offset for a block + chkWithIncorrectOffset := make([]byte, int(metasOffset)+w.Len()) + copy(chkWithIncorrectOffset, b[:metasOffset]) + copy(chkWithIncorrectOffset[metasOffset:], w.Bytes()) + + // decoding the problematic chunk should succeed + decodedChkWithIncorrectOffset, err := newByteChunk(chkWithIncorrectOffset, blockSize, testTargetSize, false) + require.NoError(t, err) + + require.Len(t, decodedChkWithIncorrectOffset.blocks, len(chk.blocks)) + + // both chunks should have same log lines + origChunkItr, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) + require.NoError(t, err) + + corruptChunkItr, err := decodedChkWithIncorrectOffset.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) + require.NoError(t, err) + + numEntriesFound := 0 + for origChunkItr.Next() { + numEntriesFound++ + require.True(t, corruptChunkItr.Next()) + require.Equal(t, origChunkItr.At(), corruptChunkItr.At()) + } + + require.False(t, corruptChunkItr.Next()) + require.Equal(t, 3, numEntriesFound) + }) + } + }) + } +}