Skip to content

Commit

Permalink
perf(blooms): always return bloom pages to allocator (#13288)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Jun 24, 2024
1 parent 0ddf1e6 commit 0cb3ff1
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 21 deletions.
30 changes: 19 additions & 11 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"

"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/multierror"

v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
Expand Down Expand Up @@ -113,13 +114,14 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
}

func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.CloseableBlockQuerier, data []blockWithTasks) error {

// We opportunistically close blocks during iteration to allow returning memory to the pool, etc,
// as soon as possible, but since we exit early on error, we need to ensure we close all blocks.
hasClosed := make([]bool, len(bqs))
defer func() {
for i := range bqs {
if bqs[i] == nil {
continue
for i, bq := range bqs {
if !hasClosed[i] {
_ = bq.Close()
}
bqs[i].Close()
}
}()

Expand All @@ -136,15 +138,21 @@ func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.Close
return errors.Errorf("block and querier bounds differ: %s vs %s", block.ref.Bounds, bq.Bounds)
}

err := p.processBlock(ctx, bq.BlockQuerier, block.tasks)
if err != nil {
return errors.Wrap(err, "processing block")
}
return nil
var errs multierror.MultiError
errs.Add(
errors.Wrap(
p.processBlock(ctx, bq, block.tasks),
"processing block",
),
)
errs.Add(bq.Close())
hasClosed[i] = true
return errs.Err()
})
}

func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerier, tasks []Task) error {
func (p *processor) processBlock(_ context.Context, bq *bloomshipper.CloseableBlockQuerier, tasks []Task) (err error) {
blockQuerier := bq.BlockQuerier
schema, err := blockQuerier.Schema()
if err != nil {
return err
Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/bloom/v1/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (bq *BlockQuerier) Schema() (Schema, error) {
}

func (bq *BlockQuerier) Reset() error {
bq.blooms.Reset()
return bq.LazySeriesIter.Seek(0)
}

Expand All @@ -147,10 +148,6 @@ func (bq *BlockQuerier) Err() error {
return bq.blooms.Err()
}

func (bq *BlockQuerier) Close() {
bq.blooms.Close()
}

type BlockQuerierIter struct {
*BlockQuerier
}
Expand All @@ -163,7 +160,11 @@ func (bq *BlockQuerier) Iter() *BlockQuerierIter {
}

func (b *BlockQuerierIter) Next() bool {
return b.LazySeriesIter.Next()
next := b.LazySeriesIter.Next()
if !next {
b.blooms.Reset()
}
return next
}

func (b *BlockQuerierIter) At() *SeriesWithBlooms {
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,12 @@ func (d *BloomPageDecoder) Relinquish(alloc mempool.Allocator) {

data := d.data
d.data = nil
d.Reset() // Reset for cleaning up residual references to data via `dec`

if cap(data) > 0 {
_ = alloc.Put(data)
}

}

func (d *BloomPageDecoder) Reset() {
Expand Down
9 changes: 7 additions & 2 deletions pkg/storage/bloom/v1/bloom_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ func (it *LazyBloomIter) Err() error {
}
}

func (it *LazyBloomIter) Close() {
it.curPage.Relinquish(it.alloc)
func (it *LazyBloomIter) Reset() {
it.err = nil
it.curPageIndex = 0
if it.curPage != nil {
it.curPage.Relinquish(it.alloc)
}
it.curPage = nil
}
8 changes: 5 additions & 3 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/multierror"
"github.com/pkg/errors"

v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
Expand All @@ -23,11 +24,12 @@ type CloseableBlockQuerier struct {
}

func (c *CloseableBlockQuerier) Close() error {
c.BlockQuerier.Close()
var err multierror.MultiError
err.Add(c.BlockQuerier.Reset())
if c.close != nil {
return c.close()
err.Add(c.close())
}
return nil
return err.Err()
}

func (c *CloseableBlockQuerier) SeriesIter() (v1.PeekingIterator[*v1.SeriesWithBlooms], error) {
Expand Down

0 comments on commit 0cb3ff1

Please sign in to comment.