From 4a2048f1a95b200a3426b8c44385e82a0657b784 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 11 Nov 2024 15:07:15 +0100 Subject: [PATCH] fix(blooms): Copy chunks from ForSeries --- .../planner/strategies/chunksize.go | 73 ++++++++----------- 1 file changed, 31 insertions(+), 42 deletions(-) diff --git a/pkg/bloombuild/planner/strategies/chunksize.go b/pkg/bloombuild/planner/strategies/chunksize.go index 91f7223440d74..456183aa62ef5 100644 --- a/pkg/bloombuild/planner/strategies/chunksize.go +++ b/pkg/bloombuild/planner/strategies/chunksize.go @@ -155,20 +155,16 @@ func getBlocksMatchingBounds(metas []bloomshipper.Meta, bounds v1.FingerprintBou return deduped, nil } -type seriesWithChunks struct { - tsdb tsdb.SingleTenantTSDBIdentifier - fp model.Fingerprint - chunks []index.ChunkMeta -} - type seriesBatch struct { - series []seriesWithChunks + tsdb tsdb.SingleTenantTSDBIdentifier + series []*v1.Series size uint64 } -func newSeriesBatch() seriesBatch { +func newSeriesBatch(tsdb tsdb.SingleTenantTSDBIdentifier) seriesBatch { return seriesBatch{ - series: make([]seriesWithChunks, 0, 100), + tsdb: tsdb, + series: make([]*v1.Series, 0, 100), } } @@ -179,31 +175,14 @@ func (b *seriesBatch) Bounds() v1.FingerprintBounds { // We assume that the series are sorted by fingerprint. // This is guaranteed since series are iterated in order by the TSDB. - return v1.NewBounds(b.series[0].fp, b.series[len(b.series)-1].fp) + return v1.NewBounds(b.series[0].Fingerprint, b.series[len(b.series)-1].Fingerprint) } func (b *seriesBatch) V1Series() []*v1.Series { - series := make([]*v1.Series, 0, len(b.series)) - for _, s := range b.series { - res := &v1.Series{ - Fingerprint: s.fp, - Chunks: make(v1.ChunkRefs, 0, len(s.chunks)), - } - for _, chk := range s.chunks { - res.Chunks = append(res.Chunks, v1.ChunkRef{ - From: model.Time(chk.MinTime), - Through: model.Time(chk.MaxTime), - Checksum: chk.Checksum, - }) - } - - series = append(series, res) - } - - return series + return b.series } -func (b *seriesBatch) Append(s seriesWithChunks, size uint64) { +func (b *seriesBatch) Append(s *v1.Series, size uint64) { b.series = append(b.series, s) b.size += size } @@ -217,10 +196,7 @@ func (b *seriesBatch) Size() uint64 { } func (b *seriesBatch) TSDB() tsdb.SingleTenantTSDBIdentifier { - if len(b.series) == 0 { - return tsdb.SingleTenantTSDBIdentifier{} - } - return b.series[0].tsdb + return b.tsdb } func (s *ChunkSizeStrategy) sizedSeriesIter( @@ -230,9 +206,14 @@ func (s *ChunkSizeStrategy) sizedSeriesIter( targetTaskSizeBytes uint64, ) (iter.Iterator[seriesBatch], int, error) { batches := make([]seriesBatch, 0, 100) - currentBatch := newSeriesBatch() + var currentBatch seriesBatch for _, idx := range tsdbsWithGaps { + if currentBatch.Len() > 0 { + batches = append(batches, currentBatch) + } + currentBatch = newSeriesBatch(idx.tsdbIdentifier) + for _, gap := range idx.gaps { if err := idx.tsdb.ForSeries( ctx, @@ -253,14 +234,22 @@ func (s *ChunkSizeStrategy) sizedSeriesIter( // AND Adding this series to the batch would exceed the target task size. if currentBatch.Len() > 0 && currentBatch.Size()+seriesSize > targetTaskSizeBytes { batches = append(batches, currentBatch) - currentBatch = newSeriesBatch() + currentBatch = newSeriesBatch(idx.tsdbIdentifier) + } + + res := &v1.Series{ + Fingerprint: fp, + Chunks: make(v1.ChunkRefs, 0, len(chks)), + } + for _, chk := range chks { + res.Chunks = append(res.Chunks, v1.ChunkRef{ + From: model.Time(chk.MinTime), + Through: model.Time(chk.MaxTime), + Checksum: chk.Checksum, + }) } - currentBatch.Append(seriesWithChunks{ - tsdb: idx.tsdbIdentifier, - fp: fp, - chunks: chks, - }, seriesSize) + currentBatch.Append(res, seriesSize) return false } }, @@ -269,10 +258,10 @@ func (s *ChunkSizeStrategy) sizedSeriesIter( return nil, 0, err } - // Add the last batch for this TSDB if it's not empty. + // Add the last batch for this gap if it's not empty. if currentBatch.Len() > 0 { batches = append(batches, currentBatch) - currentBatch = newSeriesBatch() + currentBatch = newSeriesBatch(idx.tsdbIdentifier) } } }