Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(blooms): Ship chunkrefs in task payload #13677

Merged
merged 3 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (b *Builder) processTask(
// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation.
level.Debug(logger).Log("msg", "loading series and blocks for gap", "blocks", len(gap.Blocks))
seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, tenant, task.TSDB, gap)
seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, gap)
if err != nil {
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
return nil, fmt.Errorf("failed to get series and blocks: %w", err)
Expand Down Expand Up @@ -454,15 +454,9 @@ func (b *Builder) processTask(
func (b *Builder) loadWorkForGap(
ctx context.Context,
table config.DayTable,
tenant string,
id tsdb.Identifier,
gap protos.GapWithBlocks,
gap protos.Gap,
) (iter.Iterator[*v1.Series], iter.CloseResetIterator[*v1.SeriesWithBlooms], error) {
// load a series iterator for the gap
seriesItr, err := b.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.Bounds)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load tsdb")
}
seriesItr := iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](gap.Series))

// load a blocks iterator for the gap
fetcher, err := b.bloomStore.Fetcher(table.ModelTime())
Expand Down
24 changes: 10 additions & 14 deletions pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -30,6 +29,11 @@ const (
gzipExtension = ".gz"
)

type ClosableForSeries interface {
sharding.ForSeries
Close() error
}

type TSDBStore interface {
UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
Expand All @@ -38,8 +42,7 @@ type TSDBStore interface {
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (iter.Iterator[*v1.Series], error)
) (ClosableForSeries, error)
}

// BloomTSDBStore is a wrapper around the storage.Client interface which
Expand Down Expand Up @@ -90,8 +93,7 @@ func (b *BloomTSDBStore) LoadTSDB(
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (iter.Iterator[*v1.Series], error) {
) (ClosableForSeries, error) {
withCompression := id.Name() + gzipExtension

data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression)
Expand All @@ -118,13 +120,8 @@ func (b *BloomTSDBStore) LoadTSDB(
}

idx := tsdb.NewTSDBIndex(reader)
defer func() {
if err := idx.Close(); err != nil {
level.Error(b.logger).Log("msg", "failed to close index", "err", err)
}
}()

return NewTSDBSeriesIter(ctx, tenant, idx, bounds)
return idx, nil
}

func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
Expand Down Expand Up @@ -251,12 +248,11 @@ func (s *TSDBStores) LoadTSDB(
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (iter.Iterator[*v1.Series], error) {
) (ClosableForSeries, error) {
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
}

return store.LoadTSDB(ctx, table, tenant, id, bounds)
return store.LoadTSDB(ctx, table, tenant, id)
}
99 changes: 73 additions & 26 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,37 @@ func (p *Planner) computeTasks(
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
}

// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
}

if len(tsdbs) == 0 {
return nil, metas, nil
}

openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
if err != nil {
return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err)
}
defer func() {
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name())
}
}
}()

for _, ownershipRange := range ownershipRanges {
logger := log.With(logger, "ownership", ownershipRange.String())

// Filter only the metas that overlap in the ownership range
metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange)

// Find gaps in the TSDBs for this tenant/table
gaps, err := p.findOutdatedGaps(ctx, tenant, table, ownershipRange, metasInBounds, logger)
gaps, err := p.findOutdatedGaps(ctx, tenant, openTSDBs, ownershipRange, metasInBounds, logger)
if err != nil {
level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
continue
Expand Down Expand Up @@ -453,6 +476,26 @@ func (p *Planner) processTenantTaskResults(
return tasksSucceed, nil
}

func openAllTSDBs(
ctx context.Context,
table config.DayTable,
tenant string,
store common.TSDBStore,
tsdbs []tsdb.SingleTenantTSDBIdentifier,
) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) {
openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs))
for _, idx := range tsdbs {
tsdb, err := store.LoadTSDB(ctx, table, tenant, idx)
if err != nil {
return nil, fmt.Errorf("failed to load tsdb: %w", err)
}

openTSDBs[idx] = tsdb
}

return openTSDBs, nil
}

// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
// It returns the up-to-date metas from the `metas` argument.
func (p *Planner) deleteOutdatedMetasAndBlocks(
Expand Down Expand Up @@ -655,28 +698,17 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
// This is a performance optimization to avoid expensive re-reindexing
type blockPlan struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []protos.GapWithBlocks
gaps []protos.Gap
}

func (p *Planner) findOutdatedGaps(
ctx context.Context,
tenant string,
table config.DayTable,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
) ([]blockPlan, error) {
// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
return nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
}

if len(tsdbs) == 0 {
return nil, nil
}

// Determine which TSDBs have gaps in the ownership range and need to
// be processed.
tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas)
Expand All @@ -690,7 +722,7 @@ func (p *Planner) findOutdatedGaps(
return nil, nil
}

work, err := blockPlansForGaps(tsdbsWithGaps, metas)
work, err := blockPlansForGaps(ctx, tenant, tsdbsWithGaps, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to create plan", "err", err)
return nil, fmt.Errorf("failed to create plan: %w", err)
Expand All @@ -701,18 +733,19 @@ func (p *Planner) findOutdatedGaps(

// Used to signal the gaps that need to be populated for a tsdb
type tsdbGaps struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []v1.FingerprintBounds
tsdbIdentifier tsdb.SingleTenantTSDBIdentifier
tsdb common.ClosableForSeries
gaps []v1.FingerprintBounds
}

// gapsBetweenTSDBsAndMetas returns if the metas are up-to-date with the TSDBs. This is determined by asserting
// that for each TSDB, there are metas covering the entire ownership range which were generated from that specific TSDB.
func gapsBetweenTSDBsAndMetas(
ownershipRange v1.FingerprintBounds,
tsdbs []tsdb.SingleTenantTSDBIdentifier,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
metas []bloomshipper.Meta,
) (res []tsdbGaps, err error) {
for _, db := range tsdbs {
for db, tsdb := range tsdbs {
id := db.Name()

relevantMetas := make([]v1.FingerprintBounds, 0, len(metas))
Expand All @@ -731,8 +764,9 @@ func gapsBetweenTSDBsAndMetas(

if len(gaps) > 0 {
res = append(res, tsdbGaps{
tsdb: db,
gaps: gaps,
tsdbIdentifier: db,
tsdb: tsdb,
gaps: gaps,
})
}
}
Expand All @@ -743,22 +777,35 @@ func gapsBetweenTSDBsAndMetas(
// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
// This allows us to expedite bloom generation by using existing blocks to fill in the gaps
// since many will contain the same chunks.
func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan, error) {
func blockPlansForGaps(
ctx context.Context,
tenant string,
tsdbs []tsdbGaps,
metas []bloomshipper.Meta,
) ([]blockPlan, error) {
plans := make([]blockPlan, 0, len(tsdbs))

for _, idx := range tsdbs {
plan := blockPlan{
tsdb: idx.tsdb,
gaps: make([]protos.GapWithBlocks, 0, len(idx.gaps)),
tsdb: idx.tsdbIdentifier,
gaps: make([]protos.Gap, 0, len(idx.gaps)),
}

for _, gap := range idx.gaps {
planGap := protos.GapWithBlocks{
planGap := protos.Gap{
Bounds: gap,
}

for _, meta := range metas {
seriesItr, err := common.NewTSDBSeriesIter(ctx, tenant, idx.tsdb, gap)
if err != nil {
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err)
}
planGap.Series, err = iter.Collect(seriesItr)
if err != nil {
return nil, fmt.Errorf("failed to collect series: %w", err)
}

for _, meta := range metas {
if meta.Bounds.Intersection(gap) == nil {
// this meta doesn't overlap the gap, skip
continue
Expand Down
Loading
Loading