chore(blooms): Make max bloom page size for querying configurable #12337

Merged · 1 commit · Mar 26, 2024
5 changes: 5 additions & 0 deletions docs/sources/configure/_index.md
@@ -2362,6 +2362,11 @@ bloom_shipper:
# CLI flag: -bloom.shipper.working-directory
[working_directory: <string> | default = "bloom-shipper"]

+# Maximum size of bloom pages that should be queried. Pages larger than this
+# limit are skipped when querying blooms, to limit memory usage.
+# CLI flag: -bloom.max-query-page-size
+[max_query_page_size: <int> | default = 64MiB]
+
blocks_downloading_queue:
# The count of parallel workers that download Bloom Blocks.
# CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/spec_test.go
@@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
-BlockQuerier: v1.NewBlockQuerier(b, false),
+BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
})
}

@@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
for _, block := range outputBlocks {
-bq := v1.NewBlockQuerier(block, false)
+bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize)
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
2 changes: 1 addition & 1 deletion pkg/bloomgateway/util_test.go
@@ -334,7 +334,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
// }
// }
querier := &bloomshipper.CloseableBlockQuerier{
-BlockQuerier: v1.NewBlockQuerier(block, false),
+BlockQuerier: v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize),
BlockRef: blockRef,
}
queriers = append(queriers, querier)
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/block.go
@@ -117,11 +117,11 @@ type BlockQuerier struct {
// will be returned to the pool for efficiency. This can only safely be used
// when the underlying bloom bytes don't escape the decoder, i.e.
// when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor).
-func NewBlockQuerier(b *Block, noCapture bool) *BlockQuerier {
+func NewBlockQuerier(b *Block, noCapture bool, maxPageSize int) *BlockQuerier {
return &BlockQuerier{
block: b,
series: NewLazySeriesIter(b),
-blooms: NewLazyBloomIter(b, noCapture),
+blooms: NewLazyBloomIter(b, noCapture, maxPageSize),
}
}

11 changes: 5 additions & 6 deletions pkg/storage/bloom/v1/bloom.go
@@ -15,10 +15,9 @@ import (
// NB(chaudum): Some block pages are way bigger than others (400MiB and
// bigger), and loading multiple pages into memory in parallel can cause the
// gateways to OOM.
-// Figure out a decent maximum page size that we can process.
-// TODO(chaudum): Make max page size configurable
-var maxPageSize = 64 << 20 // 64MB
-var ErrPageTooLarge = errors.Errorf("bloom page too large: size limit is %.1fMiB", float64(maxPageSize)/float64(1<<20))
+// Figure out a decent default maximum page size that we can process.
+var DefaultMaxPageSize = 64 << 20 // 64MB
+var ErrPageTooLarge = errors.Errorf("bloom page too large")

type Bloom struct {
filter.ScalableBloomFilter
@@ -276,7 +275,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
return checksum, nil
}

-func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, metrics *Metrics) (res *BloomPageDecoder, err error) {
+func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, err error) {
if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen))
@@ -292,7 +291,7 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, metrics *Met
return nil, ErrPageTooLarge
}

-if _, err := r.Seek(int64(page.Offset), io.SeekStart); err != nil {
+if _, err = r.Seek(int64(page.Offset), io.SeekStart); err != nil {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Add(float64(page.DecompressedLen))
return nil, errors.Wrap(err, "seeking to bloom page")
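The size check itself is mostly elided in the hunk above. A minimal, self-contained sketch of the technique — skip any page whose decompressed size exceeds the limit instead of loading it — where `pageHeader` and `decodePage` are hypothetical stand-ins rather than Loki's actual types:

```go
package main

import (
	"errors"
	"fmt"
)

// pageHeader is a hypothetical stand-in for Loki's bloom page header; only
// the decompressed size matters for this check.
type pageHeader struct {
	DecompressedLen int
}

var errPageTooLarge = errors.New("bloom page too large")

// decodePage sketches the guard: pages larger than maxPageSize are skipped
// (the caller records a metric and moves on) instead of being decompressed
// into memory.
func decodePage(h pageHeader, maxPageSize int) error {
	if h.DecompressedLen > maxPageSize {
		return errPageTooLarge
	}
	// ... seek to the page offset and decompress it ...
	return nil
}

func main() {
	const limit = 64 << 20 // matches DefaultMaxPageSize (64MiB)
	for _, h := range []pageHeader{{32 << 20}, {400 << 20}} {
		if err := decodePage(h, limit); err != nil {
			fmt.Printf("skipping %dMiB page: %v\n", h.DecompressedLen>>20, err)
			continue
		}
		fmt.Printf("decoded %dMiB page\n", h.DecompressedLen>>20)
	}
}
```

Skipping rather than failing keeps a single oversized page from OOMing a bloom gateway; the trade-off, presumably, is that blooms on skipped pages simply cannot be used to filter the affected series.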
7 changes: 5 additions & 2 deletions pkg/storage/bloom/v1/bloom_querier.go
@@ -10,6 +10,7 @@ type LazyBloomIter struct {
usePool bool

b *Block
+m int // max page size in bytes

// state
initialized bool
@@ -23,10 +24,11 @@ type LazyBloomIter struct {
// will be returned to the pool for efficiency.
// This can only safely be used when the underlying bloom
// bytes don't escape the decoder.
-func NewLazyBloomIter(b *Block, pool bool) *LazyBloomIter {
+func NewLazyBloomIter(b *Block, pool bool, maxSize int) *LazyBloomIter {
return &LazyBloomIter{
usePool: pool,
b: b,
+m: maxSize,
}
}

@@ -58,7 +60,7 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
it.err = errors.Wrap(err, "getting blooms reader")
return
}
-decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.b.metrics)
+decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
if err != nil {
it.err = errors.Wrap(err, "loading bloom page")
return
@@ -97,6 +99,7 @@ func (it *LazyBloomIter) next() bool {
it.curPage, err = it.b.blooms.BloomPageDecoder(
r,
it.curPageIndex,
+it.m,
it.b.metrics,
)
if err != nil {
12 changes: 6 additions & 6 deletions pkg/storage/bloom/v1/builder_test.go
@@ -117,7 +117,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
}

block := NewBlock(tc.reader, NewMetrics(nil))
-querier := NewBlockQuerier(block, false)
+querier := NewBlockQuerier(block, false, DefaultMaxPageSize)

err = block.LoadHeaders()
require.Nil(t, err)
@@ -218,7 +218,7 @@ func TestMergeBuilder(t *testing.T) {
itr := NewSliceIter[SeriesWithBloom](data[min:max])
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
-blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false)))
+blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize)))
}

// We're not testing the ability to extend a bloom in this test
@@ -252,7 +252,7 @@ func TestMergeBuilder(t *testing.T) {
require.Nil(t, err)

block := NewBlock(reader, NewMetrics(nil))
-querier := NewBlockQuerier(block, false)
+querier := NewBlockQuerier(block, false, DefaultMaxPageSize)

EqualIterators[*SeriesWithBloom](
t,
@@ -296,7 +296,7 @@ func TestBlockReset(t *testing.T) {
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
block := NewBlock(reader, NewMetrics(nil))
-querier := NewBlockQuerier(block, false)
+querier := NewBlockQuerier(block, false, DefaultMaxPageSize)

rounds := make([][]model.Fingerprint, 2)

@@ -362,7 +362,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
block := NewBlock(reader, NewMetrics(nil))
-querier := NewBlockQuerier(block, false)
+querier := NewBlockQuerier(block, false, DefaultMaxPageSize)

// rather than use the block querier directly, collect its data
// so we can use it in a few places later
@@ -423,7 +423,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {

// ensure the new block contains one copy of all the data
// by comparing it against an iterator over the source data
-mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false)
+mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize)
sourceItr := NewSliceIter[*SeriesWithBloom](PointerSlice[SeriesWithBloom](xs))

EqualIterators[*SeriesWithBloom](
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/fuse_test.go
@@ -49,7 +49,7 @@ func TestFusedQuerier(t *testing.T) {
require.NoError(t, err)
require.False(t, itr.Next())
block := NewBlock(reader, NewMetrics(nil))
-querier := NewBlockQuerier(block, true)
+querier := NewBlockQuerier(block, true, DefaultMaxPageSize)

n := 2
nReqs := numSeries / n
@@ -143,7 +143,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou
_, err = builder.BuildFrom(itr)
require.Nil(b, err)
block := NewBlock(reader, NewMetrics(nil))
-querier := NewBlockQuerier(block, true)
+querier := NewBlockQuerier(block, true, DefaultMaxPageSize)

numRequestChains := 100
seriesPerRequest := 100
4 changes: 3 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/cache.go
@@ -120,13 +120,15 @@ func (b *BlockDirectory) resolveSize() error {

// BlockQuerier returns a new block querier from the directory.
// The passed function `close` is called when the returned querier is closed.
+
func (b BlockDirectory) BlockQuerier(
usePool bool,
close func() error,
+maxPageSize int,
metrics *v1.Metrics,
) *CloseableBlockQuerier {
return &CloseableBlockQuerier{
-BlockQuerier: v1.NewBlockQuerier(b.Block(metrics), usePool),
+BlockQuerier: v1.NewBlockQuerier(b.Block(metrics), usePool, maxPageSize),
BlockRef: b.BlockRef,
close: close,
}
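The `close` callback mentioned in the comment above pairs a querier with a release hook owned by the blocks cache. A minimal sketch of that pattern, using simplified stand-in types rather than the real `CloseableBlockQuerier`:

```go
package main

import "fmt"

// closeableQuerier is a simplified stand-in for CloseableBlockQuerier: it
// pairs a block querier with a release callback supplied by the cache, so
// the cached block is released exactly when the consumer is done with it.
type closeableQuerier struct {
	close func() error
}

// Close invokes the release callback, if one was provided.
func (q *closeableQuerier) Close() error {
	if q.close != nil {
		return q.close()
	}
	return nil
}

func main() {
	q := &closeableQuerier{
		close: func() error {
			fmt.Println("released block back to the blocks cache")
			return nil
		},
	}
	defer q.Close()
	// ... query the block while the cache entry is pinned ...
}
```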
3 changes: 3 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/config/config.go
@@ -14,6 +14,7 @@ import (

type Config struct {
WorkingDirectory string `yaml:"working_directory"`
+MaxQueryPageSize flagext.Bytes `yaml:"max_query_page_size"`
BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"`
BlocksCache BlocksCacheConfig `yaml:"blocks_cache"`
MetasCache cache.Config `yaml:"metas_cache"`
@@ -31,6 +32,8 @@ func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *fla

func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&c.WorkingDirectory, prefix+"shipper.working-directory", "bloom-shipper", "Working directory to store downloaded Bloom Blocks.")
+_ = c.MaxQueryPageSize.Set("64MiB") // default should match the one set in pkg/storage/bloom/v1/bloom.go
+f.Var(&c.MaxQueryPageSize, prefix+"max-query-page-size", "Maximum size of bloom pages that should be queried. Pages larger than this limit are skipped when querying blooms, to limit memory usage.")
c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f)
c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour)
c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f)
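For context, human-readable byte-size flags of this kind parse units like "64MiB" at registration time. A minimal sketch, assuming the `flagext.Bytes` type from grafana/dskit that Loki uses here; the flag-set name and values are illustrative only:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/dskit/flagext"
)

func main() {
	var maxPageSize flagext.Bytes

	// Seed the default before registering the flag, mirroring what the PR
	// does; "64MiB" is a known-good literal, so the error is ignored.
	_ = maxPageSize.Set("64MiB")

	fs := flag.NewFlagSet("bloom", flag.ContinueOnError)
	fs.Var(&maxPageSize, "bloom.max-query-page-size",
		"Maximum size of bloom pages that should be queried.")

	// The flag accepts the same human-readable units on the command line.
	_ = fs.Parse([]string{"-bloom.max-query-page-size=128MiB"})

	fmt.Println(int(maxPageSize)) // 134217728
}
```

Converting with `int(...)` at the storage-config boundary is exactly what the store.go hunk below does with `MaxQueryPageSize`.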
2 changes: 2 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/fetcher.go
@@ -243,6 +243,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc
func() error {
return f.blocksCache.Release(ctx, key)
},
+f.cfg.maxBloomPageSize,
f.bloomMetrics,
)
}
@@ -277,6 +278,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc
func() error {
return f.blocksCache.Release(ctx, key)
},
+f.cfg.maxBloomPageSize,
f.bloomMetrics,
)
}
10 changes: 6 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/store.go
@@ -41,8 +41,9 @@ type StoreWithMetrics interface {
}

type bloomStoreConfig struct {
-workingDir string
-numWorkers int
+workingDir       string
+numWorkers       int
+maxBloomPageSize int
}

// Compiler check to ensure bloomStoreEntry implements the Store interface
@@ -192,8 +193,9 @@ func NewBloomStore(

// TODO(chaudum): Remove wrapper
cfg := bloomStoreConfig{
-workingDir: storageConfig.BloomShipperConfig.WorkingDirectory,
-numWorkers: storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount,
+workingDir:       storageConfig.BloomShipperConfig.WorkingDirectory,
+numWorkers:       storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount,
+maxBloomPageSize: int(storageConfig.BloomShipperConfig.MaxQueryPageSize),
}

if err := util.EnsureDirectory(cfg.workingDir); err != nil {
2 changes: 1 addition & 1 deletion tools/bloom/inspector/main.go
@@ -18,7 +18,7 @@ func main() {

r := v1.NewDirectoryBlockReader(path)
b := v1.NewBlock(r, v1.NewMetrics(nil))
-q := v1.NewBlockQuerier(b, true)
+q := v1.NewBlockQuerier(b, true, v1.DefaultMaxPageSize)

md, err := q.Metadata()
if err != nil {