
Make max bloom page size configurable
Signed-off-by: Christian Haudum <[email protected]>
chaudum committed Mar 25, 2024
1 parent e4a5733 commit 3e6365a
Showing 12 changed files with 51 additions and 46 deletions.
5 changes: 5 additions & 0 deletions docs/sources/configure/_index.md
@@ -2362,6 +2362,11 @@ bloom_shipper:
   # CLI flag: -bloom.shipper.working-directory
   [working_directory: <string> | default = "bloom-shipper"]
 
+  # Maximum size of bloom pages that should be queried. Pages larger than this
+  # limit are skipped when querying blooms to limit memory usage.
+  # CLI flag: -bloom.max-query-page-size
+  [max_query_page_size: <int> | default = 32MiB]
+
   blocks_downloading_queue:
     # The count of parallel workers that download Bloom Blocks.
     # CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count
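For illustration, a minimal override of the new setting in a Loki config file (hypothetical values; only the new option differs from its default):

    bloom_shipper:
      working_directory: "bloom-shipper"
      # Skip any bloom page larger than 64MiB instead of loading it.
      # Equivalent CLI flag: -bloom.max-query-page-size=64MiB
      max_query_page_size: 64MiB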
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/spec_test.go
@@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
 	for i, b := range blocks {
 		bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
 			BlockRef:     refs[i],
-			BlockQuerier: v1.NewBlockQuerier(b),
+			BlockQuerier: v1.NewBlockQuerier(b, v1.DefaultMaxPageSize),
 		})
 	}
 
@@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
 			expectedRefs := v1.PointerSlice(data)
 			outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
 			for _, block := range outputBlocks {
-				bq := block.Querier()
+				bq := v1.NewBlockQuerier(block, v1.DefaultMaxPageSize)
 				for bq.Next() {
 					outputRefs = append(outputRefs, bq.At())
 				}
2 changes: 1 addition & 1 deletion pkg/bloomgateway/util_test.go
@@ -334,7 +334,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
 	// 	}
 	// }
 	querier := &bloomshipper.CloseableBlockQuerier{
-		BlockQuerier: v1.NewBlockQuerier(block),
+		BlockQuerier: v1.NewBlockQuerier(block, v1.DefaultMaxPageSize),
 		BlockRef:     blockRef,
 	}
 	queriers = append(queriers, querier)
17 changes: 2 additions & 15 deletions pkg/storage/bloom/v1/block.go
@@ -87,19 +87,6 @@ func combineChecksums(index, blooms uint32) uint32 {
 	return index ^ blooms
 }
 
-// convenience method
-func (b *Block) Querier() *BlockQuerier {
-	return NewBlockQuerier(b)
-}
-
-func (b *Block) Series() *LazySeriesIter {
-	return NewLazySeriesIter(b)
-}
-
-func (b *Block) Blooms() *LazyBloomIter {
-	return NewLazyBloomIter(b)
-}
-
 func (b *Block) Metadata() (BlockMetadata, error) {
 	if err := b.LoadHeaders(); err != nil {
 		return BlockMetadata{}, err
@@ -123,11 +110,11 @@ type BlockQuerier struct {
 	cur *SeriesWithBloom
 }
 
-func NewBlockQuerier(b *Block) *BlockQuerier {
+func NewBlockQuerier(b *Block, maxSize int) *BlockQuerier {
 	return &BlockQuerier{
 		block:  b,
 		series: NewLazySeriesIter(b),
-		blooms: NewLazyBloomIter(b),
+		blooms: NewLazyBloomIter(b, maxSize),
 	}
 }
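A short usage sketch of the new constructor signature (a hedged, package-internal example: block is assumed to be a *Block built elsewhere; Next/At follow the iterator pattern the tests below use, and the Err method is an assumption):

    // Cap bloom pages at the package default of 32MiB (hypothetical call site).
    querier := NewBlockQuerier(block, DefaultMaxPageSize)
    for querier.Next() {
        swb := querier.At() // *SeriesWithBloom
        _ = swb             // process the series and its bloom filter
    }
    if err := querier.Err(); err != nil {
        // handle iteration/decoding errors, including pages over the size cap
    }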
11 changes: 5 additions & 6 deletions pkg/storage/bloom/v1/bloom.go
@@ -15,10 +15,9 @@ import (
 // NB(chaudum): Some block pages are way bigger than others (400MiB and
 // bigger), and loading multiple pages into memory in parallel can cause the
 // gateways to OOM.
-// Figure out a decent maximum page size that we can process.
-// TODO(chaudum): Make max page size configurable
-var maxPageSize = 32 << 20 // 32MB
-var ErrPageTooLarge = errors.Errorf("bloom page too large: size limit is %.1fMiB", float64(maxPageSize)/float64(1<<20))
+// Figure out a decent default maximum page size that we can process.
+var DefaultMaxPageSize = 32 << 20 // 32MiB
+var ErrPageTooLarge = errors.Errorf("bloom page too large")
 
 type Bloom struct {
 	filter.ScalableBloomFilter
@@ -259,15 +258,15 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
 	return checksum, nil
 }
 
-func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageDecoder, error) {
+func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx, maxSize int) (*BloomPageDecoder, error) {
 	if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
 		return nil, fmt.Errorf("invalid page (%d) for bloom page decoding", pageIdx)
 	}
 
 	page := b.pageHeaders[pageIdx]
 	// fmt.Printf("pageIdx=%d page=%+v size=%.2fMiB\n", pageIdx, page, float64(page.Len)/float64(1<<20))
 
-	if page.Len > maxPageSize {
+	if page.Len > maxSize {
 		return nil, ErrPageTooLarge
 	}
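A hedged, package-internal sketch of the updated decoder contract (reader, block, and maxSize are placeholders; the size check consults only the page header, so an oversized page is rejected before any bytes are read):

    dec, err := block.blooms.BloomPageDecoder(reader, 0, maxSize)
    if err == ErrPageTooLarge {
        // The page header reports a size above maxSize; the page is never
        // loaded into memory, which is the point of the configurable cap.
    }
    _ = dec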
7 changes: 5 additions & 2 deletions pkg/storage/bloom/v1/bloom_querier.go
@@ -8,6 +8,7 @@ type BloomQuerier interface {
 
 type LazyBloomIter struct {
 	b *Block
+	m int // max page size in bytes
 
 	// state
 	initialized bool
@@ -16,9 +17,10 @@ type LazyBloomIter struct {
 	curPage *BloomPageDecoder
 }
 
-func NewLazyBloomIter(b *Block) *LazyBloomIter {
+func NewLazyBloomIter(b *Block, maxSize int) *LazyBloomIter {
 	return &LazyBloomIter{
 		b: b,
+		m: maxSize,
 	}
 }
@@ -44,7 +46,7 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
 		it.err = errors.Wrap(err, "getting blooms reader")
 		return
 	}
-	decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page)
+	decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m)
 	if err != nil {
 		it.err = errors.Wrap(err, "loading bloom page")
 		return
@@ -83,6 +85,7 @@ func (it *LazyBloomIter) next() bool {
 			it.curPage, err = it.b.blooms.BloomPageDecoder(
 				r,
 				it.curPageIndex,
+				it.m,
 			)
 			if err != nil {
 				it.err = err
12 changes: 6 additions & 6 deletions pkg/storage/bloom/v1/builder_test.go
@@ -117,7 +117,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
 	}
 
 	block := NewBlock(tc.reader)
-	querier := NewBlockQuerier(block)
+	querier := NewBlockQuerier(block, DefaultMaxPageSize)
 
 	err = block.LoadHeaders()
 	require.Nil(t, err)
@@ -218,7 +218,7 @@ func TestMergeBuilder(t *testing.T) {
 		itr := NewSliceIter[SeriesWithBloom](data[min:max])
 		_, err = builder.BuildFrom(itr)
 		require.Nil(t, err)
-		blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader))))
+		blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader), DefaultMaxPageSize)))
 	}
 
 	// We're not testing the ability to extend a bloom in this test
@@ -252,7 +252,7 @@ func TestMergeBuilder(t *testing.T) {
 	require.Nil(t, err)
 
 	block := NewBlock(reader)
-	querier := NewBlockQuerier(block)
+	querier := NewBlockQuerier(block, DefaultMaxPageSize)
 
 	EqualIterators[*SeriesWithBloom](
 		t,
@@ -296,7 +296,7 @@ func TestBlockReset(t *testing.T) {
 	_, err = builder.BuildFrom(itr)
 	require.Nil(t, err)
 	block := NewBlock(reader)
-	querier := NewBlockQuerier(block)
+	querier := NewBlockQuerier(block, DefaultMaxPageSize)
 
 	rounds := make([][]model.Fingerprint, 2)
 
@@ -362,7 +362,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
 	_, err = builder.BuildFrom(itr)
 	require.Nil(t, err)
 	block := NewBlock(reader)
-	querier := NewBlockQuerier(block)
+	querier := NewBlockQuerier(block, DefaultMaxPageSize)
 
 	// rather than use the block querier directly, collect its data
 	// so we can use it in a few places later
@@ -423,7 +423,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
 
 	// ensure the new block contains one copy of all the data
 	// by comparing it against an iterator over the source data
-	mergedBlockQuerier := NewBlockQuerier(NewBlock(reader))
+	mergedBlockQuerier := NewBlockQuerier(NewBlock(reader), DefaultMaxPageSize)
 	sourceItr := NewSliceIter[*SeriesWithBloom](PointerSlice[SeriesWithBloom](xs))
 
 	EqualIterators[*SeriesWithBloom](
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/fuse_test.go
@@ -49,7 +49,7 @@ func TestFusedQuerier(t *testing.T) {
 	require.NoError(t, err)
 	require.False(t, itr.Next())
 	block := NewBlock(reader)
-	querier := NewBlockQuerier(block)
+	querier := NewBlockQuerier(block, DefaultMaxPageSize)
 
 	n := 2
 	nReqs := numSeries / n
@@ -143,7 +143,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou
 	_, err = builder.BuildFrom(itr)
 	require.Nil(b, err)
 	block := NewBlock(reader)
-	querier := NewBlockQuerier(block)
+	querier := NewBlockQuerier(block, DefaultMaxPageSize)
 
 	numRequestChains := 100
 	seriesPerRequest := 100
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
@@ -120,9 +120,9 @@ func (b *BlockDirectory) resolveSize() error {
 
 // BlockQuerier returns a new block querier from the directory.
 // The passed function `close` is called when the returned querier is closed.
-func (b BlockDirectory) BlockQuerier(close func() error) *CloseableBlockQuerier {
+func (b BlockDirectory) BlockQuerier(maxSize int, close func() error) *CloseableBlockQuerier {
 	return &CloseableBlockQuerier{
-		BlockQuerier: v1.NewBlockQuerier(b.Block()),
+		BlockQuerier: v1.NewBlockQuerier(b.Block(), maxSize),
 		BlockRef:     b.BlockRef,
 		close:        close,
 	}
3 changes: 3 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/config/config.go
@@ -14,6 +14,7 @@ import (
 
 type Config struct {
 	WorkingDirectory       string                 `yaml:"working_directory"`
+	MaxQueryPageSize       flagext.Bytes          `yaml:"max_query_page_size"`
 	BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"`
 	BlocksCache            BlocksCacheConfig      `yaml:"blocks_cache"`
 	MetasCache             cache.Config           `yaml:"metas_cache"`
@@ -31,6 +32,8 @@ func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *fla
 
 func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.StringVar(&c.WorkingDirectory, prefix+"shipper.working-directory", "bloom-shipper", "Working directory to store downloaded Bloom Blocks.")
+	_ = c.MaxQueryPageSize.Set("32MiB") // default should match the one set in pkg/storage/bloom/v1/bloom.go
+	f.Var(&c.MaxQueryPageSize, prefix+"max-query-page-size", "Maximum size of bloom pages that should be queried. Pages larger than this limit are skipped when querying blooms to limit memory usage.")
 	c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f)
 	c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour)
 	c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f)
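Since this Config is registered under the bloom. prefix, the resulting flag is the one documented above; a hypothetical startup override looks like:

    loki -config.file=loki.yaml -bloom.max-query-page-size=64MiB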
18 changes: 12 additions & 6 deletions pkg/storage/stores/shipper/bloomshipper/fetcher.go
@@ -218,9 +218,12 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc
 		}
 		found++
 		f.metrics.blocksFound.Inc()
-		results[i] = dir.BlockQuerier(func() error {
-			return f.blocksCache.Release(ctx, key)
-		})
+		results[i] = dir.BlockQuerier(
+			f.cfg.maxBloomPageSize,
+			func() error {
+				return f.blocksCache.Release(ctx, key)
+			},
+		)
 	}
 
 	// fetchAsync defines whether the function may return early or whether it
@@ -248,9 +251,12 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc
 		case res := <-responses:
 			found++
 			key := f.client.Block(refs[res.idx]).Addr()
-			results[res.idx] = res.item.BlockQuerier(func() error {
-				return f.blocksCache.Release(ctx, key)
-			})
+			results[res.idx] = res.item.BlockQuerier(
+				f.cfg.maxBloomPageSize,
+				func() error {
+					return f.blocksCache.Release(ctx, key)
+				},
+			)
 		}
 	}
 
10 changes: 6 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/store.go
@@ -36,8 +36,9 @@ type Store interface {
 }
 
 type bloomStoreConfig struct {
-	workingDir string
-	numWorkers int
+	workingDir       string
+	numWorkers       int
+	maxBloomPageSize int
 }
 
 // Compiler check to ensure bloomStoreEntry implements the Store interface
@@ -184,8 +185,9 @@ func NewBloomStore(
 
 	// TODO(chaudum): Remove wrapper
 	cfg := bloomStoreConfig{
-		workingDir: storageConfig.BloomShipperConfig.WorkingDirectory,
-		numWorkers: storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount,
+		workingDir:       storageConfig.BloomShipperConfig.WorkingDirectory,
+		numWorkers:       storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount,
+		maxBloomPageSize: int(storageConfig.BloomShipperConfig.MaxQueryPageSize),
 	}
 
 	if err := util.EnsureDirectory(cfg.workingDir); err != nil {
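Pulling the change together, the configured value flows through these layers (a summary sketch; every name below appears in the hunks above):

    // YAML max_query_page_size / flag -bloom.max-query-page-size
    //   -> config.Config.MaxQueryPageSize (flagext.Bytes)
    //   -> bloomStoreConfig.maxBloomPageSize (int, bytes)
    //   -> Fetcher: dir.BlockQuerier(f.cfg.maxBloomPageSize, ...)
    //   -> v1.NewBlockQuerier(block, maxSize) -> NewLazyBloomIter(b, maxSize)
    //   -> BloomPageDecoder(r, pageIdx, maxSize), which returns ErrPageTooLarge
    //      for any page whose header reports a size above the cap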
