diff --git a/pkg/storage/bloom/v1/filter/scalable.go b/pkg/storage/bloom/v1/filter/scalable.go index 2be2348f9ff5e..b6078c6dad336 100644 --- a/pkg/storage/bloom/v1/filter/scalable.go +++ b/pkg/storage/bloom/v1/filter/scalable.go @@ -110,6 +110,13 @@ func (s *ScalableBloomFilter) K() uint { return s.filters[len(s.filters)-1].K() } +func (s *ScalableBloomFilter) Count() (ct int) { + for _, filter := range s.filters { + ct += int(filter.Count()) + } + return +} + // FillRatio returns the average ratio of set bits across every filter. func (s *ScalableBloomFilter) FillRatio() float64 { var sum, count float64 diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go index 3e14e57c58065..264e456161a05 100644 --- a/pkg/storage/bloom/v1/fuse.go +++ b/pkg/storage/bloom/v1/fuse.go @@ -300,6 +300,13 @@ func (fq *FusedQuerier) runSeries(schema Schema, series *SeriesWithOffsets, reqs // Test each bloom individually bloom := fq.bq.blooms.At() for j, req := range reqs { + // TODO(owen-d): this is a stopgap to avoid filtering broken blooms until we find their cause. + // In the case we don't have any data in the bloom, don't filter any chunks. + if bloom.ScalableBloomFilter.Count() == 0 { + for k := range inputs[j].InBlooms { + inputs[j].found[k] = true + } + } // shortcut: series level removal // we can skip testing chunk keys individually if the bloom doesn't match diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index 47c9348b3fe5c..9db4154ca2903 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -356,6 +356,69 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) { } } +func TestFusedQuerierSkipsEmptyBlooms(t *testing.T) { + // references for linking in memory reader+writer + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := NewByteReader(indexBuf, bloomsBuf) + + builder, err := NewBlockBuilder( + BlockOptions{ + Schema: Schema{ + version: DefaultSchemaVersion, + encoding: chunkenc.EncNone, + }, + SeriesPageSize: 100, + BloomPageSize: 10 << 10, + }, + writer, + ) + require.Nil(t, err) + + data := SeriesWithBlooms{ + Series: &Series{ + Fingerprint: 0, + Chunks: []ChunkRef{ + { + From: 0, + Through: 10, + Checksum: 0x1234, + }, + }, + }, + Blooms: v2.NewSliceIter([]*Bloom{ + // simulate empty bloom + { + *filter.NewScalableBloomFilter(1024, 0.01, 0.8), + }, + }), + } + + itr := v2.NewSliceIter[SeriesWithBlooms]([]SeriesWithBlooms{data}) + _, err = builder.BuildFrom(itr) + require.NoError(t, err) + require.False(t, itr.Next()) + block := NewBlock(reader, NewMetrics(nil)) + ch := make(chan Output, 1) + req := Request{ + Fp: data.Series.Fingerprint, + Chks: data.Series.Chunks, + Search: keysToBloomTest([][]byte{[]byte("foobar")}), + Response: ch, + Recorder: NewBloomRecorder(context.Background(), "unknown"), + } + err = NewBlockQuerier(block, BloomPagePool, DefaultMaxPageSize).Fuse( + []v2.PeekIterator[Request]{ + v2.NewPeekIter[Request](v2.NewSliceIter[Request]([]Request{req})), + }, + log.NewNopLogger(), + ).Run() + require.NoError(t, err) + x := <-ch + require.Equal(t, 0, len(x.Removals)) +} + func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Output) { indexBuf := bytes.NewBuffer(nil) bloomsBuf := bytes.NewBuffer(nil)