From d03c6297004870c404f1aceb55a1d3e90886e284 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 13 Nov 2020 14:09:47 -0500 Subject: [PATCH] v3 of chunk encoding serializes all block data --- pkg/chunkenc/memchunk.go | 38 ++++++++++----- pkg/chunkenc/memchunk_test.go | 90 ++++++++++++++++++++++------------- 2 files changed, 84 insertions(+), 44 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 27d596758e0f1..b6706be0f50d4 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -26,13 +26,15 @@ import ( const ( blocksPerChunk = 10 maxLineLength = 1024 * 1024 * 1024 + + _ byte = iota + chunkFormatV1 + chunkFormatV2 + chunkFormatV3 ) var ( magicNumber = uint32(0x12EE56A) - - chunkFormatV1 = byte(1) - chunkFormatV2 = byte(2) ) // The table gets initialized with sync.Once but may still cause a race @@ -95,6 +97,15 @@ func (hb *headBlock) isEmpty() bool { return len(hb.entries) == 0 } +func (hb *headBlock) clear() { + if hb.entries != nil { + hb.entries = hb.entries[:0] + } + hb.size = 0 + hb.mint = 0 + hb.maxt = 0 +} + func (hb *headBlock) append(ts int64, line string) error { if !hb.isEmpty() && hb.maxt > ts { return ErrOutOfOrder @@ -154,7 +165,7 @@ func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk { blocks: []block{}, head: &headBlock{}, - format: chunkFormatV2, + format: chunkFormatV3, encoding: enc, } @@ -183,8 +194,8 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { switch version { case chunkFormatV1: bc.encoding = EncGZIP - case chunkFormatV2: - // format v2 has a byte for block encoding. + case chunkFormatV2, chunkFormatV3: + // format v2+ has a byte for block encoding. enc := Encoding(db.byte()) if db.err() != nil { return nil, errors.Wrap(db.err(), "verifying encoding") @@ -218,6 +229,9 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { // Read offset and length. blk.offset = db.uvarint() + if version == chunkFormatV3 { + blk.uncompressedSize = db.uvarint() + } l := db.uvarint() blk.b = b[blk.offset : blk.offset+l] @@ -259,8 +273,8 @@ func (c *MemChunk) BytesWith(b []byte) ([]byte, error) { // Write the header (magicNum + version). eb.putBE32(magicNumber) eb.putByte(c.format) - if c.format == chunkFormatV2 { - // chunk format v2 has a byte for encoding. + if c.format > chunkFormatV1 { + // chunk format v2+ has a byte for encoding. eb.putByte(byte(c.encoding)) } @@ -296,6 +310,9 @@ func (c *MemChunk) BytesWith(b []byte) ([]byte, error) { eb.putVarint64(b.mint) eb.putVarint64(b.maxt) eb.putUvarint(b.offset) + if c.format == chunkFormatV3 { + eb.putUvarint(b.uncompressedSize) + } eb.putUvarint(len(b.b)) } eb.putHash(crc32Hash) @@ -441,10 +458,7 @@ func (c *MemChunk) cut() error { c.cutBlockSize += len(b) - c.head.entries = c.head.entries[:0] - c.head.mint = 0 // Will be set on first append. - c.head.size = 0 - + c.head.clear() return nil } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index c9a96d1299f3e..f04b9b79a4c5e 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -207,53 +207,79 @@ func TestReadFormatV1(t *testing.T) { // 2) []byte loaded chunks <-> []byte loaded chunks func TestRoundtripV2(t *testing.T) { for _, enc := range testEncoding { - t.Run(enc.String(), func(t *testing.T) { - c := NewMemChunk(enc, testBlockSize, testTargetSize) - populated := fillChunk(c) + for _, version := range []byte{chunkFormatV2, chunkFormatV3} { + t.Run(enc.String(), func(t *testing.T) { + c := NewMemChunk(enc, testBlockSize, testTargetSize) + c.format = version + populated := fillChunk(c) - assertLines := func(c *MemChunk) { - require.Equal(t, enc, c.Encoding()) - it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) + assertLines := func(c *MemChunk) { + require.Equal(t, enc, c.Encoding()) + it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) + if err != nil { + t.Fatal(err) + } + + i := int64(0) + var data int64 + for it.Next() { + require.Equal(t, i, it.Entry().Timestamp.UnixNano()) + require.Equal(t, testdata.LogString(i), it.Entry().Line) + + data += int64(len(it.Entry().Line)) + i++ + } + require.Equal(t, populated, data) + } + + assertLines(c) + + // test MemChunk -> NewByteChunk loading + b, err := c.Bytes() if err != nil { t.Fatal(err) } - i := int64(0) - var data int64 - for it.Next() { - require.Equal(t, i, it.Entry().Timestamp.UnixNano()) - require.Equal(t, testdata.LogString(i), it.Entry().Line) - - data += int64(len(it.Entry().Line)) - i++ + r, err := NewByteChunk(b, testBlockSize, testTargetSize) + if err != nil { + t.Fatal(err) } - require.Equal(t, populated, data) - } + assertLines(r) - assertLines(c) + // test NewByteChunk -> NewByteChunk loading + rOut, err := r.Bytes() + require.Nil(t, err) - // test MemChunk -> NewByteChunk loading - b, err := c.Bytes() - if err != nil { - t.Fatal(err) - } + loaded, err := NewByteChunk(rOut, testBlockSize, testTargetSize) + require.Nil(t, err) - r, err := NewByteChunk(b, testBlockSize, testTargetSize) - if err != nil { - t.Fatal(err) - } - assertLines(r) + assertLines(loaded) + }) + } + + } +} - // test NewByteChunk -> NewByteChunk loading - rOut, err := r.Bytes() +func TestRoundtripV3(t *testing.T) { + + for _, enc := range testEncoding { + t.Run(enc.String(), func(t *testing.T) { + c := NewMemChunk(enc, testBlockSize, testTargetSize) + _ = fillChunk(c) + + b, err := c.Bytes() + require.Nil(t, err) + r, err := NewByteChunk(b, testBlockSize, testTargetSize) require.Nil(t, err) - loaded, err := NewByteChunk(rOut, testBlockSize, testTargetSize) + // have to populate then clear the head block or fail comparing against nil vs zero values + err = r.head.append(1, "1") require.Nil(t, err) + r.head.clear() - assertLines(loaded) - }) + require.Equal(t, c, r) + }) } }