From c8966c7365b60eec306aa75cc0430db89298e1b2 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 6 Jun 2024 12:35:45 -0700 Subject: [PATCH] Revert "perf: Introduce fixed size memory pool for bloom querier (#13039)" This reverts commit fc264310ce64fc082965a5d7f036e45a5a399c61. --- pkg/bloombuild/builder/spec_test.go | 4 +- pkg/bloomcompactor/spec_test.go | 4 +- pkg/bloomgateway/bloomgateway_test.go | 44 ++++++ pkg/bloomgateway/util_test.go | 2 +- pkg/loki/modules.go | 15 -- pkg/storage/bloom/v1/block.go | 8 +- pkg/storage/bloom/v1/bloom.go | 43 ++---- pkg/storage/bloom/v1/bloom_querier.go | 30 ++-- pkg/storage/bloom/v1/builder_test.go | 12 +- pkg/storage/bloom/v1/fuse_test.go | 6 +- pkg/storage/bloom/v1/index.go | 2 +- pkg/storage/bloom/v1/util.go | 31 +--- .../stores/shipper/bloomshipper/cache.go | 16 +-- .../shipper/bloomshipper/config/config.go | 67 --------- pkg/util/flagext/csv.go | 62 -------- pkg/util/flagext/csv_test.go | 79 ---------- pkg/util/mempool/bucket.go | 51 ------- pkg/util/mempool/metrics.go | 32 ----- pkg/util/mempool/pool.go | 135 ------------------ pkg/util/mempool/pool_test.go | 133 ----------------- tools/bloom/inspector/main.go | 2 +- 21 files changed, 98 insertions(+), 680 deletions(-) delete mode 100644 pkg/util/flagext/csv.go delete mode 100644 pkg/util/flagext/csv_test.go delete mode 100644 pkg/util/mempool/bucket.go delete mode 100644 pkg/util/mempool/metrics.go delete mode 100644 pkg/util/mempool/pool.go delete mode 100644 pkg/util/mempool/pool_test.go diff --git a/pkg/bloombuild/builder/spec_test.go b/pkg/bloombuild/builder/spec_test.go index 3397a2f60bfe4..40225dc45865b 100644 --- a/pkg/bloombuild/builder/spec_test.go +++ b/pkg/bloombuild/builder/spec_test.go @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser for i, b := range blocks { bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{ BlockRef: refs[i], - BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize), + BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize), }) } @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) { expectedRefs := v1.PointerSlice(data) outputRefs := make([]*v1.SeriesWithBloom, 0, len(data)) for _, block := range outputBlocks { - bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize) + bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize) for bq.Next() { outputRefs = append(outputRefs, bq.At()) } diff --git a/pkg/bloomcompactor/spec_test.go b/pkg/bloomcompactor/spec_test.go index 373b99de68ad3..7e39b8dec57f0 100644 --- a/pkg/bloomcompactor/spec_test.go +++ b/pkg/bloomcompactor/spec_test.go @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser for i, b := range blocks { bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{ BlockRef: refs[i], - BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize), + BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize), }) } @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) { expectedRefs := v1.PointerSlice(data) outputRefs := make([]*v1.SeriesWithBloom, 0, len(data)) for _, block := range outputBlocks { - bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize) + bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize) for bq.Next() { outputRefs = append(outputRefs, bq.At()) } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 
8663bcf079590..15c9ca2be2d85 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -215,6 +215,50 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { } }) + t.Run("request cancellation does not result in channel locking", func(t *testing.T) { + now := mktime("2024-01-25 10:00") + + // replace store implementation and re-initialize workers and sub-services + refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) + mockStore := newMockBloomStore(queriers, metas) + mockStore.delay = 2000 * time.Millisecond + + reg := prometheus.NewRegistry() + gw, err := New(cfg, mockStore, logger, reg) + require.NoError(t, err) + + err = services.StartAndAwaitRunning(context.Background(), gw) + require.NoError(t, err) + t.Cleanup(func() { + err = services.StopAndAwaitTerminated(context.Background(), gw) + require.NoError(t, err) + }) + + chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100) + + // saturate workers + // then send additional request + for i := 0; i < gw.cfg.WorkerConcurrency+1; i++ { + expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`) + require.NoError(t, err) + + req := &logproto.FilterChunkRefRequest{ + From: now.Add(-24 * time.Hour), + Through: now, + Refs: groupRefs(t, chunkRefs), + Plan: plan.QueryPlan{AST: expr}, + Blocks: stringSlice(refs), + } + + ctx, cancelFn := context.WithTimeout(context.Background(), 500*time.Millisecond) + ctx = user.InjectOrgID(ctx, tenantID) + t.Cleanup(cancelFn) + + res, err := gw.FilterChunkRefs(ctx, req) + require.ErrorContainsf(t, err, context.DeadlineExceeded.Error(), "%+v", res) + } + }) + t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) { now := mktime("2023-10-03 10:00") diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 4bd9d9609d647..a3f219c326efd 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -399,7 +399,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, // } // } querier := &bloomshipper.CloseableBlockQuerier{ - BlockQuerier: v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize), + BlockQuerier: v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize), BlockRef: blockRef, } queriers = append(queriers, querier) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 8f91e0d754427..22cd46743ea27 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -35,7 +35,6 @@ import ( "github.com/grafana/loki/v3/pkg/bloomcompactor" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" - v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/types" "github.com/grafana/loki/v3/pkg/analytics" @@ -80,7 +79,6 @@ import ( "github.com/grafana/loki/v3/pkg/util/httpreq" "github.com/grafana/loki/v3/pkg/util/limiter" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/mempool" "github.com/grafana/loki/v3/pkg/util/querylimits" lokiring "github.com/grafana/loki/v3/pkg/util/ring" serverutil "github.com/grafana/loki/v3/pkg/util/server" @@ -732,19 +730,6 @@ func (t *Loki) initBloomStore() (services.Service, error) { reg := prometheus.DefaultRegisterer bsCfg := t.Cfg.StorageConfig.BloomShipperConfig - // Set global BloomPageAllocator variable - switch bsCfg.MemoryManagement.BloomPageAllocationType { - case "simple": - 
bloomshipper.BloomPageAllocator = &v1.SimpleHeapAllocator{} - case "dynamic": - bloomshipper.BloomPageAllocator = v1.BloomPagePool - case "fixed": - bloomshipper.BloomPageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg) - default: - // do nothing - bloomshipper.BloomPageAllocator = nil - } - var metasCache cache.Cache if t.Cfg.isTarget(IndexGateway) && cache.IsCacheConfigured(bsCfg.MetasCache) { metasCache, err = cache.New(bsCfg.MetasCache, reg, logger, stats.BloomMetasCache, constants.Loki) diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go index 042c55a7a0666..ba661de79c498 100644 --- a/pkg/storage/bloom/v1/block.go +++ b/pkg/storage/bloom/v1/block.go @@ -117,11 +117,11 @@ type BlockQuerier struct { // will be returned to the pool for efficiency. This can only safely be used // when the underlying bloom bytes don't escape the decoder, i.e. // when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor). -func NewBlockQuerier(b *Block, alloc Allocator, maxPageSize int) *BlockQuerier { +func NewBlockQuerier(b *Block, noCapture bool, maxPageSize int) *BlockQuerier { return &BlockQuerier{ block: b, series: NewLazySeriesIter(b), - blooms: NewLazyBloomIter(b, alloc, maxPageSize), + blooms: NewLazyBloomIter(b, noCapture, maxPageSize), } } @@ -173,7 +173,3 @@ func (bq *BlockQuerier) Err() error { return bq.blooms.Err() } - -func (bq *BlockQuerier) Close() { - bq.blooms.Close() -} diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go index fc39133e81b05..aa51762d4e4ec 100644 --- a/pkg/storage/bloom/v1/bloom.go +++ b/pkg/storage/bloom/v1/bloom.go @@ -24,7 +24,7 @@ type Bloom struct { func (b *Bloom) Encode(enc *encoding.Encbuf) error { // divide by 8 b/c bloom capacity is measured in bits, but we want bytes - buf := bytes.NewBuffer(make([]byte, 0, int(b.Capacity()/8))) + buf := bytes.NewBuffer(BloomPagePool.Get(int(b.Capacity() / 8))) // TODO(owen-d): have encoder implement writer directly so we don't need // to indirect via a buffer @@ -36,6 +36,7 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error { data := buf.Bytes() enc.PutUvarint(len(data)) // length of bloom filter enc.PutBytes(data) + BloomPagePool.Put(data[:0]) // release to pool return nil } @@ -63,14 +64,11 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error { return nil } -func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) { - data, err := alloc.Get(page.Len) - if err != nil { - return nil, errors.Wrap(err, "allocating buffer") - } - defer alloc.Put(data) +func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) { + data := BloomPagePool.Get(page.Len)[:page.Len] + defer BloomPagePool.Put(data) - _, err = io.ReadFull(r, data) + _, err := io.ReadFull(r, data) if err != nil { return nil, errors.Wrap(err, "reading bloom page") } @@ -86,10 +84,7 @@ func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool, } defer pool.PutReader(decompressor) - b, err := alloc.Get(page.DecompressedLen) - if err != nil { - return nil, errors.Wrap(err, "allocating buffer") - } + b := BloomPagePool.Get(page.DecompressedLen)[:page.DecompressedLen] if _, err = io.ReadFull(decompressor, b); err != nil { return nil, errors.Wrap(err, "decompressing bloom page") @@ -101,18 +96,14 @@ func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool, } // shortcut to skip allocations 
when we know the page is not compressed -func LazyDecodeBloomPageNoCompression(r io.Reader, alloc Allocator, page BloomPageHeader) (*BloomPageDecoder, error) { +func LazyDecodeBloomPageNoCompression(r io.Reader, page BloomPageHeader) (*BloomPageDecoder, error) { // data + checksum if page.Len != page.DecompressedLen+4 { return nil, errors.New("the Len and DecompressedLen of the page do not match") } + data := BloomPagePool.Get(page.Len)[:page.Len] - data, err := alloc.Get(page.Len) - if err != nil { - return nil, errors.Wrap(err, "allocating buffer") - } - - _, err = io.ReadFull(r, data) + _, err := io.ReadFull(r, data) if err != nil { return nil, errors.Wrap(err, "reading bloom page") } @@ -167,16 +158,12 @@ type BloomPageDecoder struct { // This can only safely be used when the underlying bloom // bytes don't escape the decoder: // on reads in the bloom-gw but not in the bloom-compactor -func (d *BloomPageDecoder) Relinquish(alloc Allocator) { - if d == nil { - return - } - +func (d *BloomPageDecoder) Relinquish() { data := d.data d.data = nil if cap(data) > 0 { - _ = alloc.Put(data) + BloomPagePool.Put(data) } } @@ -290,7 +277,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) { // BloomPageDecoder returns a decoder for the given page index. // It may skip the page if it's too large. // NB(owen-d): if `skip` is true, err _must_ be nil. -func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc Allocator, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) { +func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) { if pageIdx < 0 || pageIdx >= len(b.pageHeaders) { metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc() metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen)) @@ -313,9 +300,9 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc Allocator, pageIdx } if b.schema.encoding == chunkenc.EncNone { - res, err = LazyDecodeBloomPageNoCompression(r, alloc, page) + res, err = LazyDecodeBloomPageNoCompression(r, page) } else { - res, err = LazyDecodeBloomPage(r, alloc, b.schema.DecompressorPool(), page) + res, err = LazyDecodeBloomPage(r, b.schema.DecompressorPool(), page) } if err != nil { diff --git a/pkg/storage/bloom/v1/bloom_querier.go b/pkg/storage/bloom/v1/bloom_querier.go index b90bae8a046bb..8de9a33e713f0 100644 --- a/pkg/storage/bloom/v1/bloom_querier.go +++ b/pkg/storage/bloom/v1/bloom_querier.go @@ -7,11 +7,11 @@ type BloomQuerier interface { } type LazyBloomIter struct { + usePool bool + b *Block m int // max page size in bytes - alloc Allocator - // state initialized bool err error @@ -24,11 +24,11 @@ type LazyBloomIter struct { // will be returned to the pool for efficiency. // This can only safely be used when the underlying bloom // bytes don't escape the decoder. 
-func NewLazyBloomIter(b *Block, alloc Allocator, maxSize int) *LazyBloomIter { +func NewLazyBloomIter(b *Block, pool bool, maxSize int) *LazyBloomIter { return &LazyBloomIter{ - b: b, - m: maxSize, - alloc: alloc, + usePool: pool, + b: b, + m: maxSize, } } @@ -53,14 +53,16 @@ func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (skip bool) { // drop the current page if it exists and // we're using the pool - it.curPage.Relinquish(it.alloc) + if it.curPage != nil && it.usePool { + it.curPage.Relinquish() + } r, err := it.b.reader.Blooms() if err != nil { it.err = errors.Wrap(err, "getting blooms reader") return false } - decoder, skip, err := it.b.blooms.BloomPageDecoder(r, it.alloc, offset.Page, it.m, it.b.metrics) + decoder, skip, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics) if err != nil { it.err = errors.Wrap(err, "loading bloom page") return false @@ -104,7 +106,6 @@ func (it *LazyBloomIter) next() bool { var skip bool it.curPage, skip, err = it.b.blooms.BloomPageDecoder( r, - it.alloc, it.curPageIndex, it.m, it.b.metrics, @@ -129,8 +130,11 @@ func (it *LazyBloomIter) next() bool { // we've exhausted the current page, progress to next it.curPageIndex++ - // drop the current page if it exists - it.curPage.Relinquish(it.alloc) + // drop the current page if it exists and + // we're using the pool + if it.usePool { + it.curPage.Relinquish() + } it.curPage = nil continue } @@ -157,7 +161,3 @@ func (it *LazyBloomIter) Err() error { return nil } } - -func (it *LazyBloomIter) Close() { - it.curPage.Relinquish(it.alloc) -} diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 45461824970ab..56d03cbd7c930 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -117,7 +117,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) { } block := NewBlock(tc.reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize) + querier := NewBlockQuerier(block, false, DefaultMaxPageSize) err = block.LoadHeaders() require.Nil(t, err) @@ -218,7 +218,7 @@ func TestMergeBuilder(t *testing.T) { itr := NewSliceIter[SeriesWithBloom](data[min:max]) _, err = builder.BuildFrom(itr) require.Nil(t, err) - blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize))) + blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize))) } // We're not testing the ability to extend a bloom in this test @@ -252,7 +252,7 @@ func TestMergeBuilder(t *testing.T) { require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize) + querier := NewBlockQuerier(block, false, DefaultMaxPageSize) EqualIterators[*SeriesWithBloom]( t, @@ -296,7 +296,7 @@ func TestBlockReset(t *testing.T) { _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize) + querier := NewBlockQuerier(block, false, DefaultMaxPageSize) rounds := make([][]model.Fingerprint, 2) @@ -362,7 +362,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize) + querier := NewBlockQuerier(block, false, 
DefaultMaxPageSize) // rather than use the block querier directly, collect it's data // so we can use it in a few places later @@ -423,7 +423,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { // ensure the new block contains one copy of all the data // by comparing it against an iterator over the source data - mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize) + mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize) sourceItr := NewSliceIter[*SeriesWithBloom](PointerSlice[SeriesWithBloom](xs)) EqualIterators[*SeriesWithBloom]( diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index 3df65a8da27c4..a0dc23001e939 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -76,7 +76,7 @@ func TestFusedQuerier(t *testing.T) { require.NoError(t, err) require.False(t, itr.Next()) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, BloomPagePool, DefaultMaxPageSize) + querier := NewBlockQuerier(block, true, DefaultMaxPageSize) n := 2 nReqs := numSeries / n @@ -215,7 +215,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) { require.False(t, itr.Next()) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, BloomPagePool, 1000) + querier := NewBlockQuerier(block, true, 1000) for fp := model.Fingerprint(0); fp < model.Fingerprint(numSeries); fp++ { err := querier.Seek(fp) @@ -264,7 +264,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou _, err = builder.BuildFrom(itr) require.Nil(b, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, BloomPagePool, DefaultMaxPageSize) + querier := NewBlockQuerier(block, true, DefaultMaxPageSize) numRequestChains := 100 seriesPerRequest := 100 diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index eed6d21ce5c9c..caadfa26ddf74 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -166,7 +166,7 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead return nil, errors.Wrap(err, "seeking to series page") } - data, _ := SeriesPagePool.Get(header.Len) + data := SeriesPagePool.Get(header.Len)[:header.Len] defer SeriesPagePool.Put(data) _, err = io.ReadFull(r, data) if err != nil { diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go index 92e64048fa5bd..22fb47e43e799 100644 --- a/pkg/storage/bloom/v1/util.go +++ b/pkg/storage/bloom/v1/util.go @@ -44,7 +44,7 @@ var ( // buffer pool for bloom pages // 128KB 256KB 512KB 1MB 2MB 4MB 8MB 16MB 32MB 64MB 128MB - BloomPagePool = &BytePool{ + BloomPagePool = BytePool{ pool: pool.New( 128<<10, 128<<20, 2, func(size int) interface{} { @@ -53,38 +53,15 @@ var ( } ) -// Allocator handles byte slices for bloom queriers. -// It exists to reduce the cost of allocations and allows to re-use already allocated memory. -type Allocator interface { - Get(size int) ([]byte, error) - Put([]byte) bool -} - -// SimpleHeapAllocator allocates a new byte slice every time and does not re-cycle buffers. -type SimpleHeapAllocator struct{} - -func (a *SimpleHeapAllocator) Get(size int) ([]byte, error) { - return make([]byte, size), nil -} - -func (a *SimpleHeapAllocator) Put([]byte) bool { - return true -} - -// BytePool uses a sync.Pool to re-cycle already allocated buffers. 
type BytePool struct { pool *pool.Pool } -// Get implement Allocator -func (p *BytePool) Get(size int) ([]byte, error) { - return p.pool.Get(size).([]byte)[:size], nil +func (p *BytePool) Get(size int) []byte { + return p.pool.Get(size).([]byte)[:0] } - -// Put implement Allocator -func (p *BytePool) Put(b []byte) bool { +func (p *BytePool) Put(b []byte) { p.pool.Put(b) - return true } func newCRC32() hash.Hash32 { diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index c1e3964fe8fc0..6ff6ef64948e3 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -15,8 +15,6 @@ import ( "github.com/grafana/loki/v3/pkg/util" ) -var BloomPageAllocator v1.Allocator - type CloseableBlockQuerier struct { BlockRef *v1.BlockQuerier @@ -24,7 +22,6 @@ type CloseableBlockQuerier struct { } func (c *CloseableBlockQuerier) Close() error { - c.BlockQuerier.Close() if c.close != nil { return c.close() } @@ -160,24 +157,15 @@ func (b *BlockDirectory) resolveSize() error { // BlockQuerier returns a new block querier from the directory. // The passed function `close` is called when the the returned querier is closed. + func (b BlockDirectory) BlockQuerier( usePool bool, close func() error, maxPageSize int, metrics *v1.Metrics, ) *CloseableBlockQuerier { - - var alloc v1.Allocator - if usePool && BloomPageAllocator != nil { - alloc = BloomPageAllocator - } else { - alloc = &v1.SimpleHeapAllocator{} - } - - bq := v1.NewBlockQuerier(b.Block(metrics), alloc, maxPageSize) - return &CloseableBlockQuerier{ - BlockQuerier: bq, + BlockQuerier: v1.NewBlockQuerier(b.Block(metrics), usePool, maxPageSize), BlockRef: b.BlockRef, close: close, } diff --git a/pkg/storage/stores/shipper/bloomshipper/config/config.go b/pkg/storage/stores/shipper/bloomshipper/config/config.go index 6de144a3f84bf..72d8f8557b095 100644 --- a/pkg/storage/stores/shipper/bloomshipper/config/config.go +++ b/pkg/storage/stores/shipper/bloomshipper/config/config.go @@ -4,16 +4,11 @@ package config import ( "errors" "flag" - "fmt" - "slices" - "strings" "time" "github.com/grafana/dskit/flagext" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" - lokiflagext "github.com/grafana/loki/v3/pkg/util/flagext" - "github.com/grafana/loki/v3/pkg/util/mempool" ) type Config struct { @@ -23,7 +18,6 @@ type Config struct { BlocksCache BlocksCacheConfig `yaml:"blocks_cache"` MetasCache cache.Config `yaml:"metas_cache"` MetasLRUCache cache.EmbeddedCacheConfig `yaml:"metas_lru_cache"` - MemoryManagement MemoryManagementConfig `yaml:"memory_management" doc:"hidden"` // This will always be set to true when flags are registered. // In tests, where config is created as literal, it can be set manually. @@ -40,7 +34,6 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour) c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f) c.MetasLRUCache.RegisterFlagsWithPrefix(prefix+"metas-lru-cache.", "In-memory LRU cache for bloom metas. 
", f) - c.MemoryManagement.RegisterFlagsWithPrefix(prefix+"memory-management.", f) // always cache LIST operations c.CacheListOps = true @@ -50,9 +43,6 @@ func (c *Config) Validate() error { if len(c.WorkingDirectory) == 0 { return errors.New("at least one working directory must be specified") } - if err := c.MemoryManagement.Validate(); err != nil { - return err - } return nil } @@ -91,60 +81,3 @@ func (cfg *BlocksCacheConfig) Validate() error { } return nil } - -var ( - // the default that describes a 4GiB memory pool - defaultMemPoolBuckets = mempool.Buckets{ - {Size: 128, Capacity: 64 << 10}, // 8MiB -- for tests - {Size: 512, Capacity: 2 << 20}, // 1024MiB - {Size: 128, Capacity: 8 << 20}, // 1024MiB - {Size: 32, Capacity: 32 << 20}, // 1024MiB - {Size: 8, Capacity: 128 << 20}, // 1024MiB - } - types = supportedAllocationTypes{ - "simple", "simple heap allocations using Go's make([]byte, n) and no re-cycling of buffers", - "dynamic", "a buffer pool with variable sized buckets and best effort re-cycling of buffers using Go's sync.Pool", - "fixed", "a fixed size memory pool with configurable slab sizes, see mem-pool-buckets", - } -) - -type MemoryManagementConfig struct { - BloomPageAllocationType string `yaml:"bloom_page_alloc_type"` - BloomPageMemPoolBuckets lokiflagext.CSV[mempool.Bucket] `yaml:"bloom_page_mem_pool_buckets"` -} - -func (cfg *MemoryManagementConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.StringVar(&cfg.BloomPageAllocationType, prefix+"alloc-type", "dynamic", fmt.Sprintf("One of: %s", strings.Join(types.descriptions(), ", "))) - - _ = cfg.BloomPageMemPoolBuckets.Set(defaultMemPoolBuckets.String()) - f.Var(&cfg.BloomPageMemPoolBuckets, prefix+"mem-pool-buckets", "Comma separated list of buckets in the format {size}x{bytes}") -} - -func (cfg *MemoryManagementConfig) Validate() error { - if !slices.Contains(types.names(), cfg.BloomPageAllocationType) { - msg := fmt.Sprintf("bloom_page_alloc_type must be one of: %s", strings.Join(types.descriptions(), ", ")) - return errors.New(msg) - } - if cfg.BloomPageAllocationType == "fixed" && len(cfg.BloomPageMemPoolBuckets) == 0 { - return errors.New("fixed memory pool requires at least one bucket") - } - return nil -} - -type supportedAllocationTypes []string - -func (t supportedAllocationTypes) names() []string { - names := make([]string, 0, len(t)/2) - for i := 0; i < len(t); i += 2 { - names = append(names, t[i]) - } - return names -} - -func (t supportedAllocationTypes) descriptions() []string { - names := make([]string, 0, len(t)/2) - for i := 0; i < len(t); i += 2 { - names = append(names, fmt.Sprintf("%s (%s)", t[i], t[i+1])) - } - return names -} diff --git a/pkg/util/flagext/csv.go b/pkg/util/flagext/csv.go deleted file mode 100644 index 6ed5f9bad11a0..0000000000000 --- a/pkg/util/flagext/csv.go +++ /dev/null @@ -1,62 +0,0 @@ -package flagext - -import ( - "strings" -) - -type ListValue interface { - String() string - Parse(s string) (any, error) -} - -// StringSliceCSV is a slice of strings that is parsed from a comma-separated string -// It implements flag.Value and yaml Marshalers -type CSV[T ListValue] []T - -// String implements flag.Value -func (v CSV[T]) String() string { - s := make([]string, 0, len(v)) - for i := range v { - s = append(s, v[i].String()) - } - return strings.Join(s, ",") -} - -// Set implements flag.Value -func (v *CSV[T]) Set(s string) error { - if len(s) == 0 { - *v = nil - return nil - } - var zero T - values := strings.Split(s, ",") - *v = make(CSV[T], 0, len(values)) - 
for _, val := range values { - el, err := zero.Parse(val) - if err != nil { - return err - } - *v = append(*v, el.(T)) - } - return nil -} - -// String implements flag.Getter -func (v CSV[T]) Get() []T { - return v -} - -// UnmarshalYAML implements yaml.Unmarshaler. -func (v *CSV[T]) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - - return v.Set(s) -} - -// MarshalYAML implements yaml.Marshaler. -func (v CSV[T]) MarshalYAML() (interface{}, error) { - return v.String(), nil -} diff --git a/pkg/util/flagext/csv_test.go b/pkg/util/flagext/csv_test.go deleted file mode 100644 index aca4ea8a77eef..0000000000000 --- a/pkg/util/flagext/csv_test.go +++ /dev/null @@ -1,79 +0,0 @@ -package flagext - -import ( - "strconv" - "testing" - - "github.com/stretchr/testify/require" -) - -type customType int - -// Parse implements ListValue. -func (l customType) Parse(s string) (any, error) { - v, err := strconv.Atoi(s) - if err != nil { - return customType(0), err - } - return customType(v), nil -} - -// String implements ListValue. -func (l customType) String() string { - return strconv.Itoa(int(l)) -} - -var _ ListValue = customType(0) - -func Test_CSV(t *testing.T) { - for _, tc := range []struct { - in string - err bool - out []customType - }{ - { - in: "", - err: false, - out: nil, - }, - { - in: ",", - err: true, - out: []customType{}, - }, - { - in: "1", - err: false, - out: []customType{1}, - }, - { - in: "1,2", - err: false, - out: []customType{1, 2}, - }, - { - in: "1,", - err: true, - out: []customType{}, - }, - { - in: ",1", - err: true, - out: []customType{}, - }, - } { - t.Run(tc.in, func(t *testing.T) { - var v CSV[customType] - - err := v.Set(tc.in) - if tc.err { - require.NotNil(t, err) - } else { - require.Nil(t, err) - require.Equal(t, tc.out, v.Get()) - } - - }) - } - -} diff --git a/pkg/util/mempool/bucket.go b/pkg/util/mempool/bucket.go deleted file mode 100644 index a041eb49e3f8f..0000000000000 --- a/pkg/util/mempool/bucket.go +++ /dev/null @@ -1,51 +0,0 @@ -package mempool - -import ( - "errors" - "fmt" - "strconv" - "strings" - - "github.com/c2h5oh/datasize" -) - -type Bucket struct { - Size int - Capacity uint64 -} - -func (b Bucket) Parse(s string) (any, error) { - parts := strings.Split(s, "x") - if len(parts) != 2 { - return nil, errors.New("bucket must be in format {count}x{bytes}") - } - - size, err := strconv.Atoi(parts[0]) - if err != nil { - return nil, err - } - - capacity, err := datasize.ParseString(parts[1]) - if err != nil { - panic(err.Error()) - } - - return Bucket{ - Size: size, - Capacity: uint64(capacity), - }, nil -} - -func (b Bucket) String() string { - return fmt.Sprintf("%dx%s", b.Size, datasize.ByteSize(b.Capacity).String()) -} - -type Buckets []Bucket - -func (b Buckets) String() string { - s := make([]string, 0, len(b)) - for i := range b { - s = append(s, b[i].String()) - } - return strings.Join(s, ",") -} diff --git a/pkg/util/mempool/metrics.go b/pkg/util/mempool/metrics.go deleted file mode 100644 index f7d5a52eb0d91..0000000000000 --- a/pkg/util/mempool/metrics.go +++ /dev/null @@ -1,32 +0,0 @@ -package mempool - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - - "github.com/grafana/loki/v3/pkg/util/constants" -) - -type metrics struct { - availableBuffersPerSlab *prometheus.CounterVec - errorsCounter *prometheus.CounterVec -} - -func newMetrics(r 
prometheus.Registerer, name string) *metrics { - return &metrics{ - availableBuffersPerSlab: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Namespace: constants.Loki, - Subsystem: "mempool", - Name: "available_buffers_per_slab", - Help: "The amount of available buffers per slab.", - ConstLabels: prometheus.Labels{"pool": name}, - }, []string{"slab"}), - errorsCounter: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Namespace: constants.Loki, - Subsystem: "mempool", - Name: "errors_total", - Help: "The total amount of errors returned from the pool.", - ConstLabels: prometheus.Labels{"pool": name}, - }, []string{"slab", "reason"}), - } -} diff --git a/pkg/util/mempool/pool.go b/pkg/util/mempool/pool.go deleted file mode 100644 index b42d8d9237677..0000000000000 --- a/pkg/util/mempool/pool.go +++ /dev/null @@ -1,135 +0,0 @@ -package mempool - -import ( - "errors" - "fmt" - "sync" - "unsafe" - - "github.com/dustin/go-humanize" - "github.com/prometheus/client_golang/prometheus" -) - -var ( - errSlabExhausted = errors.New("slab exhausted") - - reasonSizeExceeded = "size-exceeded" - reasonSlabExhausted = "slab-exhausted" -) - -type slab struct { - buffer chan unsafe.Pointer - size, count int - mtx sync.Mutex - metrics *metrics - name string -} - -func newSlab(bufferSize, bufferCount int, m *metrics) *slab { - name := humanize.Bytes(uint64(bufferSize)) - m.availableBuffersPerSlab.WithLabelValues(name).Add(0) // initialize metric with value 0 - - return &slab{ - size: bufferSize, - count: bufferCount, - metrics: m, - name: name, - } -} - -func (s *slab) init() { - s.buffer = make(chan unsafe.Pointer, s.count) - for i := 0; i < s.count; i++ { - buf := make([]byte, 0, s.size) - ptr := unsafe.Pointer(unsafe.SliceData(buf)) - s.buffer <- ptr - } - s.metrics.availableBuffersPerSlab.WithLabelValues(s.name).Add(float64(s.count)) -} - -func (s *slab) get(size int) ([]byte, error) { - s.mtx.Lock() - if s.buffer == nil { - s.init() - } - defer s.mtx.Unlock() - - // wait for available buffer on channel - var buf []byte - select { - case ptr := <-s.buffer: - buf = unsafe.Slice((*byte)(ptr), s.size) - default: - s.metrics.errorsCounter.WithLabelValues(s.name, reasonSlabExhausted).Inc() - return nil, errSlabExhausted - } - - // Taken from https://github.com/ortuman/nuke/blob/main/monotonic_arena.go#L37-L48 - // This piece of code will be translated into a runtime.memclrNoHeapPointers - // invocation by the compiler, which is an assembler optimized implementation. - // Architecture specific code can be found at src/runtime/memclr_$GOARCH.s - // in Go source (since https://codereview.appspot.com/137880043). - for i := range buf { - buf[i] = 0 - } - - return buf[:size], nil -} - -func (s *slab) put(buf []byte) { - if s.buffer == nil { - panic("slab is not initialized") - } - - ptr := unsafe.Pointer(unsafe.SliceData(buf)) - s.buffer <- ptr -} - -// MemPool is an Allocator implementation that uses a fixed size memory pool -// that is split into multiple slabs of different buffer sizes. -// Buffers are re-cycled and need to be returned back to the pool, otherwise -// the pool runs out of available buffers. 
-type MemPool struct { - slabs []*slab - metrics *metrics -} - -func New(name string, buckets []Bucket, r prometheus.Registerer) *MemPool { - a := &MemPool{ - slabs: make([]*slab, 0, len(buckets)), - metrics: newMetrics(r, name), - } - for _, b := range buckets { - a.slabs = append(a.slabs, newSlab(int(b.Capacity), b.Size, a.metrics)) - } - return a -} - -// Get satisfies Allocator interface -// Allocating a buffer from an exhausted pool/slab, or allocating a buffer that -// exceeds the largest slab size will return an error. -func (a *MemPool) Get(size int) ([]byte, error) { - for i := 0; i < len(a.slabs); i++ { - if a.slabs[i].size < size { - continue - } - return a.slabs[i].get(size) - } - a.metrics.errorsCounter.WithLabelValues("pool", reasonSizeExceeded).Inc() - return nil, fmt.Errorf("no slab found for size: %d", size) -} - -// Put satisfies Allocator interface -// Every buffer allocated with Get(size int) needs to be returned to the pool -// using Put(buffer []byte) so it can be re-cycled. -func (a *MemPool) Put(buffer []byte) bool { - size := cap(buffer) - for i := 0; i < len(a.slabs); i++ { - if a.slabs[i].size < size { - continue - } - a.slabs[i].put(buffer) - return true - } - return false -} diff --git a/pkg/util/mempool/pool_test.go b/pkg/util/mempool/pool_test.go deleted file mode 100644 index da0fc361dd4a4..0000000000000 --- a/pkg/util/mempool/pool_test.go +++ /dev/null @@ -1,133 +0,0 @@ -package mempool - -import ( - "math/rand" - "sync" - "testing" - "time" - "unsafe" - - "github.com/stretchr/testify/require" -) - -func TestMemPool(t *testing.T) { - - t.Run("empty pool", func(t *testing.T) { - pool := New("test", []Bucket{}, nil) - _, err := pool.Get(256) - require.Error(t, err) - }) - - t.Run("requested size too big", func(t *testing.T) { - pool := New("test", []Bucket{ - {Size: 1, Capacity: 128}, - }, nil) - _, err := pool.Get(256) - require.Error(t, err) - }) - - t.Run("requested size within bucket", func(t *testing.T) { - pool := New("test", []Bucket{ - {Size: 1, Capacity: 128}, - {Size: 1, Capacity: 256}, - {Size: 1, Capacity: 512}, - }, nil) - res, err := pool.Get(200) - require.NoError(t, err) - require.Equal(t, 200, len(res)) - require.Equal(t, 256, cap(res)) - - res, err = pool.Get(300) - require.NoError(t, err) - require.Equal(t, 300, len(res)) - require.Equal(t, 512, cap(res)) - }) - - t.Run("buffer is cleared when returned", func(t *testing.T) { - pool := New("test", []Bucket{ - {Size: 1, Capacity: 64}, - }, nil) - res, err := pool.Get(8) - require.NoError(t, err) - require.Equal(t, 8, len(res)) - source := []byte{0, 1, 2, 3, 4, 5, 6, 7} - copy(res, source) - - pool.Put(res) - - res, err = pool.Get(8) - require.NoError(t, err) - require.Equal(t, 8, len(res)) - require.Equal(t, make([]byte, 8), res) - }) - - t.Run("pool returns error when no buffer is available", func(t *testing.T) { - pool := New("test", []Bucket{ - {Size: 1, Capacity: 64}, - }, nil) - buf1, _ := pool.Get(32) - require.Equal(t, 32, len(buf1)) - - _, err := pool.Get(16) - require.ErrorContains(t, err, errSlabExhausted.Error()) - }) - - t.Run("test ring buffer returns same backing array", func(t *testing.T) { - pool := New("test", []Bucket{ - {Size: 2, Capacity: 128}, - }, nil) - res1, _ := pool.Get(32) - ptr1 := unsafe.Pointer(unsafe.SliceData(res1)) - - res2, _ := pool.Get(64) - ptr2 := unsafe.Pointer(unsafe.SliceData(res2)) - - pool.Put(res2) - pool.Put(res1) - - res3, _ := pool.Get(48) - ptr3 := unsafe.Pointer(unsafe.SliceData(res3)) - - res4, _ := pool.Get(96) - ptr4 := 
unsafe.Pointer(unsafe.SliceData(res4)) - - require.Equal(t, ptr1, ptr4) - require.Equal(t, ptr2, ptr3) - }) - - t.Run("concurrent access", func(t *testing.T) { - pool := New("test", []Bucket{ - {Size: 32, Capacity: 2 << 10}, - {Size: 16, Capacity: 4 << 10}, - {Size: 8, Capacity: 8 << 10}, - {Size: 4, Capacity: 16 << 10}, - {Size: 2, Capacity: 32 << 10}, - }, nil) - - var wg sync.WaitGroup - numWorkers := 256 - n := 10 - - for i := 0; i < numWorkers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < n; i++ { - s := 2 << rand.Intn(5) - buf1, err1 := pool.Get(s) - buf2, err2 := pool.Get(s) - if err2 == nil { - pool.Put(buf2) - } - time.Sleep(time.Millisecond * time.Duration(rand.Intn(10))) - if err1 == nil { - pool.Put(buf1) - } - } - }() - } - - wg.Wait() - t.Log("finished") - }) -} diff --git a/tools/bloom/inspector/main.go b/tools/bloom/inspector/main.go index 90aa8c29c2cbb..dfcc7c79cd86d 100644 --- a/tools/bloom/inspector/main.go +++ b/tools/bloom/inspector/main.go @@ -18,7 +18,7 @@ func main() { r := v1.NewDirectoryBlockReader(path) b := v1.NewBlock(r, v1.NewMetrics(nil)) - q := v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize) + q := v1.NewBlockQuerier(b, true, v1.DefaultMaxPageSize) md, err := q.Metadata() if err != nil {
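
Note on what this revert removes: the patch deletes the Allocator abstraction from pkg/storage/bloom/v1 and the fixed-size MemPool in pkg/util/mempool, returning NewBlockQuerier to the earlier usePool bool flag backed by the sync.Pool-based BytePool. Below is a minimal, self-contained Go sketch of the interface and the heap-backed implementation being removed, reconstructed from the deleted lines in this patch; it is illustrative only and not the actual Loki source.

package main

import "fmt"

// Allocator is the byte-slice abstraction removed by this revert
// (previously in pkg/storage/bloom/v1/util.go).
type Allocator interface {
	Get(size int) ([]byte, error)
	Put([]byte) bool
}

// SimpleHeapAllocator allocates a fresh slice on every Get and never
// recycles buffers; Put is a no-op that reports the buffer as accepted.
type SimpleHeapAllocator struct{}

func (a *SimpleHeapAllocator) Get(size int) ([]byte, error) {
	return make([]byte, size), nil
}

func (a *SimpleHeapAllocator) Put([]byte) bool {
	return true
}

func main() {
	// Callers such as the bloom page decoder received one of these
	// allocators; after the revert they pass a boolean and use the
	// package-level BytePool instead.
	var alloc Allocator = &SimpleHeapAllocator{}

	buf, err := alloc.Get(1 << 10)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(buf), cap(buf)) // 1024 1024

	// Returning the buffer is harmless here; a pooled implementation
	// (e.g. the removed MemPool) would recycle it into a slab.
	alloc.Put(buf)
}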