perf: Re-introduce fixed size memory pool for bloom querier #13172

Merged: 5 commits, Jun 20, 2024
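This PR threads a mempool.Allocator through the bloom store and block querier so bloom pages can be served from a fixed-size memory pool instead of the Go heap. As orientation for the diff below, here is a minimal sketch of the allocator contract and its heap-backed fallback; the method names and signatures are assumptions for illustration, not copied from pkg/util/mempool.

package mempool

// Allocator hands out byte buffers for bloom pages and may reclaim them.
// Method names and signatures here are assumed for illustration.
type Allocator interface {
	Get(size int) ([]byte, error) // obtain a buffer of at least size bytes
	Put(buffer []byte) bool       // return a buffer; reports whether it was pooled
}

// SimpleHeapAllocator allocates from the heap and never recycles buffers.
// It is the safe choice on the write path, where bloom bytes may escape.
type SimpleHeapAllocator struct{}

func (a *SimpleHeapAllocator) Get(size int) ([]byte, error) { return make([]byte, size), nil }
func (a *SimpleHeapAllocator) Put(_ []byte) bool            { return false }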
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/builder.go
@@ -38,7 +38,7 @@ type Builder struct {
logger log.Logger

tsdbStore common.TSDBStore
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase
chunkLoader ChunkLoader

client protos.PlannerForBuilderClient
@@ -51,7 +51,7 @@ func New(
storeCfg storage.Config,
storageMetrics storage.ClientMetrics,
fetcherProvider stores.ChunkFetcherProvider,
bloomStore bloomshipper.Store,
bloomStore bloomshipper.StoreBase,
logger log.Logger,
r prometheus.Registerer,
) (*Builder, error) {
5 changes: 3 additions & 2 deletions pkg/bloombuild/builder/spec_test.go
@@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/v3/pkg/chunkenc"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
@@ -74,7 +75,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
})
}

@@ -152,7 +153,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter()
bq := v1.NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
4 changes: 2 additions & 2 deletions pkg/bloombuild/planner/planner.go
@@ -40,7 +40,7 @@ type Planner struct {
schemaCfg config.SchemaConfig

tsdbStore common.TSDBStore
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase

tasksQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
@@ -57,7 +57,7 @@ func New(
schemaCfg config.SchemaConfig,
storeCfg storage.Config,
storageMetrics storage.ClientMetrics,
bloomStore bloomshipper.Store,
bloomStore bloomshipper.StoreBase,
logger log.Logger,
r prometheus.Registerer,
) (*Planner, error) {
3 changes: 2 additions & 1 deletion pkg/bloombuild/planner/planner_test.go
@@ -29,6 +29,7 @@ import (
bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

var testDay = parseDayTime("2023-09-01")
@@ -411,7 +412,7 @@ func createPlanner(
reg := prometheus.NewPedanticRegistry()
metasCache := cache.NewNoopCache()
blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, reg, logger)
bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, reg, logger)
bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
require.NoError(t, err)

planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg)
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -53,7 +53,7 @@ type Compactor struct {
retentionManager *RetentionManager

// temporary workaround until bloomStore has implemented read/write shipper interface
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase

sharding util_ring.TenantSharding

@@ -69,7 +69,7 @@ func New(
ring ring.ReadRing,
ringLifeCycler *ring.BasicLifecycler,
limits Limits,
store bloomshipper.StoreWithMetrics,
store bloomshipper.Store,
logger log.Logger,
r prometheus.Registerer,
) (*Compactor, error) {
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/controller.go
@@ -22,7 +22,7 @@ import (

type SimpleBloomController struct {
tsdbStore TSDBStore
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase
chunkLoader ChunkLoader
metrics *Metrics
limits Limits
@@ -32,7 +32,7 @@ type SimpleBloomController struct {

func NewSimpleBloomController(
tsdbStore TSDBStore,
blockStore bloomshipper.Store,
blockStore bloomshipper.StoreBase,
chunkLoader ChunkLoader,
limits Limits,
metrics *Metrics,
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/retention.go
@@ -95,7 +95,7 @@ type RetentionLimits interface {
type RetentionManager struct {
cfg RetentionConfig
limits RetentionLimits
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase
sharding retentionSharding
metrics *Metrics
logger log.Logger
@@ -108,7 +108,7 @@ type RetentionManager struct {
func NewRetentionManager(
cfg RetentionConfig,
limits RetentionLimits,
bloomStore bloomshipper.Store,
bloomStore bloomshipper.StoreBase,
sharding retentionSharding,
metrics *Metrics,
logger log.Logger,
3 changes: 2 additions & 1 deletion pkg/bloomcompactor/retention_test.go
@@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/types"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/mempool"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
"github.com/grafana/loki/v3/pkg/validation"
)
@@ -822,7 +823,7 @@ func NewMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*bloomshipper.B
metasCache := cache.NewMockCache()
blocksCache := bloomshipper.NewFsBlocksCache(storageConfig.BloomShipperConfig.BlocksCache, prometheus.NewPedanticRegistry(), logger)

store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageConfig, metrics, metasCache, blocksCache, reg, logger)
store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageConfig, metrics, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
if err == nil {
t.Cleanup(store.Stop)
}
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/spec_test.go
@@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/v3/pkg/chunkenc"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
@@ -74,7 +75,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
})
}

@@ -152,7 +153,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter()
bq := v1.NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
4 changes: 2 additions & 2 deletions pkg/bloomgateway/bloomgateway.go
@@ -50,7 +50,7 @@ type Gateway struct {

queue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.StoreWithMetrics
bloomStore bloomshipper.Store

pendingTasks *atomic.Int64

@@ -72,7 +72,7 @@ func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int {
}

// New returns a new instance of the Bloom Gateway.
func New(cfg Config, store bloomshipper.StoreWithMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
utillog.WarnExperimentalUse("Bloom Gateway", logger)
g := &Gateway{
cfg: cfg,
3 changes: 2 additions & 1 deletion pkg/bloomgateway/bloomgateway_test.go
@@ -27,6 +27,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/util/mempool"
"github.com/grafana/loki/v3/pkg/validation"
)

@@ -92,7 +93,7 @@ func setupBloomStore(t *testing.T) *bloomshipper.BloomStore {

reg := prometheus.NewRegistry()
blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, nil, logger)
store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, nil, blocksCache, reg, logger)
store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, nil, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
require.NoError(t, err)
t.Cleanup(store.Stop)

2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor.go
@@ -88,7 +88,7 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
// after iteration for performance (alloc reduction).
// This is safe to do here because we do not capture
// the underlying bloom []byte outside of iteration
bloomshipper.WithPool(true),
bloomshipper.WithPool(p.store.Allocator()),
)
duration = time.Since(startBlocks)
level.Debug(p.logger).Log("msg", "fetched blocks", "count", len(refs), "duration", duration, "err", err)
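The WithPool option above is what wires the store's allocator into block fetching on the read path. A minimal sketch of the usage pattern the safety comment relies on (consume blooms inside the iteration, never retain the page bytes, and close the querier so pages return to the pool); the helper name and exact signatures are assumptions:

package example

import (
	"context"

	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

// queryBlocks is a hypothetical helper illustrating the read-path contract.
func queryBlocks(ctx context.Context, store bloomshipper.Store, refs []bloomshipper.BlockRef) error {
	// Pages for these queriers are served from the store's pooled allocator.
	queriers, err := store.FetchBlocks(ctx, refs, bloomshipper.WithPool(store.Allocator()))
	if err != nil {
		return err
	}
	for _, bq := range queriers {
		it := bq.Iter()
		for it.Next() {
			_ = it.At() // inspect series and blooms here; do not keep references to page bytes
		}
		_ = bq.Close() // hand the pooled bloom pages back to the allocator
	}
	return nil
}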
21 changes: 16 additions & 5 deletions pkg/bloomgateway/processor_test.go
@@ -20,16 +20,21 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

var _ bloomshipper.Store = &dummyStore{}
var _ bloomshipper.StoreBase = &dummyStore{}

// refs and blocks must be in 1-1 correspondence.
func newMockBloomStore(refs []bloomshipper.BlockRef, blocks []*v1.Block, metas []bloomshipper.Meta) *dummyStore {
allocator := mempool.New("bloompages", mempool.Buckets{
{Size: 32, Capacity: 512 << 10},
}, nil)
return &dummyStore{
refs: refs,
blocks: blocks,
metas: metas,
refs: refs,
blocks: blocks,
metas: metas,
allocator: allocator,
}
}

@@ -38,6 +43,8 @@ type dummyStore struct {
refs []bloomshipper.BlockRef
blocks []*v1.Block

allocator mempool.Allocator

// mock how long it takes to serve block queriers
delay time.Duration
// mock response error when serving block queriers in ForEach
@@ -76,6 +83,10 @@ func (s *dummyStore) Client(_ model.Time) (bloomshipper.Client, error) {
return nil, nil
}

func (s *dummyStore) Allocator() mempool.Allocator {
return s.allocator
}

func (s *dummyStore) Stop() {
}

@@ -92,7 +103,7 @@ func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef
if ref.Bounds.Equal(s.refs[i].Bounds) {
blockCopy := *block
bq := &bloomshipper.CloseableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(&blockCopy, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(&blockCopy, s.Allocator(), v1.DefaultMaxPageSize),
BlockRef: s.refs[i],
}
result = append(result, bq)
4 changes: 2 additions & 2 deletions pkg/bloomgateway/resolver.go
@@ -24,7 +24,7 @@ type blockWithSeries struct {
}

type defaultBlockResolver struct {
store bloomshipper.Store
store bloomshipper.StoreBase
logger log.Logger
}

@@ -123,7 +123,7 @@ func unassignedSeries(mapped []blockWithSeries, series []*logproto.GroupedChunkR
return skipped
}

func NewBlockResolver(store bloomshipper.Store, logger log.Logger) BlockResolver {
func NewBlockResolver(store bloomshipper.StoreBase, logger log.Logger) BlockResolver {
return &defaultBlockResolver{
store: store,
logger: logger,
1 change: 1 addition & 0 deletions pkg/bloomgateway/util_test.go
@@ -432,6 +432,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
// t.Log(i, j, string(keys[i][j]))
// }
// }

blocks = append(blocks, block)
metas = append(metas, meta)
blockRefs = append(blockRefs, blockRef)
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
@@ -337,7 +337,7 @@ type Loki struct {
querierAPI *querier.QuerierAPI
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
BloomStore bloomshipper.StoreWithMetrics
BloomStore bloomshipper.Store
tableManager *index.TableManager
frontend Frontend
ruler *base_ruler.Ruler
20 changes: 19 additions & 1 deletion pkg/loki/modules.go
@@ -79,6 +79,7 @@ import (
"github.com/grafana/loki/v3/pkg/util/httpreq"
"github.com/grafana/loki/v3/pkg/util/limiter"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/mempool"
"github.com/grafana/loki/v3/pkg/util/querylimits"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
serverutil "github.com/grafana/loki/v3/pkg/util/server"
@@ -753,7 +754,24 @@ func (t *Loki) initBloomStore() (services.Service, error) {
level.Warn(logger).Log("msg", "failed to preload blocks cache", "err", err)
}

t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, reg, logger)
var pageAllocator mempool.Allocator

// Set global BloomPageAllocator variable
switch bsCfg.MemoryManagement.BloomPageAllocationType {
case "simple":
pageAllocator = &mempool.SimpleHeapAllocator{}
case "dynamic":
// sync buffer pool for bloom pages
// 128KB 256KB 512KB 1MB 2MB 4MB 8MB 16MB 32MB 64MB 128MB
pageAllocator = mempool.NewBytePoolAllocator(128<<10, 128<<20, 2)
case "fixed":
pageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg)
default:
// should not happen as the type is validated upfront
return nil, fmt.Errorf("failed to create bloom store: invalid allocator type")
}

t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, pageAllocator, reg, logger)
if err != nil {
return nil, fmt.Errorf("failed to create bloom store: %w", err)
}
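For the "fixed" allocation type above, the pool is built from pre-sized buckets (the shipped defaults come from BloomPageMemPoolBuckets in the bloom shipper config). A standalone sketch of constructing such a pool, with an illustrative bucket layout and assumed Get/Put signatures, following the same shape as the mempool.New call in processor_test.go further up:

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/util/mempool"
)

func main() {
	// Illustrative bucket layout, not the shipped defaults. Each bucket holds
	// Size slabs of Capacity bytes; a request is assumed to be served from the
	// smallest bucket whose slab capacity fits it.
	var alloc mempool.Allocator = mempool.New("bloom-page-pool", mempool.Buckets{
		{Size: 128, Capacity: 128 << 10}, // 128 x 128KiB slabs
		{Size: 32, Capacity: 2 << 20},    // 32 x 2MiB slabs
		{Size: 8, Capacity: 16 << 20},    // 8 x 16MiB slabs
	}, nil) // pass a prometheus.Registerer instead of nil to export pool metrics

	buf, err := alloc.Get(512 << 10) // assumed signature: Get(size int) ([]byte, error)
	if err != nil {
		panic(err)
	}
	fmt.Println("buffer length:", len(buf))
	alloc.Put(buf) // return the slab to its bucket so it can be reused
}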
23 changes: 15 additions & 8 deletions pkg/storage/bloom/v1/block.go
@@ -4,6 +4,8 @@ import (
"fmt"

"github.com/pkg/errors"

"github.com/grafana/loki/v3/pkg/util/mempool"
)

type BlockMetadata struct {
@@ -110,17 +112,18 @@ type BlockQuerier struct {
}

// NewBlockQuerier returns a new BlockQuerier for the given block.
// WARNING: If noCapture is true, the underlying byte slice of the bloom page
// will be returned to the pool for efficiency. This can only safely be used
// when the underlying bloom bytes don't escape the decoder, i.e.
// when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor).
// When usePool is true, the bloom MUST NOT be captured by the caller. Rather,
// it should be discarded before another call to Next().
func NewBlockQuerier(b *Block, usePool bool, maxPageSize int) *BlockQuerier {
// WARNING: You can pass an implementation of Allocator that is responsible for
// whether the underlying byte slice of the bloom page will be returned to the
// pool for efficiency or not. Returning to the pool can only safely be used
// when the underlying bloom bytes don't escape the decoder, i.e. when loading
// blooms for querying (bloom-gateway), but not for writing (bloom-compactor).
// Therefore, when calling NewBlockQuerier on the write path, you should always
// pass the SimpleHeapAllocator implementation of the Allocator interface.
func NewBlockQuerier(b *Block, alloc mempool.Allocator, maxPageSize int) *BlockQuerier {
return &BlockQuerier{
block: b,
LazySeriesIter: NewLazySeriesIter(b),
blooms: NewLazyBloomIter(b, usePool, maxPageSize),
blooms: NewLazyBloomIter(b, alloc, maxPageSize),
}
}

@@ -144,6 +147,10 @@ func (bq *BlockQuerier) Err() error {
return bq.blooms.Err()
}

func (bq *BlockQuerier) Close() {
bq.blooms.Close()
}

type BlockQuerierIter struct {
*BlockQuerier
}
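The rewritten comment above moves the pooling decision to the caller of NewBlockQuerier. A short sketch contrasting the two call sites under those rules (the helper and its arguments are hypothetical; the constructor and Close follow the diff above):

package example

import (
	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
	"github.com/grafana/loki/v3/pkg/util/mempool"
)

// newQueriers contrasts the write-path and read-path allocator choices.
func newQueriers(block *v1.Block, store bloomshipper.Store) (write, read *v1.BlockQuerier) {
	// Write path (builder/compactor): bloom bytes may escape the iteration,
	// so pages must come from the heap and never be recycled.
	write = v1.NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize)

	// Read path (bloom gateway): bloom bytes are consumed inside the iteration,
	// so the store's pooled allocator is safe and avoids heap churn.
	read = v1.NewBlockQuerier(block, store.Allocator(), v1.DefaultMaxPageSize)

	// Close (added in this PR) returns the read querier's pooled pages; callers
	// should invoke read.Close() after the last Next/At.
	return write, read
}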