perf(blooms): Remove compression of .tar archived bloom blocks (#14159)

Decompression is a CPU-intensive task, especially un-gzipping. The gain from compressing a tar archive of storage-optimized binary blocks is negligible (question: is it?).

In this example, a block of ~173MiB is only ~3.2MiB larger when left uncompressed, a size difference of about 2%:

```bash
$ ls -las 2bc017f79711e12a-2bffc5dcc0e8e964_1726004114913-1726106283939-bc42f529.tar   
177048 -rw-rw-r-- 1 christian christian 181293056 Sep 18 13:49 2bc017f79711e12a-2bffc5dcc0e8e964_1726004114913-1726106283939-bc42f529.tar

$ ls -las 2bc017f79711e12a-2bffc5dcc0e8e964_1726004114913-1726106283939-bc42f529.tar.gz
173732 -rw-rw-r-- 1 christian christian 177897689 Sep 18 13:49 2bc017f79711e12a-2bffc5dcc0e8e964_1726004114913-1726106283939-bc42f529.tar.gz

$ qalc '(181293056 - 177897689) / 1024/ 1024'
((181293056 − 177897689) / 1024) / 1024 ≈ 3.238074303

$ qalc '181293056 / 177897689'
181293056 / 177897689 ≈ 1.019086066
```

After some consideration, we decided to store the encoding of the bloom block in the `BlockRef`. This means that the changes in this PR do not break compatibility with existing blocks compressed with gzip, although new blocks will no longer be compressed.
However, the PR adds support for several compression algorithms: gzip, snappy, lz4, flate, and zstd. Compression is not configurable yet.
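
As a rough illustration of how the encoding shows up in block naming, here is a minimal sketch using the `compression.ToFileExtension` helper added in this PR (the filename and the `main` wrapper are illustrative, not part of the change):

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/compression"
)

func main() {
	// A block archived without compression keeps the bare .tar extension;
	// compressed blocks carry an additional encoding-specific suffix.
	for _, enc := range []compression.Encoding{compression.EncNone, compression.EncGZIP} {
		fmt.Println("block.tar" + compression.ToFileExtension(enc))
	}
	// Output:
	// block.tar
	// block.tar.gz
}
```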

---

Signed-off-by: Christian Haudum <[email protected]>
chaudum authored Sep 20, 2024
1 parent 3f47f09 commit cdf084f
Showing 18 changed files with 387 additions and 231 deletions.
5 changes: 4 additions & 1 deletion pkg/bloombuild/builder/builder.go
```diff
@@ -33,6 +33,9 @@ import (
 	"github.com/grafana/loki/v3/pkg/util/ring"
 )
 
+// TODO(chaudum): Make configurable via (per-tenant?) setting.
+var blockCompressionAlgo = compression.EncNone
+
 type Builder struct {
 	services.Service
 
@@ -404,7 +407,7 @@ func (b *Builder) processTask(
 		blockCt++
 		blk := newBlocks.At()
 
-		built, err := bloomshipper.BlockFrom(tenant, task.Table.Addr(), blk)
+		built, err := bloomshipper.BlockFrom(blockCompressionAlgo, tenant, task.Table.Addr(), blk)
 		if err != nil {
 			level.Error(logger).Log("msg", "failed to build block", "err", err)
 			if err = blk.Reader().Cleanup(); err != nil {
```
4 changes: 2 additions & 2 deletions pkg/bloombuild/planner/planner_test.go
```diff
@@ -202,7 +202,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
 	block := v1.NewBlock(reader, v1.NewMetrics(nil))
 
 	buf := bytes.NewBuffer(nil)
-	if err := v1.TarGz(buf, block.Reader()); err != nil {
+	if err := v1.TarCompress(ref.Encoding, buf, block.Reader()); err != nil {
 		return bloomshipper.Block{}, err
 	}
 
@@ -1019,7 +1019,7 @@ func Test_deleteOutdatedMetas(t *testing.T) {
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			logger := log.NewNopLogger()
-			//logger := log.NewLogfmtLogger(os.Stdout)
+			// logger := log.NewLogfmtLogger(os.Stdout)
 
 			cfg := Config{
 				PlanningInterval: 1 * time.Hour,
```
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway_test.go
```diff
@@ -165,7 +165,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
 			Through: now,
 			Refs:    groupRefs(t, chunkRefs),
 			Plan:    plan.QueryPlan{AST: expr},
-			Blocks:  []string{"bloom/invalid/block.tar.gz"},
+			Blocks:  []string{"bloom/invalid/block.tar"},
 		}
 
 		ctx := user.InjectOrgID(context.Background(), tenantID)
```
5 changes: 1 addition & 4 deletions pkg/compression/encoding.go
```diff
@@ -13,7 +13,7 @@ type Encoding byte
 const (
 	EncNone Encoding = iota
 	EncGZIP
-	EncDumb
+	EncDumb // not supported
 	EncLZ4_64k
 	EncSnappy
 	EncLZ4_256k
@@ -41,8 +41,6 @@ func (e Encoding) String() string {
 		return "gzip"
 	case EncNone:
 		return "none"
-	case EncDumb:
-		return "dumb"
 	case EncLZ4_64k:
 		return "lz4-64k"
 	case EncLZ4_256k:
@@ -70,7 +68,6 @@ func ParseEncoding(enc string) (Encoding, error) {
 		}
 	}
 	return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedEncoding())
-
 }
 
 // SupportedEncoding returns the list of supported Encoding.
```
50 changes: 50 additions & 0 deletions pkg/compression/fileext.go
```diff
@@ -0,0 +1,50 @@
+package compression
+
+import "fmt"
+
+const (
+	ExtNone   = ""
+	ExtGZIP   = ".gz"
+	ExtSnappy = ".sz"
+	ExtLZ4    = ".lz4"
+	ExtFlate  = ".zz"
+	ExtZstd   = ".zst"
+)
+
+func ToFileExtension(e Encoding) string {
+	switch e {
+	case EncNone:
+		return ExtNone
+	case EncGZIP:
+		return ExtGZIP
+	case EncLZ4_64k, EncLZ4_256k, EncLZ4_1M, EncLZ4_4M:
+		return ExtLZ4
+	case EncSnappy:
+		return ExtSnappy
+	case EncFlate:
+		return ExtFlate
+	case EncZstd:
+		return ExtZstd
+	default:
+		panic(fmt.Sprintf("invalid encoding: %d, supported: %s", e, SupportedEncoding()))
+	}
+}
+
+func FromFileExtension(ext string) Encoding {
+	switch ext {
+	case ExtNone:
+		return EncNone
+	case ExtGZIP:
+		return EncGZIP
+	case ExtLZ4:
+		return EncLZ4_4M
+	case ExtSnappy:
+		return EncSnappy
+	case ExtFlate:
+		return EncFlate
+	case ExtZstd:
+		return EncZstd
+	default:
+		panic(fmt.Sprintf("invalid file extension: %s", ext))
+	}
+}
```
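
Note that this mapping is not one-to-one: all LZ4 variants share the `.lz4` extension, so a round trip through the file extension normalizes them to the 4M variant. A small sketch (the `main` wrapper is illustrative):

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/compression"
)

func main() {
	// All LZ4 window sizes share a single extension...
	ext := compression.ToFileExtension(compression.EncLZ4_64k)
	fmt.Println(ext) // .lz4

	// ...so decoding the extension yields EncLZ4_4M, not the original variant.
	enc := compression.FromFileExtension(ext)
	fmt.Println(enc == compression.EncLZ4_4M) // true
}
```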
36 changes: 27 additions & 9 deletions pkg/storage/bloom/v1/archive.go
```diff
@@ -11,22 +11,34 @@ import (
 	"github.com/grafana/loki/v3/pkg/compression"
 )
 
+const (
+	ExtTar = ".tar"
+)
+
 type TarEntry struct {
 	Name string
 	Size int64
 	Body io.ReadSeeker
 }
 
-func TarGz(dst io.Writer, reader BlockReader) error {
+func TarCompress(enc compression.Encoding, dst io.Writer, reader BlockReader) error {
+	comprPool := compression.GetWriterPool(enc)
+	comprWriter := comprPool.GetWriter(dst)
+	defer func() {
+		comprWriter.Close()
+		comprPool.PutWriter(comprWriter)
+	}()
+
+	return Tar(comprWriter, reader)
+}
+
+func Tar(dst io.Writer, reader BlockReader) error {
 	itr, err := reader.TarEntries()
 	if err != nil {
 		return errors.Wrap(err, "error getting tar entries")
 	}
 
-	gzipper := compression.GetWriterPool(compression.EncGZIP).GetWriter(dst)
-	defer gzipper.Close()
-
-	tarballer := tar.NewWriter(gzipper)
+	tarballer := tar.NewWriter(dst)
 	defer tarballer.Close()
 
 	for itr.Next() {
@@ -49,13 +61,19 @@ func TarGz(dst io.Writer, reader BlockReader) error {
 	return itr.Err()
 }
 
-func UnTarGz(dst string, r io.Reader) error {
-	gzipper, err := compression.GetReaderPool(compression.EncGZIP).GetReader(r)
+func UnTarCompress(enc compression.Encoding, dst string, r io.Reader) error {
+	comprPool := compression.GetReaderPool(enc)
+	comprReader, err := comprPool.GetReader(r)
 	if err != nil {
-		return errors.Wrap(err, "error getting gzip reader")
+		return errors.Wrapf(err, "error getting %s reader", enc.String())
 	}
+	defer comprPool.PutReader(comprReader)
+
+	return UnTar(dst, comprReader)
+}
 
-	tarballer := tar.NewReader(gzipper)
+func UnTar(dst string, r io.Reader) error {
+	tarballer := tar.NewReader(r)
 
 	for {
 		header, err := tarballer.Next()
```
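
The split into `Tar`/`TarCompress` and `UnTar`/`UnTarCompress` keeps archiving and compression orthogonal. A minimal round-trip sketch using the new API (directory names are placeholders, and `EncNone` is assumed to be a pass-through encoding):

```go
package example

import (
	"bytes"

	"github.com/grafana/loki/v3/pkg/compression"
	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
)

// roundTrip archives a bloom block directory into a plain .tar stream
// (the new default, compression.EncNone) and extracts it again.
func roundTrip(srcDir, dstDir string) error {
	reader := v1.NewDirectoryBlockReader(srcDir)

	buf := bytes.NewBuffer(nil)
	if err := v1.TarCompress(compression.EncNone, buf, reader); err != nil {
		return err
	}
	return v1.UnTarCompress(compression.EncNone, dstDir, buf)
}
```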
91 changes: 88 additions & 3 deletions pkg/storage/bloom/v1/archive_test.go
```diff
@@ -24,7 +24,7 @@ func TestArchive(t *testing.T) {
 		BlockOptions{
 			Schema: Schema{
 				version:  CurrentSchemaVersion,
-				encoding: compression.EncSnappy,
+				encoding: compression.EncNone,
 			},
 			SeriesPageSize: 100,
 			BloomPageSize:  10 << 10,
@@ -40,9 +40,9 @@
 	reader := NewDirectoryBlockReader(dir1)
 
 	w := bytes.NewBuffer(nil)
-	require.Nil(t, TarGz(w, reader))
+	require.Nil(t, Tar(w, reader))
 
-	require.Nil(t, UnTarGz(dir2, w))
+	require.Nil(t, UnTar(dir2, w))
 
 	reader2 := NewDirectoryBlockReader(dir2)
 
@@ -78,3 +78,88 @@ func TestArchive(t *testing.T) {
 	require.Nil(t, err)
 	require.Equal(t, srcBloomsBytes, dstBloomsBytes)
 }
+
+func TestArchiveCompression(t *testing.T) {
+	t.Parallel()
+	for _, tc := range []struct {
+		enc compression.Encoding
+	}{
+		{compression.EncNone},
+		{compression.EncGZIP},
+		{compression.EncSnappy},
+		{compression.EncLZ4_64k},
+		{compression.EncLZ4_256k},
+		{compression.EncLZ4_1M},
+		{compression.EncLZ4_4M},
+		{compression.EncFlate},
+		{compression.EncZstd},
+	} {
+		t.Run(tc.enc.String(), func(t *testing.T) {
+			// for writing files to two dirs for comparison and ensuring they're equal
+			dir1 := t.TempDir()
+			dir2 := t.TempDir()
+
+			numSeries := 100
+			data, _ := MkBasicSeriesWithBlooms(numSeries, 0x0000, 0xffff, 0, 10000)
+
+			builder, err := NewBlockBuilder(
+				BlockOptions{
+					Schema: Schema{
+						version:  CurrentSchemaVersion,
+						encoding: compression.EncNone,
+					},
+					SeriesPageSize: 100,
+					BloomPageSize:  10 << 10,
+				},
+				NewDirectoryBlockWriter(dir1),
+			)
+
+			require.Nil(t, err)
+			itr := v2.NewSliceIter[SeriesWithBlooms](data)
+			_, err = builder.BuildFrom(itr)
+			require.Nil(t, err)
+
+			reader := NewDirectoryBlockReader(dir1)
+
+			w := bytes.NewBuffer(nil)
+			require.Nil(t, TarCompress(tc.enc, w, reader))
+
+			require.Nil(t, UnTarCompress(tc.enc, dir2, w))
+
+			reader2 := NewDirectoryBlockReader(dir2)
+
+			// Check Index is byte for byte equivalent
+			srcIndex, err := reader.Index()
+			require.Nil(t, err)
+			_, err = srcIndex.Seek(0, io.SeekStart)
+			require.Nil(t, err)
+			dstIndex, err := reader2.Index()
+			require.Nil(t, err)
+			_, err = dstIndex.Seek(0, io.SeekStart)
+			require.Nil(t, err)
+
+			srcIndexBytes, err := io.ReadAll(srcIndex)
+			require.Nil(t, err)
+			dstIndexBytes, err := io.ReadAll(dstIndex)
+			require.Nil(t, err)
+			require.Equal(t, srcIndexBytes, dstIndexBytes)
+
+			// Check Blooms is byte for byte equivalent
+			srcBlooms, err := reader.Blooms()
+			require.Nil(t, err)
+			_, err = srcBlooms.Seek(0, io.SeekStart)
+			require.Nil(t, err)
+			dstBlooms, err := reader2.Blooms()
+			require.Nil(t, err)
+			_, err = dstBlooms.Seek(0, io.SeekStart)
+			require.Nil(t, err)
+
+			srcBloomsBytes, err := io.ReadAll(srcBlooms)
+			require.Nil(t, err)
+			dstBloomsBytes, err := io.ReadAll(dstBlooms)
+			require.Nil(t, err)
+			require.Equal(t, srcBloomsBytes, dstBloomsBytes)
+
+		})
+	}
+}
```
8 changes: 4 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
```diff
@@ -94,15 +94,15 @@ func loadBlockDirectories(root string, logger log.Logger) (keys []string, values
 			return nil
 		}
 
-		ref, err := resolver.ParseBlockKey(key(path))
+		// The block file extension (.tar) needs to be added so the key can be parsed.
+		// This is because the extension is stripped off when the tar archive is extracted.
+		ref, err := resolver.ParseBlockKey(key(path + blockExtension))
 		if err != nil {
 			return nil
 		}
 
 		if ok, clean := isBlockDir(path, logger); ok {
-			// the cache key must not contain the directory prefix
-			// therefore we use the defaultKeyResolver to resolve the block's address
-			key := defaultKeyResolver{}.Block(ref).Addr()
+			key := cacheKey(ref)
 			keys = append(keys, key)
 			values = append(values, NewBlockDirectory(ref, path))
 			level.Debug(logger).Log("msg", "found block directory", "path", path, "key", key)
```
8 changes: 5 additions & 3 deletions pkg/storage/stores/shipper/bloomshipper/cache_test.go
```diff
@@ -12,6 +12,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/stretchr/testify/require"
 
+	"github.com/grafana/loki/v3/pkg/compression"
 	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
 )
@@ -63,7 +64,8 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) {
 	wd := t.TempDir()
 
 	// plain file
-	fp, _ := os.Create(filepath.Join(wd, "regular-file.tar.gz"))
+	ext := blockExtension + compression.ExtGZIP
+	fp, _ := os.Create(filepath.Join(wd, "regular-file"+ext))
 	fp.Close()
 
 	// invalid directory
@@ -99,8 +101,8 @@
 
 	require.Equal(t, 1, len(c.entries))
 
-	key := validDir + ".tar.gz" // cache key must not contain directory prefix
-	elem, found := c.entries[key]
+	// cache key contains neither the directory prefix nor the file extension suffix
+	elem, found := c.entries[validDir]
 	require.True(t, found)
 	blockDir := elem.Value.(*Entry).Value
 	require.Equal(t, filepath.Join(wd, validDir), blockDir.Path)
```