perf: Re-introduce fixed size memory pool for bloom querier #13172

Merged · 5 commits · merged Jun 20, 2024 · showing changes from 1 commit
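Overview: this change threads a mempool.Allocator from the bloom store down to the block querier. NewBloomStore gains an allocator argument, the global bloomshipper.BloomPageAllocator is no longer set in initBloomStore, and SimpleHeapAllocator moves from package v1 into pkg/util/mempool. Judging from the call sites in the diff (alloc.Get(page.Len) returning ([]byte, error), with pages later handed back via Relinquish), the allocator contract is roughly the sketch below; the real definition lives in pkg/util/mempool and the Put signature is an assumption.

type Allocator interface {
	// Get returns a buffer of at least size bytes. A fixed-size pool
	// may return an error when all suitable slabs are in use.
	Get(size int) ([]byte, error)
	// Put hands a buffer back to the pool. Assumed to report whether
	// the buffer was actually pooled (a heap allocator would not).
	Put([]byte) bool
}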
5 changes: 3 additions & 2 deletions pkg/bloombuild/builder/spec_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/chunkenc"
 	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 )

 func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
@@ -74,7 +75,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
 	for i, b := range blocks {
 		bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
 			BlockRef:     refs[i],
-			BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
+			BlockQuerier: v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
 		})
 	}

@@ -152,7 +153,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
 	expectedRefs := v1.PointerSlice(data)
 	outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
 	for _, block := range outputBlocks {
 		bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
+		bq := v1.NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
 		for bq.Next() {
 			outputRefs = append(outputRefs, bq.At())
 		}
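For reference, SimpleHeapAllocator, moved from package v1 into pkg/util/mempool by this PR, is presumably just a pass-through to the Go heap, which is why it stays safe on the write path. A minimal sketch, assuming the interface outlined above:

// Minimal heap-backed Allocator sketch: every Get is a fresh
// allocation reclaimed by the GC, so the buffer may escape freely.
type SimpleHeapAllocator struct{}

func (a *SimpleHeapAllocator) Get(size int) ([]byte, error) {
	return make([]byte, size), nil
}

// Put is a no-op; nothing is pooled. Return value per the assumed
// interface above.
func (a *SimpleHeapAllocator) Put([]byte) bool { return false }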
3 changes: 2 additions & 1 deletion pkg/bloombuild/planner/planner_test.go
@@ -29,6 +29,7 @@ import (
 	bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
 	"github.com/grafana/loki/v3/pkg/storage/types"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 )

 var testDay = parseDayTime("2023-09-01")
@@ -411,7 +412,7 @@ func createPlanner(
 	reg := prometheus.NewPedanticRegistry()
 	metasCache := cache.NewNoopCache()
 	blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, reg, logger)
-	bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, reg, logger)
+	bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
 	require.NoError(t, err)

 	planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg)
3 changes: 2 additions & 1 deletion pkg/bloomcompactor/retention_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
 	"github.com/grafana/loki/v3/pkg/storage/types"
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 	lokiring "github.com/grafana/loki/v3/pkg/util/ring"
 	"github.com/grafana/loki/v3/pkg/validation"
 )
@@ -822,7 +823,7 @@ func NewMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*bloomshipper.B
 	metasCache := cache.NewMockCache()
 	blocksCache := bloomshipper.NewFsBlocksCache(storageConfig.BloomShipperConfig.BlocksCache, prometheus.NewPedanticRegistry(), logger)

-	store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageConfig, metrics, metasCache, blocksCache, reg, logger)
+	store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageConfig, metrics, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
 	if err == nil {
 		t.Cleanup(store.Stop)
 	}
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/spec_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/chunkenc"
 	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 )

 func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
@@ -74,7 +75,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
 	for i, b := range blocks {
 		bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
 			BlockRef:     refs[i],
-			BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
+			BlockQuerier: v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
 		})
 	}

@@ -152,7 +153,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
 	expectedRefs := v1.PointerSlice(data)
 	outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
 	for _, block := range outputBlocks {
-		bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
+		bq := v1.NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
 		for bq.Next() {
 			outputRefs = append(outputRefs, bq.At())
 		}
3 changes: 2 additions & 1 deletion pkg/bloomgateway/bloomgateway_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 	bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
 	"github.com/grafana/loki/v3/pkg/storage/types"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 	"github.com/grafana/loki/v3/pkg/validation"
 )

@@ -92,7 +93,7 @@ func setupBloomStore(t *testing.T) *bloomshipper.BloomStore {

 	reg := prometheus.NewRegistry()
 	blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, nil, logger)
-	store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, nil, blocksCache, reg, logger)
+	store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, nil, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
 	require.NoError(t, err)
 	t.Cleanup(store.Stop)
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor.go
@@ -88,7 +88,7 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
 		// after iteration for performance (alloc reduction).
 		// This is safe to do here because we do not capture
 		// the underlying bloom []byte outside of iteration
-		bloomshipper.WithPool(true),
+		bloomshipper.WithPool(p.store.Allocator()),
 	)
 	duration = time.Since(startBlocks)
 	level.Debug(p.logger).Log("msg", "fetched blocks", "count", len(refs), "duration", duration, "err", err)
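Note: WithPool previously took a bool selecting the global page pool; it now carries the allocator itself, so the choice made once in initBloomStore follows the store to every fetch. A hypothetical sketch of the option's new shape; the option type and field names are invented, and only the parameter type mempool.Allocator comes from the diff:

// FetchOption and fetchConfig are hypothetical names for illustration.
func WithPool(alloc mempool.Allocator) FetchOption {
	return func(cfg *fetchConfig) {
		cfg.alloc = alloc // later used for Get/Relinquish during page decoding
	}
}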
5 changes: 5 additions & 0 deletions pkg/bloomgateway/processor_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/config"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 	"github.com/grafana/loki/v3/pkg/util/constants"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 )

 var _ bloomshipper.Store = &dummyStore{}
@@ -76,6 +77,10 @@ func (s *dummyStore) Client(_ model.Time) (bloomshipper.Client, error) {
 	return nil, nil
 }

+func (s *dummyStore) Allocator() mempool.Allocator {
+	return nil
+}
+
 func (s *dummyStore) Stop() {
 }
37 changes: 22 additions & 15 deletions pkg/loki/modules.go
@@ -35,7 +35,6 @@ import (

 	"github.com/grafana/loki/v3/pkg/bloomcompactor"
 	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
-	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/v3/pkg/storage/types"

 	"github.com/grafana/loki/v3/pkg/analytics"
@@ -732,19 +731,6 @@ func (t *Loki) initBloomStore() (services.Service, error) {
 	reg := prometheus.DefaultRegisterer
 	bsCfg := t.Cfg.StorageConfig.BloomShipperConfig

-	// Set global BloomPageAllocator variable
-	switch bsCfg.MemoryManagement.BloomPageAllocationType {
-	case "simple":
-		bloomshipper.BloomPageAllocator = &v1.SimpleHeapAllocator{}
-	case "dynamic":
-		bloomshipper.BloomPageAllocator = v1.BloomPagePool
-	case "fixed":
-		bloomshipper.BloomPageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg)
-	default:
-		// do nothing
-		bloomshipper.BloomPageAllocator = nil
-	}
-
 	var metasCache cache.Cache
 	if t.Cfg.isTarget(IndexGateway) && cache.IsCacheConfigured(bsCfg.MetasCache) {
 		metasCache, err = cache.New(bsCfg.MetasCache, reg, logger, stats.BloomMetasCache, constants.Loki)
@@ -768,7 +754,28 @@ func (t *Loki) initBloomStore() (services.Service, error) {
 		level.Warn(logger).Log("msg", "failed to preload blocks cache", "err", err)
 	}

-	t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, reg, logger)
+	var pageAllocator mempool.Allocator
+
+	// choose the bloom page allocator implementation based on config
+	switch bsCfg.MemoryManagement.BloomPageAllocationType {
+	case "simple":
+		pageAllocator = &mempool.SimpleHeapAllocator{}
+	case "dynamic":
+		// sync buffer pool for bloom pages
+		// 128KB 256KB 512KB 1MB 2MB 4MB 8MB 16MB 32MB 64MB 128MB
+		pageAllocator = mempool.NewBytePoolAllocator(
+			128<<10, 128<<20, 2,
+			func(size int) interface{} {
+				return make([]byte, size)
+			})
+	case "fixed":
+		pageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg)
+	default:
+		// should not happen as the type is validated upfront
+		return nil, fmt.Errorf("failed to create bloom store: invalid allocator type")
+	}
+
+	t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, pageAllocator, reg, logger)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create bloom store: %w", err)
 	}
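Note: the three strategies trade allocation cost against safety. simple always heap-allocates; dynamic grows a buffer pool in power-of-two steps from 128KB to 128MB; fixed pre-allocates bounded buckets and can refuse a request. A sketch of exercising the fixed pool, reusing the mempool.New and mempool.Bucket shapes that appear in this diff; Put and the exhaustion behavior are assumptions:

alloc := mempool.New("bloom-page-pool", []mempool.Bucket{
	{Size: 32, Capacity: 128 << 10}, // assumed: 32 slabs of 128KiB
	{Size: 16, Capacity: 256 << 10}, // assumed: 16 slabs of 256KiB
}, prometheus.DefaultRegisterer)

buf, err := alloc.Get(100 << 10) // served by the 128KiB bucket
if err != nil {
	// a fixed pool is bounded: when every fitting slab is checked out,
	// Get fails instead of growing (assumed behavior)
	return nil, err
}
defer alloc.Put(buf) // hand the slab back for reuse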
4 changes: 3 additions & 1 deletion pkg/storage/bloom/v1/block.go
@@ -4,6 +4,8 @@ import (
 	"fmt"

 	"github.com/pkg/errors"
+
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 )

 type BlockMetadata struct {
@@ -117,7 +119,7 @@ type BlockQuerier struct {
 // blooms for querying (bloom-gateway), but not for writing (bloom-compactor).
 // Therefore, when calling NewBlockQuerier on the write path, you should always
 // pass the SimpleHeapAllocator implementation of the Allocator interface.
-func NewBlockQuerier(b *Block, alloc Allocator, maxPageSize int) *BlockQuerier {
+func NewBlockQuerier(b *Block, alloc mempool.Allocator, maxPageSize int) *BlockQuerier {
 	return &BlockQuerier{
 		block:          b,
 		LazySeriesIter: NewLazySeriesIter(b),
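Note: the doc comment above is the usage rule the allocator parameter enforces. Pooled pages are only sound when bloom bytes never outlive iteration, which holds on the read path but not on the write path. Illustrative call sites; store.Allocator() is taken from the Store interface extended in this PR:

// read path (bloom-gateway): pages come from the configured pool and
// are relinquished after iteration
bq := NewBlockQuerier(block, store.Allocator(), DefaultMaxPageSize)

// write path (bloom-compactor / bloom-builder): decoded blooms outlive
// iteration, so always pass the plain heap allocator
bq = NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize)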
9 changes: 5 additions & 4 deletions pkg/storage/bloom/v1/bloom.go
@@ -10,6 +10,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/chunkenc"
 	"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
 	"github.com/grafana/loki/v3/pkg/util/encoding"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 )

 // NB(chaudum): Some block pages are way bigger than others (400MiB and
@@ -63,7 +64,7 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error {
 	return nil
 }

-func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
+func LazyDecodeBloomPage(r io.Reader, alloc mempool.Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
 	data, err := alloc.Get(page.Len)
 	if err != nil {
 		return nil, errors.Wrap(err, "allocating buffer")
@@ -101,7 +102,7 @@ func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool,
 }

 // shortcut to skip allocations when we know the page is not compressed
-func LazyDecodeBloomPageNoCompression(r io.Reader, alloc Allocator, page BloomPageHeader) (*BloomPageDecoder, error) {
+func LazyDecodeBloomPageNoCompression(r io.Reader, alloc mempool.Allocator, page BloomPageHeader) (*BloomPageDecoder, error) {
 	// data + checksum
 	if page.Len != page.DecompressedLen+4 {
 		return nil, errors.New("the Len and DecompressedLen of the page do not match")
@@ -167,7 +168,7 @@ type BloomPageDecoder struct {
 // This can only safely be used when the underlying bloom
 // bytes don't escape the decoder:
 // on reads in the bloom-gw but not in the bloom-compactor
-func (d *BloomPageDecoder) Relinquish(alloc Allocator) {
+func (d *BloomPageDecoder) Relinquish(alloc mempool.Allocator) {
 	if d == nil {
 		return
 	}
@@ -284,7 +285,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
 // BloomPageDecoder returns a decoder for the given page index.
 // It may skip the page if it's too large.
 // NB(owen-d): if `skip` is true, err _must_ be nil.
-func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc Allocator, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
+func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc mempool.Allocator, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
 	if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
 		metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc()
 		metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen))
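Note: together these signatures define the page buffer lifecycle: alloc.Get on decode, alloc.Put via Relinquish once iteration is done. A sketch of the intended sequence, using only the signatures shown above:

dec, skip, err := block.BloomPageDecoder(r, alloc, pageIdx, maxPageSize, metrics)
if err != nil {
	return err
}
if skip { // page exceeded maxPageSize; per the NB above, err is nil here
	return nil
}
// ... iterate blooms; the pooled []byte must not escape this scope ...
dec.Relinquish(alloc) // read path only: returns the page buffer to the pool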
10 changes: 7 additions & 3 deletions pkg/storage/bloom/v1/bloom_querier.go
@@ -1,6 +1,10 @@
 package v1

-import "github.com/pkg/errors"
+import (
+	"github.com/pkg/errors"
+
+	"github.com/grafana/loki/v3/pkg/util/mempool"
+)

 type BloomQuerier interface {
 	Seek(BloomOffset) (*Bloom, error)
@@ -10,7 +14,7 @@ type LazyBloomIter struct {
 	b *Block
 	m int // max page size in bytes

-	alloc Allocator
+	alloc mempool.Allocator

 	// state
 	initialized bool
@@ -24,7 +28,7 @@ type LazyBloomIter struct {
 // will be returned to the pool for efficiency.
 // This can only safely be used when the underlying bloom
 // bytes don't escape the decoder.
-func NewLazyBloomIter(b *Block, alloc Allocator, maxSize int) *LazyBloomIter {
+func NewLazyBloomIter(b *Block, alloc mempool.Allocator, maxSize int) *LazyBloomIter {
 	return &LazyBloomIter{
 		b: b,
 		m: maxSize,
15 changes: 8 additions & 7 deletions pkg/storage/bloom/v1/builder_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/chunkenc"
 	"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
 	"github.com/grafana/loki/v3/pkg/util/encoding"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 )

 var blockEncodings = []chunkenc.Encoding{
@@ -121,7 +122,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
 			}

 			block := NewBlock(tc.reader, NewMetrics(nil))
-			querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()
+			querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()

 			err = block.LoadHeaders()
 			require.Nil(t, err)
@@ -239,7 +240,7 @@ func TestMergeBuilder(t *testing.T) {
 		itr := NewSliceIter[SeriesWithBlooms](data[min:max])
 		_, err = builder.BuildFrom(itr)
 		require.Nil(t, err)
-		blocks = append(blocks, NewPeekingIter[*SeriesWithBlooms](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()))
+		blocks = append(blocks, NewPeekingIter[*SeriesWithBlooms](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()))
 	}

 	// We're not testing the ability to extend a bloom in this test
@@ -280,7 +281,7 @@ func TestMergeBuilder(t *testing.T) {
 	require.Nil(t, err)

 	block := NewBlock(reader, NewMetrics(nil))
-	querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)
+	querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize)

 	EqualIterators[*SeriesWithBlooms](
 		t,
@@ -372,7 +373,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) {
 	require.Nil(t, err)

 	block := NewBlock(reader, NewMetrics(nil))
-	querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)
+	querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize)

 	require.True(t, querier.Next())
 	require.Equal(t,
@@ -417,7 +418,7 @@ func TestBlockReset(t *testing.T) {
 	_, err = builder.BuildFrom(itr)
 	require.Nil(t, err)
 	block := NewBlock(reader, NewMetrics(nil))
-	querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)
+	querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize)

 	rounds := make([][]model.Fingerprint, 2)

@@ -482,7 +483,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
 	_, err = builder.BuildFrom(itr)
 	require.Nil(t, err)
 	block := NewBlock(reader, NewMetrics(nil))
-	querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()
+	querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()

 	// rather than use the block querier directly, collect its data
 	// so we can use it in a few places later
@@ -552,7 +553,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {

 	// ensure the new block contains one copy of all the data
 	// by comparing it against an iterator over the source data
-	mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize)
+	mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize)
 	sourceItr := NewSliceIter[*SeriesWithBlooms](PointerSlice[SeriesWithBlooms](xs))

 	EqualIterators[*SeriesWithBlooms](
7 changes: 7 additions & 0 deletions pkg/storage/bloom/v1/fuse_test.go
@@ -14,8 +14,15 @@ import (

 	"github.com/grafana/loki/v3/pkg/chunkenc"
 	"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 )

+var BloomPagePool = mempool.New("test", []mempool.Bucket{
+	{Size: 16, Capacity: 128 << 10},
+	{Size: 16, Capacity: 256 << 10},
+	{Size: 16, Capacity: 512 << 10},
+}, nil)
+
 // TODO(owen-d): this is unhinged from the data it represents. I'm leaving this solely so I don't
 // have to refactor tests here in order to fix this elsewhere, but it can/should be fixed --
 // the skip & n len are hardcoded based on data that's passed to it elsewhere.
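Note on the bucket semantics above: each mempool.Bucket presumably pre-allocates Size slabs of Capacity bytes, with a request served from the smallest bucket whose capacity fits. Under that assumption, BloomPagePool.Get(200 << 10) would draw one of the sixteen 256KiB slabs and fail once every fitting slab is checked out; that bounded failure mode is what the fixed allocator trades for predictable memory usage.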