diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index e95953c94bf42..f4b1ebe3749b2 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/multierror" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/config" @@ -113,13 +114,14 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config. } func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.CloseableBlockQuerier, data []blockWithTasks) error { - + // We opportunistically close blocks during iteration to allow returning memory to the pool, etc, + // as soon as possible, but since we exit early on error, we need to ensure we close all blocks. + hasClosed := make([]bool, len(bqs)) defer func() { - for i := range bqs { - if bqs[i] == nil { - continue + for i, bq := range bqs { + if !hasClosed[i] { + _ = bq.Close() } - bqs[i].Close() } }() @@ -136,15 +138,21 @@ func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.Close return errors.Errorf("block and querier bounds differ: %s vs %s", block.ref.Bounds, bq.Bounds) } - err := p.processBlock(ctx, bq.BlockQuerier, block.tasks) - if err != nil { - return errors.Wrap(err, "processing block") - } - return nil + var errs multierror.MultiError + errs.Add( + errors.Wrap( + p.processBlock(ctx, bq, block.tasks), + "processing block", + ), + ) + errs.Add(bq.Close()) + hasClosed[i] = true + return errs.Err() }) } -func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerier, tasks []Task) error { +func (p *processor) processBlock(_ context.Context, bq *bloomshipper.CloseableBlockQuerier, tasks []Task) (err error) { + blockQuerier := bq.BlockQuerier schema, err := blockQuerier.Schema() if err != nil { return err diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go index 8aaf21d5e7516..5ad4adbe7a0d6 100644 --- a/pkg/storage/bloom/v1/block.go +++ b/pkg/storage/bloom/v1/block.go @@ -136,6 +136,7 @@ func (bq *BlockQuerier) Schema() (Schema, error) { } func (bq *BlockQuerier) Reset() error { + bq.blooms.Reset() return bq.LazySeriesIter.Seek(0) } @@ -147,10 +148,6 @@ func (bq *BlockQuerier) Err() error { return bq.blooms.Err() } -func (bq *BlockQuerier) Close() { - bq.blooms.Close() -} - type BlockQuerierIter struct { *BlockQuerier } @@ -163,7 +160,11 @@ func (bq *BlockQuerier) Iter() *BlockQuerierIter { } func (b *BlockQuerierIter) Next() bool { - return b.LazySeriesIter.Next() + next := b.LazySeriesIter.Next() + if !next { + b.blooms.Reset() + } + return next } func (b *BlockQuerierIter) At() *SeriesWithBlooms { diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go index dfd8b758c3385..cf7053b07308a 100644 --- a/pkg/storage/bloom/v1/bloom.go +++ b/pkg/storage/bloom/v1/bloom.go @@ -175,10 +175,12 @@ func (d *BloomPageDecoder) Relinquish(alloc mempool.Allocator) { data := d.data d.data = nil + d.Reset() // Reset for cleaning up residual references to data via `dec` if cap(data) > 0 { _ = alloc.Put(data) } + } func (d *BloomPageDecoder) Reset() { diff --git a/pkg/storage/bloom/v1/bloom_querier.go b/pkg/storage/bloom/v1/bloom_querier.go index ab30b74f8a9eb..9e9ef14acd479 100644 --- a/pkg/storage/bloom/v1/bloom_querier.go +++ b/pkg/storage/bloom/v1/bloom_querier.go @@ -162,6 +162,11 @@ func (it *LazyBloomIter) Err() error { } } -func (it *LazyBloomIter) Close() { - it.curPage.Relinquish(it.alloc) +func (it *LazyBloomIter) Reset() { + it.err = nil + it.curPageIndex = 0 + if it.curPage != nil { + it.curPage.Relinquish(it.alloc) + } + it.curPage = nil } diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index 8b7ba7d253a94..eab363866e883 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/multierror" "github.com/pkg/errors" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" @@ -23,11 +24,12 @@ type CloseableBlockQuerier struct { } func (c *CloseableBlockQuerier) Close() error { - c.BlockQuerier.Close() + var err multierror.MultiError + err.Add(c.BlockQuerier.Reset()) if c.close != nil { - return c.close() + err.Add(c.close()) } - return nil + return err.Err() } func (c *CloseableBlockQuerier) SeriesIter() (v1.PeekingIterator[*v1.SeriesWithBlooms], error) {