diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index feaec4849c..d0275d9849 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -141,15 +141,6 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path return err } - ctx := context.Background() - v := verifier.NewManager(reg, verifier.Config{ - Logger: logger, - Bkt: bkt, - BackupBkt: backupBkt, - Fetcher: fetcher, - DeleteDelay: time.Duration(*deleteDelay), - }, r) - var idMatcher func(ulid.ULID) bool = nil if len(*ids) > 0 { idsMap := map[string]struct{}{} @@ -169,11 +160,12 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path } } + v := verifier.NewManager(reg, logger, bkt, backupBkt, fetcher, time.Duration(*deleteDelay), r) if *repair { - return v.VerifyAndRepair(ctx, idMatcher) + return v.VerifyAndRepair(context.Background(), idMatcher) } - return v.Verify(ctx, idMatcher) + return v.Verify(context.Background(), idMatcher) }) } diff --git a/pkg/block/index.go b/pkg/block/index.go index c51251b2de..12ffe5835b 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "hash/crc32" + "math" "math/rand" "path/filepath" "sort" @@ -29,7 +30,7 @@ import ( // VerifyIndex does a full run over a block index and verifies that it fulfills the order invariants. func VerifyIndex(logger log.Logger, fn string, minTime int64, maxTime int64) error { - stats, err := GatherIndexIssueStats(logger, fn, minTime, maxTime) + stats, err := GatherIndexHealthStats(logger, fn, minTime, maxTime) if err != nil { return err } @@ -37,9 +38,9 @@ func VerifyIndex(logger log.Logger, fn string, minTime int64, maxTime int64) err return stats.AnyErr() } -type Stats struct { +type HealthStats struct { // TotalSeries represents total number of series in block. - TotalSeries int + TotalSeries int64 // OutOfOrderSeries represents number of series that have out of order chunks. OutOfOrderSeries int @@ -60,12 +61,41 @@ type Stats struct { // OutOfOrderLabels represents the number of postings that contained out // of order labels, a bug present in Prometheus 2.8.0 and below. OutOfOrderLabels int + + // Debug Statistics. + SeriesMinLifeDuration time.Duration + SeriesAvgLifeDuration time.Duration + SeriesMaxLifeDuration time.Duration + + SeriesMinLifeDurationWithoutSingleSampleSeries time.Duration + SeriesAvgLifeDurationWithoutSingleSampleSeries time.Duration + SeriesMaxLifeDurationWithoutSingleSampleSeries time.Duration + + SeriesMinChunks int64 + SeriesAvgChunks int64 + SeriesMaxChunks int64 + + TotalChunks int64 + + ChunkMinDuration time.Duration + ChunkAvgDuration time.Duration + ChunkMaxDuration time.Duration + + ChunkMinSize int64 + ChunkAvgSize int64 + ChunkMaxSize int64 + + SingleSampleSeries int64 + SingleSampleChunks int64 + + LabelNamesCount int64 + MetricLabelValuesCount int64 } -// PrometheusIssue5372Err returns an error if the Stats object indicates +// PrometheusIssue5372Err returns an error if the HealthStats object indicates // postings with out of order labels. This is corrected by Prometheus Issue // #5372 and affects Prometheus versions 2.8.0 and below. -func (i Stats) PrometheusIssue5372Err() error { +func (i HealthStats) PrometheusIssue5372Err() error { if i.OutOfOrderLabels > 0 { return errors.Errorf("index contains %d postings with out of order labels", i.OutOfOrderLabels) @@ -74,7 +104,7 @@ func (i Stats) PrometheusIssue5372Err() error { } // Issue347OutsideChunksErr returns error if stats indicates issue347 block issue, that is repaired explicitly before compaction (on plan block). -func (i Stats) Issue347OutsideChunksErr() error { +func (i HealthStats) Issue347OutsideChunksErr() error { if i.Issue347OutsideChunks > 0 { return errors.Errorf("found %d chunks outside the block time range introduced by https://github.com/prometheus/tsdb/issues/347", i.Issue347OutsideChunks) } @@ -82,7 +112,7 @@ func (i Stats) Issue347OutsideChunksErr() error { } // CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure. -func (i Stats) CriticalErr() error { +func (i HealthStats) CriticalErr() error { var errMsg []string if i.OutOfOrderSeries > 0 { @@ -113,7 +143,7 @@ func (i Stats) CriticalErr() error { } // AnyErr returns error if stats indicates any block issue. -func (i Stats) AnyErr() error { +func (i HealthStats) AnyErr() error { var errMsg []string if err := i.CriticalErr(); err != nil { @@ -135,11 +165,44 @@ func (i Stats) AnyErr() error { return nil } -// GatherIndexIssueStats returns useful counters as well as outsider chunks (chunks outside of block time range) that +type minMaxSumInt64 struct { + sum int64 + min int64 + max int64 + + cnt int64 +} + +func newMinMaxSumInt64() minMaxSumInt64 { + return minMaxSumInt64{ + min: math.MaxInt64, + max: math.MinInt64, + } +} + +func (n *minMaxSumInt64) Add(v int64) { + n.cnt++ + n.sum += v + if n.min > v { + n.min = v + } + if n.max < v { + n.max = v + } +} + +func (n *minMaxSumInt64) Avg() int64 { + if n.cnt == 0 { + return 0 + } + return n.sum / n.cnt +} + +// GatherIndexHealthStats returns useful counters as well as outsider chunks (chunks outside of block time range) that // helps to assess index health. // It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle. -// See Stats.Issue347OutsideChunks for details. -func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime int64) (stats Stats, err error) { +// See HealthStats.Issue347OutsideChunks for details. +func GatherIndexHealthStats(logger log.Logger, fn string, minTime int64, maxTime int64) (stats HealthStats, err error) { r, err := index.NewFileReader(fn) if err != nil { return stats, errors.Wrap(err, "open index file") @@ -154,8 +217,26 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime lastLset labels.Labels lset labels.Labels chks []chunks.Meta + + seriesLifeDuration = newMinMaxSumInt64() + seriesLifeDurationWithoutSingleSampleSeries = newMinMaxSumInt64() + seriesChunks = newMinMaxSumInt64() + chunkDuration = newMinMaxSumInt64() + chunkSize = newMinMaxSumInt64() ) + lnames, err := r.LabelNames() + if err != nil { + return stats, errors.Wrap(err, "label names") + } + stats.LabelNamesCount = int64(len(lnames)) + + lvals, err := r.LabelValues("__name__") + if err != nil { + return stats, errors.Wrap(err, "metric label values") + } + stats.MetricLabelValuesCount = int64(len(lvals)) + // Per series. for p.Next() { lastLset = append(lastLset[:0], lset...) @@ -189,8 +270,23 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime } ooo := 0 + seriesLifeTimeMs := int64(0) // Per chunk in series. for i, c := range chks { + stats.TotalChunks++ + + chkDur := c.MaxTime - c.MinTime + seriesLifeTimeMs += chkDur + chunkDuration.Add(chkDur) + if chkDur == 0 { + stats.SingleSampleChunks++ + } + + // Approximate size. + if i < len(chks)-2 { + chunkSize.Add(int64(chks[i+1].Ref - c.Ref)) + } + // Chunk vs the block ranges. if c.MinTime < minTime || c.MaxTime > maxTime { stats.OutsideChunks++ @@ -226,11 +322,39 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime stats.OutOfOrderSeries++ stats.OutOfOrderChunks += ooo } + + seriesChunks.Add(int64(len(chks))) + seriesLifeDuration.Add(seriesLifeTimeMs) + + if seriesLifeTimeMs == 0 { + stats.SingleSampleSeries++ + } else { + seriesLifeDurationWithoutSingleSampleSeries.Add(seriesLifeTimeMs) + } } if p.Err() != nil { return stats, errors.Wrap(err, "walk postings") } + stats.SeriesMaxLifeDuration = time.Duration(seriesLifeDuration.max) * time.Millisecond + stats.SeriesAvgLifeDuration = time.Duration(seriesLifeDuration.Avg()) * time.Millisecond + stats.SeriesMinLifeDuration = time.Duration(seriesLifeDuration.min) * time.Millisecond + + stats.SeriesMaxLifeDurationWithoutSingleSampleSeries = time.Duration(seriesLifeDurationWithoutSingleSampleSeries.max) * time.Millisecond + stats.SeriesAvgLifeDurationWithoutSingleSampleSeries = time.Duration(seriesLifeDurationWithoutSingleSampleSeries.Avg()) * time.Millisecond + stats.SeriesMinLifeDurationWithoutSingleSampleSeries = time.Duration(seriesLifeDurationWithoutSingleSampleSeries.min) * time.Millisecond + + stats.SeriesMaxChunks = seriesChunks.max + stats.SeriesAvgChunks = seriesChunks.Avg() + stats.SeriesMinChunks = seriesChunks.min + + stats.ChunkMaxSize = chunkSize.max + stats.ChunkAvgSize = chunkSize.Avg() + stats.ChunkMinSize = chunkSize.min + + stats.ChunkMaxDuration = time.Duration(chunkDuration.max) * time.Millisecond + stats.ChunkAvgDuration = time.Duration(chunkDuration.Avg()) * time.Millisecond + stats.ChunkMinDuration = time.Duration(chunkDuration.min) * time.Millisecond return stats, nil } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 5fdec0b3b7..414ae7cefc 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -726,7 +726,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp } // Ensure all input blocks are valid. - stats, err := block.GatherIndexIssueStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + stats, err := block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) if err != nil { return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", bdir) } diff --git a/pkg/verifier/duplicated_compaction.go b/pkg/verifier/duplicated_compaction.go index f53dd3cbcf..d90fa38e3d 100644 --- a/pkg/verifier/duplicated_compaction.go +++ b/pkg/verifier/duplicated_compaction.go @@ -4,10 +4,7 @@ package verifier import ( - "context" - "fmt" - "strings" "time" @@ -26,14 +23,14 @@ type DuplicatedCompactionBlocks struct{} func (DuplicatedCompactionBlocks) IssueID() string { return "duplicated_compaction" } -func (DuplicatedCompactionBlocks) VerifyRepair(ctx context.Context, conf Config, idMatcher func(ulid.ULID) bool, repair bool) error { +func (DuplicatedCompactionBlocks) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool, repair bool) error { if idMatcher != nil { return errors.Errorf("id matching is not supported") } - level.Info(conf.Logger).Log("msg", "started verifying issue", "with-repair", repair) + level.Info(ctx.Logger).Log("msg", "started verifying issue", "with-repair", repair) - overlaps, err := fetchOverlaps(ctx, conf.Fetcher) + overlaps, err := fetchOverlaps(ctx, ctx.Fetcher) if err != nil { return errors.Wrap(err, "fetch overlaps") } @@ -57,7 +54,7 @@ func (DuplicatedCompactionBlocks) VerifyRepair(ctx context.Context, conf Config, // Loop over duplicates sets. for _, d := range dups { - level.Warn(conf.Logger).Log("msg", "found duplicated blocks", "group", k, "range-min", r.Min, "range-max", r.Max, "kill", sprintMetas(d[1:])) + level.Warn(ctx.Logger).Log("msg", "found duplicated blocks", "group", k, "range-min", r.Min, "range-max", r.Max, "kill", sprintMetas(d[1:])) for _, m := range d[1:] { if _, ok := toKillLookup[m.ULID]; ok { @@ -70,25 +67,25 @@ func (DuplicatedCompactionBlocks) VerifyRepair(ctx context.Context, conf Config, } if len(dups) == 0 { - level.Warn(conf.Logger).Log("msg", "found overlapped blocks, but all of the blocks are unique. Seems like unrelated issue. Ignoring overlap", "group", k, + level.Warn(ctx.Logger).Log("msg", "found overlapped blocks, but all of the blocks are unique. Seems like unrelated issue. Ignoring overlap", "group", k, "range", fmt.Sprintf("%v", r), "overlap", sprintMetas(blocks)) } } } - level.Warn(conf.Logger).Log("msg", "Found duplicated blocks that are ok to be removed", "ULIDs", fmt.Sprintf("%v", toKill), "num", len(toKill)) + level.Warn(ctx.Logger).Log("msg", "Found duplicated blocks that are ok to be removed", "ULIDs", fmt.Sprintf("%v", toKill), "num", len(toKill)) if !repair { return nil } for i, id := range toKill { - if err := BackupAndDelete(ctx, conf, id); err != nil { + if err := BackupAndDelete(ctx, id); err != nil { return err } - level.Info(conf.Logger).Log("msg", "Removed duplicated block", "id", id, "to-be-removed", len(toKill)-(i+1), "removed", i+1) + level.Info(ctx.Logger).Log("msg", "Removed duplicated block", "id", id, "to-be-removed", len(toKill)-(i+1), "removed", i+1) } - level.Info(conf.Logger).Log("msg", "Removed all duplicated blocks. You might want to rerun this verify to check if there is still any unrelated overlap") + level.Info(ctx.Logger).Log("msg", "Removed all duplicated blocks. You might want to rerun this verify to check if there is still any unrelated overlap") return nil } diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go index 1bf3af9b4b..e42def264d 100644 --- a/pkg/verifier/index_issue.go +++ b/pkg/verifier/index_issue.go @@ -4,7 +4,6 @@ package verifier import ( - "context" "fmt" "io/ioutil" "os" @@ -29,10 +28,10 @@ type IndexKnownIssues struct{} func (IndexKnownIssues) IssueID() string { return "index_known_issues" } -func (IndexKnownIssues) VerifyRepair(ctx context.Context, conf Config, idMatcher func(ulid.ULID) bool, repair bool) error { - level.Info(conf.Logger).Log("msg", "started verifying issue", "with-repair", repair) +func (IndexKnownIssues) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool, repair bool) error { + level.Info(ctx.Logger).Log("msg", "started verifying issue", "with-repair", repair) - metas, _, err := conf.Fetcher.Fetch(ctx) + metas, _, err := ctx.Fetcher.Fetch(ctx) if err != nil { return err } @@ -48,24 +47,25 @@ func (IndexKnownIssues) VerifyRepair(ctx context.Context, conf Config, idMatcher } defer func() { if err := os.RemoveAll(tmpdir); err != nil { - level.Warn(conf.Logger).Log("msg", "failed to delete dir", "tmpdir", tmpdir, "err", err) + level.Warn(ctx.Logger).Log("msg", "failed to delete dir", "tmpdir", tmpdir, "err", err) } }() - if err = objstore.DownloadFile(ctx, conf.Logger, conf.Bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(tmpdir, block.IndexFilename)); err != nil { + if err = objstore.DownloadFile(ctx, ctx.Logger, ctx.Bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(tmpdir, block.IndexFilename)); err != nil { return errors.Wrapf(err, "download index file %s", path.Join(id.String(), block.IndexFilename)) } - stats, err := block.GatherIndexIssueStats(conf.Logger, filepath.Join(tmpdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + stats, err := block.GatherIndexHealthStats(ctx.Logger, filepath.Join(tmpdir, block.IndexFilename), meta.MinTime, meta.MaxTime) if err != nil { return errors.Wrapf(err, "gather index issues %s", id) } + level.Debug(ctx.Logger).Log("stats", fmt.Sprintf("%+v", stats), "id", id) if err = stats.AnyErr(); err == nil { return nil } - level.Warn(conf.Logger).Log("msg", "detected issue", "id", id, "err", err) + level.Warn(ctx.Logger).Log("msg", "detected issue", "id", id, "err", err) if !repair { // Only verify. @@ -73,26 +73,26 @@ func (IndexKnownIssues) VerifyRepair(ctx context.Context, conf Config, idMatcher } if stats.OutOfOrderChunks > stats.DuplicatedChunks { - level.Warn(conf.Logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id) + level.Warn(ctx.Logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id) } if stats.OutsideChunks > (stats.CompleteOutsideChunks + stats.Issue347OutsideChunks) { - level.Warn(conf.Logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id) + level.Warn(ctx.Logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id) } if meta.Thanos.Downsample.Resolution > 0 { return errors.New("cannot repair downsampled blocks") } - level.Info(conf.Logger).Log("msg", "downloading block for repair", "id", id) - if err = block.Download(ctx, conf.Logger, conf.Bkt, id, path.Join(tmpdir, id.String())); err != nil { + level.Info(ctx.Logger).Log("msg", "downloading block for repair", "id", id) + if err = block.Download(ctx, ctx.Logger, ctx.Bkt, id, path.Join(tmpdir, id.String())); err != nil { return errors.Wrapf(err, "download block %s", id) } - level.Info(conf.Logger).Log("msg", "downloaded block to be repaired", "id", id, "issue") + level.Info(ctx.Logger).Log("msg", "downloaded block to be repaired", "id", id, "issue") - level.Info(conf.Logger).Log("msg", "repairing block", "id", id, "issue") + level.Info(ctx.Logger).Log("msg", "repairing block", "id", id, "issue") resid, err := block.Repair( - conf.Logger, + ctx.Logger, tmpdir, id, metadata.BucketRepairSource, @@ -103,26 +103,26 @@ func (IndexKnownIssues) VerifyRepair(ctx context.Context, conf Config, idMatcher if err != nil { return errors.Wrapf(err, "repair failed for block %s", id) } - level.Info(conf.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid) + level.Info(ctx.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid) // Verify repaired block before uploading it. - if err := block.VerifyIndex(conf.Logger, filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil { + if err := block.VerifyIndex(ctx.Logger, filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil { return errors.Wrapf(err, "repaired block is invalid %s", resid) } - level.Info(conf.Logger).Log("msg", "uploading repaired block", "newID", resid) - if err = block.Upload(ctx, conf.Logger, conf.Bkt, filepath.Join(tmpdir, resid.String())); err != nil { + level.Info(ctx.Logger).Log("msg", "uploading repaired block", "newID", resid) + if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(tmpdir, resid.String())); err != nil { return errors.Wrapf(err, "upload of %s failed", resid) } - level.Info(conf.Logger).Log("msg", "safe deleting broken block", "id", id, "issue") - if err := BackupAndDeleteDownloaded(ctx, conf, filepath.Join(tmpdir, id.String()), id); err != nil { + level.Info(ctx.Logger).Log("msg", "safe deleting broken block", "id", id, "issue") + if err := BackupAndDeleteDownloaded(ctx, filepath.Join(tmpdir, id.String()), id); err != nil { return errors.Wrapf(err, "safe deleting old block %s failed", id) } - level.Info(conf.Logger).Log("msg", "all good, continuing", "id", id) + level.Info(ctx.Logger).Log("msg", "all good, continuing", "id", id) return nil } - level.Info(conf.Logger).Log("msg", "verified issue", "with-repair", repair) + level.Info(ctx.Logger).Log("msg", "verified issue", "with-repair", repair) return nil } diff --git a/pkg/verifier/overlapped_blocks.go b/pkg/verifier/overlapped_blocks.go index 68de7613be..b642c2c3e3 100644 --- a/pkg/verifier/overlapped_blocks.go +++ b/pkg/verifier/overlapped_blocks.go @@ -21,14 +21,14 @@ type OverlappedBlocksIssue struct{} func (OverlappedBlocksIssue) IssueID() string { return "overlapped_blocks" } -func (OverlappedBlocksIssue) Verify(ctx context.Context, conf Config, idMatcher func(ulid.ULID) bool) error { +func (OverlappedBlocksIssue) Verify(ctx Context, idMatcher func(ulid.ULID) bool) error { if idMatcher != nil { return errors.Errorf("id matching is not supported") } - level.Info(conf.Logger).Log("msg", "started verifying issue") + level.Info(ctx.Logger).Log("msg", "started verifying issue") - overlaps, err := fetchOverlaps(ctx, conf.Fetcher) + overlaps, err := fetchOverlaps(ctx, ctx.Fetcher) if err != nil { return errors.Wrap(err, "fetch overlaps") } @@ -39,7 +39,7 @@ func (OverlappedBlocksIssue) Verify(ctx context.Context, conf Config, idMatcher } for k, o := range overlaps { - level.Warn(conf.Logger).Log("msg", "found overlapped blocks", "group", k, "overlap", o) + level.Warn(ctx.Logger).Log("msg", "found overlapped blocks", "group", k, "overlap", o) } return nil } diff --git a/pkg/verifier/safe_delete.go b/pkg/verifier/safe_delete.go index 30b38562ac..3d3704ae38 100644 --- a/pkg/verifier/safe_delete.go +++ b/pkg/verifier/safe_delete.go @@ -37,9 +37,9 @@ func TSDBBlockExistsInBucket(ctx context.Context, bkt objstore.Bucket, id ulid.U // It returns error if block dir already exists in // the backup bucket (blocks should be immutable) or if any of the operations // fail. -func BackupAndDelete(ctx context.Context, conf Config, id ulid.ULID) error { +func BackupAndDelete(ctx Context, id ulid.ULID) error { // Does this TSDB block exist in backupBkt already? - found, err := TSDBBlockExistsInBucket(ctx, conf.BackupBkt, id) + found, err := TSDBBlockExistsInBucket(ctx, ctx.BackupBkt, id) if err != nil { return err } @@ -54,31 +54,31 @@ func BackupAndDelete(ctx context.Context, conf Config, id ulid.ULID) error { } defer func() { if err := os.RemoveAll(tempdir); err != nil { - level.Warn(conf.Logger).Log("msg", "failed to delete dir", "dir", tempdir, "err", err) + level.Warn(ctx.Logger).Log("msg", "failed to delete dir", "dir", tempdir, "err", err) } }() // Download the TSDB block. dir := filepath.Join(tempdir, id.String()) - if err := block.Download(ctx, conf.Logger, conf.Bkt, id, dir); err != nil { + if err := block.Download(ctx, ctx.Logger, ctx.Bkt, id, dir); err != nil { return errors.Wrap(err, "download from source") } // Backup the block. - if err := backupDownloaded(ctx, conf.Logger, dir, conf.BackupBkt, id); err != nil { + if err := backupDownloaded(ctx, ctx.Logger, dir, ctx.BackupBkt, id); err != nil { return err } // Block uploaded, so we are ok to remove from src bucket. - if conf.DeleteDelay.Seconds() == 0 { - level.Info(conf.Logger).Log("msg", "Deleting block", "id", id.String()) - if err := block.Delete(ctx, conf.Logger, conf.Bkt, id); err != nil { + if ctx.DeleteDelay.Seconds() == 0 { + level.Info(ctx.Logger).Log("msg", "Deleting block", "id", id.String()) + if err := block.Delete(ctx, ctx.Logger, ctx.Bkt, id); err != nil { return errors.Wrap(err, "delete from source") } } - level.Info(conf.Logger).Log("msg", "Marking block as deleted", "id", id.String()) - if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, "manual verify-repair", conf.metrics.blocksMarkedForDeletion); err != nil { + level.Info(ctx.Logger).Log("msg", "Marking block as deleted", "id", id.String()) + if err := block.MarkForDeletion(ctx, ctx.Logger, ctx.Bkt, id, "manual verify-repair", ctx.metrics.blocksMarkedForDeletion); err != nil { return errors.Wrap(err, "marking delete from source") } return nil @@ -90,9 +90,9 @@ func BackupAndDelete(ctx context.Context, conf Config, id ulid.ULID) error { // points to the location on disk where the TSDB block was previously // downloaded allowing this function to avoid downloading the TSDB block from // the source bucket again. An error is returned if any operation fails. -func BackupAndDeleteDownloaded(ctx context.Context, conf Config, bdir string, id ulid.ULID) error { +func BackupAndDeleteDownloaded(ctx Context, bdir string, id ulid.ULID) error { // Does this TSDB block exist in backupBkt already? - found, err := TSDBBlockExistsInBucket(ctx, conf.BackupBkt, id) + found, err := TSDBBlockExistsInBucket(ctx, ctx.BackupBkt, id) if err != nil { return err } @@ -101,21 +101,21 @@ func BackupAndDeleteDownloaded(ctx context.Context, conf Config, bdir string, id } // Backup the block. - if err := backupDownloaded(ctx, conf.Logger, bdir, conf.BackupBkt, id); err != nil { + if err := backupDownloaded(ctx, ctx.Logger, bdir, ctx.BackupBkt, id); err != nil { return err } // Block uploaded, so we are ok to remove from src bucket. - if conf.DeleteDelay.Seconds() == 0 { - level.Info(conf.Logger).Log("msg", "Deleting block", "id", id.String()) - if err := block.Delete(ctx, conf.Logger, conf.Bkt, id); err != nil { + if ctx.DeleteDelay.Seconds() == 0 { + level.Info(ctx.Logger).Log("msg", "Deleting block", "id", id.String()) + if err := block.Delete(ctx, ctx.Logger, ctx.Bkt, id); err != nil { return errors.Wrap(err, "delete from source") } return nil } - level.Info(conf.Logger).Log("msg", "Marking block as deleted", "id", id.String()) - if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, "manual verify-repair", conf.metrics.blocksMarkedForDeletion); err != nil { + level.Info(ctx.Logger).Log("msg", "Marking block as deleted", "id", id.String()) + if err := block.MarkForDeletion(ctx, ctx.Logger, ctx.Bkt, id, "manual verify-repair", ctx.metrics.blocksMarkedForDeletion); err != nil { return errors.Wrap(err, "marking delete from source") } return nil diff --git a/pkg/verifier/verify.go b/pkg/verifier/verify.go index 1c83886525..f2b7de044d 100644 --- a/pkg/verifier/verify.go +++ b/pkg/verifier/verify.go @@ -21,16 +21,18 @@ import ( type Verifier interface { IssueID() string - Verify(ctx context.Context, conf Config, idMatcher func(ulid.ULID) bool) error + Verify(ctx Context, idMatcher func(ulid.ULID) bool) error } type VerifierRepairer interface { IssueID() string - VerifyRepair(ctx context.Context, conf Config, idMatcher func(ulid.ULID) bool, repair bool) error + VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool, repair bool) error } -// Config is an verifier config. -type Config struct { +// Context is an verifier config. +type Context struct { + context.Context + Logger log.Logger Bkt objstore.Bucket BackupBkt objstore.Bucket @@ -55,7 +57,7 @@ func newVerifierMetrics(reg prometheus.Registerer) *metrics { // Manager runs given issues to verify if bucket is healthy. type Manager struct { - Config + Context vs Registry } @@ -109,13 +111,18 @@ idLoop: } // New returns verifier's manager. -func NewManager(reg prometheus.Registerer, config Config, vs Registry) *Manager { - if config.metrics == nil { - config.metrics = newVerifierMetrics(reg) - } +func NewManager(reg prometheus.Registerer, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, fetcher block.MetadataFetcher, deleteDelay time.Duration, vs Registry) *Manager { return &Manager{ - Config: config, - vs: vs, + Context: Context{ + Logger: logger, + Bkt: bkt, + BackupBkt: backupBkt, + Fetcher: fetcher, + DeleteDelay: deleteDelay, + + metrics: newVerifierMetrics(reg), + }, + vs: vs, } } @@ -130,16 +137,18 @@ func (m *Manager) Verify(ctx context.Context, idMatcher func(ulid.ULID) bool) er level.Info(logger).Log("msg", "Starting verify task") for _, v := range m.vs.Verifiers { - conf := m.Config - conf.Logger = log.With(logger, "verifier", v.IssueID()) - if err := v.Verify(ctx, conf, idMatcher); err != nil { + vCtx := m.Context + vCtx.Logger = log.With(logger, "verifier", v.IssueID()) + vCtx.Context = ctx + if err := v.Verify(vCtx, idMatcher); err != nil { return errors.Wrapf(err, "verify %s", v.IssueID()) } } for _, vr := range m.vs.VerifierRepairers { - conf := m.Config - conf.Logger = log.With(logger, "verifier", vr.IssueID()) - if err := vr.VerifyRepair(ctx, conf, idMatcher, false); err != nil { + vCtx := m.Context + vCtx.Context = ctx + vCtx.Logger = log.With(logger, "verifier", vr.IssueID()) + if err := vr.VerifyRepair(vCtx, idMatcher, false); err != nil { return errors.Wrapf(err, "verify %s", vr.IssueID()) } } @@ -160,9 +169,10 @@ func (m *Manager) VerifyAndRepair(ctx context.Context, idMatcher func(ulid.ULID) level.Info(logger).Log("msg", "Starting verify and repair task") for _, vr := range m.vs.VerifierRepairers { - conf := m.Config - conf.Logger = log.With(logger, "verifier", vr.IssueID()) - if err := vr.VerifyRepair(ctx, conf, idMatcher, true); err != nil { + vCtx := m.Context + vCtx.Logger = log.With(logger, "verifier", vr.IssueID()) + vCtx.Context = ctx + if err := vr.VerifyRepair(vCtx, idMatcher, true); err != nil { return errors.Wrapf(err, "verify and repair %s", vr.IssueID()) } }