Skip to content

Commit

Permalink
feat(blooms): Blooms/v2 encoding multipart series (#13093)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Jun 6, 2024
1 parent c9ae0b7 commit fbe7c55
Show file tree
Hide file tree
Showing 56 changed files with 2,505 additions and 3,522 deletions.
28 changes: 14 additions & 14 deletions pkg/bloombuild/builder/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ func newBatchedBlockLoader(
}

// compiler checks
var _ v1.Iterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
var _ v1.CloseableIterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
var _ v1.ResettableIterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
var _ v1.Iterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
var _ v1.CloseableIterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
var _ v1.ResettableIterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}

// TODO(chaudum): testware
func newBlockLoadingIter(ctx context.Context, blocks []bloomshipper.BlockRef, fetcher FetchFunc[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier], batchSize int) *blockLoadingIter {
Expand All @@ -196,13 +196,13 @@ type blockLoadingIter struct {
// internals
initialized bool
err error
iter v1.Iterator[*v1.SeriesWithBloom]
iter v1.Iterator[*v1.SeriesWithBlooms]
loader *batchedLoader[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier, *bloomshipper.CloseableBlockQuerier]
loaded map[io.Closer]struct{}
}

// At implements v1.Iterator.
func (i *blockLoadingIter) At() *v1.SeriesWithBloom {
func (i *blockLoadingIter) At() *v1.SeriesWithBlooms {
if !i.initialized {
panic("iterator not initialized")
}
Expand All @@ -229,7 +229,7 @@ func (i *blockLoadingIter) init() {
i.overlapping = overlappingBlocksIter(i.inputs)

// set initial iter
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()

// set "match all" filter function if not present
if i.filter == nil {
Expand All @@ -249,22 +249,22 @@ func (i *blockLoadingIter) loadNext() bool {
loader := newBatchedBlockLoader(i.ctx, i.fetcher, blockRefs, i.batchSize)
filtered := v1.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)

iters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(blockRefs))
iters := make([]v1.PeekingIterator[*v1.SeriesWithBlooms], 0, len(blockRefs))
for filtered.Next() {
bq := filtered.At()
i.loaded[bq] = struct{}{}
iter, err := bq.SeriesIter()
if err != nil {
i.err = err
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
return false
}
iters = append(iters, iter)
}

if err := filtered.Err(); err != nil {
i.err = err
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
return false
}

Expand All @@ -278,12 +278,12 @@ func (i *blockLoadingIter) loadNext() bool {
// two overlapping blocks can conceivably have the same series, so we need to dedupe,
// preferring the one with the most chunks already indexed since we'll have
// to add fewer chunks to the bloom
i.iter = v1.NewDedupingIter[*v1.SeriesWithBloom, *v1.SeriesWithBloom](
func(a, b *v1.SeriesWithBloom) bool {
i.iter = v1.NewDedupingIter[*v1.SeriesWithBlooms, *v1.SeriesWithBlooms](
func(a, b *v1.SeriesWithBlooms) bool {
return a.Series.Fingerprint == b.Series.Fingerprint
},
v1.Identity[*v1.SeriesWithBloom],
func(a, b *v1.SeriesWithBloom) *v1.SeriesWithBloom {
v1.Identity[*v1.SeriesWithBlooms],
func(a, b *v1.SeriesWithBlooms) *v1.SeriesWithBlooms {
if len(a.Series.Chunks) > len(b.Series.Chunks) {
return a
}
Expand All @@ -294,7 +294,7 @@ func (i *blockLoadingIter) loadNext() bool {
return i.iter.Next()
}

i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
i.err = i.overlapping.Err()
return false
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/bloombuild/builder/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
Expand Down Expand Up @@ -208,3 +209,12 @@ func TestOverlappingBlocksIter(t *testing.T) {
})
}
}

func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
bounds := v1.NewBounds(min, max)
return bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
Bounds: bounds,
},
}
}
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (b *Builder) loadWorkForGap(
tenant string,
id tsdb.Identifier,
gap protos.GapWithBlocks,
) (v1.Iterator[*v1.Series], v1.CloseableResettableIterator[*v1.SeriesWithBloom], error) {
) (v1.Iterator[*v1.Series], v1.CloseableResettableIterator[*v1.SeriesWithBlooms], error) {
// load a series iterator for the gap
seriesItr, err := b.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.Bounds)
if err != nil {
Expand Down
57 changes: 21 additions & 36 deletions pkg/bloombuild/builder/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -45,7 +44,7 @@ type SimpleBloomGenerator struct {
userID string
store v1.Iterator[*v1.Series]
chunkLoader ChunkLoader
blocksIter v1.ResettableIterator[*v1.SeriesWithBloom]
blocksIter v1.ResettableIterator[*v1.SeriesWithBlooms]

// options to build blocks with
opts v1.BlockOptions
Expand All @@ -68,7 +67,7 @@ func NewSimpleBloomGenerator(
opts v1.BlockOptions,
store v1.Iterator[*v1.Series],
chunkLoader ChunkLoader,
blocksIter v1.ResettableIterator[*v1.SeriesWithBloom],
blocksIter v1.ResettableIterator[*v1.SeriesWithBlooms],
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
reporter func(model.Fingerprint),
metrics *Metrics,
Expand Down Expand Up @@ -98,44 +97,30 @@ func NewSimpleBloomGenerator(
}
}

func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Series, bloom *v1.Bloom) (int, bool, error) {
return func(series *v1.Series, bloom *v1.Bloom) (int, bool, error) {
start := time.Now()
func (s *SimpleBloomGenerator) populator(ctx context.Context) v1.BloomPopulatorFunc {
return func(
series *v1.Series,
srcBlooms v1.SizedIterator[*v1.Bloom],
toAdd v1.ChunkRefs,
ch chan *v1.BloomCreation,
) {
level.Debug(s.logger).Log(
"msg", "populating bloom filter",
"stage", "before",
"fp", series.Fingerprint,
"chunks", len(series.Chunks),
)
chunkItersWithFP, err := s.chunkLoader.Load(ctx, s.userID, series)
if err != nil {
return 0, false, errors.Wrapf(err, "failed to load chunks for series: %+v", series)
}

bytesAdded, skip, err := s.tokenizer.Populate(
&v1.SeriesWithBloom{
Series: series,
Bloom: bloom,
},
chunkItersWithFP.itr,
)
chunkItersWithFP := s.chunkLoader.Load(ctx, s.userID, &v1.Series{
Fingerprint: series.Fingerprint,
Chunks: toAdd,
})

level.Debug(s.logger).Log(
"msg", "populating bloom filter",
"stage", "after",
"fp", series.Fingerprint,
"chunks", len(series.Chunks),
"series_bytes", bytesAdded,
"duration", time.Since(start),
"err", err,
)
s.tokenizer.Populate(srcBlooms, chunkItersWithFP.itr, ch)

if s.reporter != nil {
s.reporter(series.Fingerprint)
}
return bytesAdded, skip, err
}

}

func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIterator {
Expand Down Expand Up @@ -179,10 +164,10 @@ type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
metrics *Metrics
populate func(*v1.Series, *v1.Bloom) (int, bool, error)
populate v1.BloomPopulatorFunc
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series v1.PeekingIterator[*v1.Series]
blocks v1.ResettableIterator[*v1.SeriesWithBloom]
blocks v1.ResettableIterator[*v1.SeriesWithBlooms]

bytesAdded int
curr *v1.Block
Expand All @@ -193,10 +178,10 @@ func NewLazyBlockBuilderIterator(
ctx context.Context,
opts v1.BlockOptions,
metrics *Metrics,
populate func(*v1.Series, *v1.Bloom) (int, bool, error),
populate v1.BloomPopulatorFunc,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
series v1.PeekingIterator[*v1.Series],
blocks v1.ResettableIterator[*v1.SeriesWithBloom],
blocks v1.ResettableIterator[*v1.SeriesWithBlooms],
) *LazyBlockBuilderIterator {
return &LazyBlockBuilderIterator{
ctx: ctx,
Expand Down Expand Up @@ -270,7 +255,7 @@ type ChunkItersByFingerprint struct {

// ChunkLoader loads chunks from a store
type ChunkLoader interface {
Load(ctx context.Context, userID string, series *v1.Series) (*ChunkItersByFingerprint, error)
Load(ctx context.Context, userID string, series *v1.Series) *ChunkItersByFingerprint
}

// StoreChunkLoader loads chunks from a store
Expand All @@ -286,7 +271,7 @@ func NewStoreChunkLoader(fetcherProvider stores.ChunkFetcherProvider, metrics *M
}
}

func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.Series) (*ChunkItersByFingerprint, error) {
func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.Series) *ChunkItersByFingerprint {
// NB(owen-d): This is probably unnecessary as we should only have one fetcher
// because we'll only be working on a single index period at a time, but this should protect
// us in the case of refactoring/changing this and likely isn't a perf bottleneck.
Expand Down Expand Up @@ -317,5 +302,5 @@ func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.S
return &ChunkItersByFingerprint{
fp: series.Fingerprint,
itr: newBatchedChunkLoader(ctx, fetchers, inputs, s.metrics, batchedLoaderDefaultBatchSize),
}, nil
}
}
30 changes: 11 additions & 19 deletions pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBloom, refs []bloomshipper.BlockRef) {
func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
return blocksFromSchemaWithRange(t, n, options, 0, 0xffff)
}

// splits 100 series across `n` non-overlapping blocks.
// uses options to build blocks with.
func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fromFP, throughFp model.Fingerprint) (res []*v1.Block, data []v1.SeriesWithBloom, refs []bloomshipper.BlockRef) {
func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fromFP, throughFp model.Fingerprint) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
if 100%n != 0 {
panic("100 series must be evenly divisible by n")
}

numSeries := 100
data, _ = v1.MkBasicSeriesWithBlooms(numSeries, 0, fromFP, throughFp, 0, 10000)
data, _ = v1.MkBasicSeriesWithBlooms(numSeries, fromFP, throughFp, 0, 10000)

seriesPerBlock := numSeries / n

Expand All @@ -46,7 +46,7 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro

minIdx, maxIdx := i*seriesPerBlock, (i+1)*seriesPerBlock

itr := v1.NewSliceIter[v1.SeriesWithBloom](data[minIdx:maxIdx])
itr := v1.NewSliceIter[v1.SeriesWithBlooms](data[minIdx:maxIdx])
_, err = builder.BuildFrom(itr)
require.Nil(t, err)

Expand All @@ -62,11 +62,11 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro
// doesn't actually load any chunks
type dummyChunkLoader struct{}

func (dummyChunkLoader) Load(_ context.Context, _ string, series *v1.Series) (*ChunkItersByFingerprint, error) {
func (dummyChunkLoader) Load(_ context.Context, _ string, series *v1.Series) *ChunkItersByFingerprint {
return &ChunkItersByFingerprint{
fp: series.Fingerprint,
itr: v1.NewEmptyIter[v1.ChunkRefWithIter](),
}, nil
}
}

func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block, refs []bloomshipper.BlockRef) *SimpleBloomGenerator {
Expand Down Expand Up @@ -132,9 +132,9 @@ func TestSimpleBloomGenerator(t *testing.T) {
} {
t.Run(fmt.Sprintf("%s/%s", tc.desc, enc), func(t *testing.T) {
sourceBlocks, data, refs := blocksFromSchemaWithRange(t, 2, tc.fromSchema, 0x00000, 0x6ffff)
storeItr := v1.NewMapIter[v1.SeriesWithBloom, *v1.Series](
v1.NewSliceIter[v1.SeriesWithBloom](data),
func(swb v1.SeriesWithBloom) *v1.Series {
storeItr := v1.NewMapIter[v1.SeriesWithBlooms, *v1.Series](
v1.NewSliceIter[v1.SeriesWithBlooms](data),
func(swb v1.SeriesWithBlooms) *v1.Series {
return swb.Series
},
)
Expand All @@ -150,9 +150,9 @@ func TestSimpleBloomGenerator(t *testing.T) {

// Check all the input series are present in the output blocks.
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize)
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand All @@ -164,13 +164,5 @@ func TestSimpleBloomGenerator(t *testing.T) {
})
}
}
}

func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
bounds := v1.NewBounds(min, max)
return bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
Bounds: bounds,
},
}
}
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (p *Planner) loadWork(
if err != nil {
return nil, fmt.Errorf("error loading tenants: %w", err)
}
level.Debug(p.logger).Log("msg", "loaded tenants", "table", table, "tenants", tenants.Len())
level.Debug(p.logger).Log("msg", "loaded tenants", "table", table, "tenants", tenants.Remaining())

for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil {
p.metrics.tenantsDiscovered.Inc()
Expand Down
Loading

0 comments on commit fbe7c55

Please sign in to comment.