Skip to content

Commit

Permalink
verify: Added extra block statistic; print on debug when using index_…
Browse files Browse the repository at this point in the history
…issue (thanos-io#3385)

Signed-off-by: Bartlomiej Plotka <[email protected]>
Signed-off-by: Oghenebrume50 <[email protected]>
  • Loading branch information
bwplotka authored and Oghenebrume50 committed Dec 7, 2020
1 parent 24e8d72 commit 212b89c
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 100 deletions.
14 changes: 3 additions & 11 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,6 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
return err
}

ctx := context.Background()
v := verifier.NewManager(reg, verifier.Config{
Logger: logger,
Bkt: bkt,
BackupBkt: backupBkt,
Fetcher: fetcher,
DeleteDelay: time.Duration(*deleteDelay),
}, r)

var idMatcher func(ulid.ULID) bool = nil
if len(*ids) > 0 {
idsMap := map[string]struct{}{}
Expand All @@ -169,11 +160,12 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
}
}

v := verifier.NewManager(reg, logger, bkt, backupBkt, fetcher, time.Duration(*deleteDelay), r)
if *repair {
return v.VerifyAndRepair(ctx, idMatcher)
return v.VerifyAndRepair(context.Background(), idMatcher)
}

return v.Verify(ctx, idMatcher)
return v.Verify(context.Background(), idMatcher)
})
}

Expand Down
146 changes: 135 additions & 11 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"hash/crc32"
"math"
"math/rand"
"path/filepath"
"sort"
Expand All @@ -29,17 +30,17 @@ import (

// VerifyIndex does a full run over a block index and verifies that it fulfills the order invariants.
func VerifyIndex(logger log.Logger, fn string, minTime int64, maxTime int64) error {
stats, err := GatherIndexIssueStats(logger, fn, minTime, maxTime)
stats, err := GatherIndexHealthStats(logger, fn, minTime, maxTime)
if err != nil {
return err
}

return stats.AnyErr()
}

type Stats struct {
type HealthStats struct {
// TotalSeries represents total number of series in block.
TotalSeries int
TotalSeries int64
// OutOfOrderSeries represents number of series that have out of order chunks.
OutOfOrderSeries int

Expand All @@ -60,12 +61,41 @@ type Stats struct {
// OutOfOrderLabels represents the number of postings that contained out
// of order labels, a bug present in Prometheus 2.8.0 and below.
OutOfOrderLabels int

// Debug Statistics.
SeriesMinLifeDuration time.Duration
SeriesAvgLifeDuration time.Duration
SeriesMaxLifeDuration time.Duration

SeriesMinLifeDurationWithoutSingleSampleSeries time.Duration
SeriesAvgLifeDurationWithoutSingleSampleSeries time.Duration
SeriesMaxLifeDurationWithoutSingleSampleSeries time.Duration

SeriesMinChunks int64
SeriesAvgChunks int64
SeriesMaxChunks int64

TotalChunks int64

ChunkMinDuration time.Duration
ChunkAvgDuration time.Duration
ChunkMaxDuration time.Duration

ChunkMinSize int64
ChunkAvgSize int64
ChunkMaxSize int64

SingleSampleSeries int64
SingleSampleChunks int64

LabelNamesCount int64
MetricLabelValuesCount int64
}

// PrometheusIssue5372Err returns an error if the Stats object indicates
// PrometheusIssue5372Err returns an error if the HealthStats object indicates
// postings with out of order labels. This is corrected by Prometheus Issue
// #5372 and affects Prometheus versions 2.8.0 and below.
func (i Stats) PrometheusIssue5372Err() error {
func (i HealthStats) PrometheusIssue5372Err() error {
if i.OutOfOrderLabels > 0 {
return errors.Errorf("index contains %d postings with out of order labels",
i.OutOfOrderLabels)
Expand All @@ -74,15 +104,15 @@ func (i Stats) PrometheusIssue5372Err() error {
}

// Issue347OutsideChunksErr returns error if stats indicates issue347 block issue, that is repaired explicitly before compaction (on plan block).
func (i Stats) Issue347OutsideChunksErr() error {
func (i HealthStats) Issue347OutsideChunksErr() error {
if i.Issue347OutsideChunks > 0 {
return errors.Errorf("found %d chunks outside the block time range introduced by https://github.com/prometheus/tsdb/issues/347", i.Issue347OutsideChunks)
}
return nil
}

// CriticalErr returns an error if stats indicate a critical block issue that might be solved only by a manual repair procedure.
func (i Stats) CriticalErr() error {
func (i HealthStats) CriticalErr() error {
var errMsg []string

if i.OutOfOrderSeries > 0 {
Expand Down Expand Up @@ -113,7 +143,7 @@ func (i Stats) CriticalErr() error {
}

// AnyErr returns error if stats indicates any block issue.
func (i Stats) AnyErr() error {
func (i HealthStats) AnyErr() error {
var errMsg []string

if err := i.CriticalErr(); err != nil {
Expand All @@ -135,11 +165,44 @@ func (i Stats) AnyErr() error {
return nil
}

// GatherIndexIssueStats returns useful counters as well as outsider chunks (chunks outside of block time range) that
// minMaxSumInt64 is a running aggregate over a stream of int64 observations.
// It remembers the smallest and largest value added so far, their total and
// how many values were added.
//
// Create it with newMinMaxSumInt64: until the first Add, min holds
// math.MaxInt64 and max holds math.MinInt64, so readers of those fields should
// check cnt before trusting them.
type minMaxSumInt64 struct {
	// sum is the total of every added value; cnt is the number of additions.
	sum, cnt int64
	// min and max are the extremes observed so far.
	min, max int64
}

// newMinMaxSumInt64 returns an empty aggregate whose min and max are primed
// with the opposite int64 extremes, so that the first added value replaces
// both of them.
func newMinMaxSumInt64() minMaxSumInt64 {
	var agg minMaxSumInt64
	agg.min, agg.max = math.MaxInt64, math.MinInt64
	return agg
}

// Add records a single observation v, updating the extremes, the total and the
// observation count.
func (m *minMaxSumInt64) Add(v int64) {
	if v < m.min {
		m.min = v
	}
	if v > m.max {
		m.max = v
	}
	m.sum += v
	m.cnt++
}

// Avg returns the integer mean of all added values (the quotient is truncated
// toward zero). It returns 0 when nothing has been added yet.
func (m *minMaxSumInt64) Avg() int64 {
	if m.cnt != 0 {
		return m.sum / m.cnt
	}
	return 0
}

// GatherIndexHealthStats returns useful counters as well as outsider chunks (chunks outside of block time range) that
// help to assess index health.
// It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle.
// See Stats.Issue347OutsideChunks for details.
func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime int64) (stats Stats, err error) {
// See HealthStats.Issue347OutsideChunks for details.
func GatherIndexHealthStats(logger log.Logger, fn string, minTime int64, maxTime int64) (stats HealthStats, err error) {
r, err := index.NewFileReader(fn)
if err != nil {
return stats, errors.Wrap(err, "open index file")
Expand All @@ -154,8 +217,26 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
lastLset labels.Labels
lset labels.Labels
chks []chunks.Meta

seriesLifeDuration = newMinMaxSumInt64()
seriesLifeDurationWithoutSingleSampleSeries = newMinMaxSumInt64()
seriesChunks = newMinMaxSumInt64()
chunkDuration = newMinMaxSumInt64()
chunkSize = newMinMaxSumInt64()
)

lnames, err := r.LabelNames()
if err != nil {
return stats, errors.Wrap(err, "label names")
}
stats.LabelNamesCount = int64(len(lnames))

lvals, err := r.LabelValues("__name__")
if err != nil {
return stats, errors.Wrap(err, "metric label values")
}
stats.MetricLabelValuesCount = int64(len(lvals))

// Per series.
for p.Next() {
lastLset = append(lastLset[:0], lset...)
Expand Down Expand Up @@ -189,8 +270,23 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
}

ooo := 0
seriesLifeTimeMs := int64(0)
// Per chunk in series.
for i, c := range chks {
stats.TotalChunks++

chkDur := c.MaxTime - c.MinTime
seriesLifeTimeMs += chkDur
chunkDuration.Add(chkDur)
if chkDur == 0 {
stats.SingleSampleChunks++
}

// Approximate size.
if i < len(chks)-2 {
chunkSize.Add(int64(chks[i+1].Ref - c.Ref))
}

// Chunk vs the block ranges.
if c.MinTime < minTime || c.MaxTime > maxTime {
stats.OutsideChunks++
Expand Down Expand Up @@ -226,11 +322,39 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
stats.OutOfOrderSeries++
stats.OutOfOrderChunks += ooo
}

seriesChunks.Add(int64(len(chks)))
seriesLifeDuration.Add(seriesLifeTimeMs)

if seriesLifeTimeMs == 0 {
stats.SingleSampleSeries++
} else {
seriesLifeDurationWithoutSingleSampleSeries.Add(seriesLifeTimeMs)
}
}
if p.Err() != nil {
return stats, errors.Wrap(err, "walk postings")
}

stats.SeriesMaxLifeDuration = time.Duration(seriesLifeDuration.max) * time.Millisecond
stats.SeriesAvgLifeDuration = time.Duration(seriesLifeDuration.Avg()) * time.Millisecond
stats.SeriesMinLifeDuration = time.Duration(seriesLifeDuration.min) * time.Millisecond

stats.SeriesMaxLifeDurationWithoutSingleSampleSeries = time.Duration(seriesLifeDurationWithoutSingleSampleSeries.max) * time.Millisecond
stats.SeriesAvgLifeDurationWithoutSingleSampleSeries = time.Duration(seriesLifeDurationWithoutSingleSampleSeries.Avg()) * time.Millisecond
stats.SeriesMinLifeDurationWithoutSingleSampleSeries = time.Duration(seriesLifeDurationWithoutSingleSampleSeries.min) * time.Millisecond

stats.SeriesMaxChunks = seriesChunks.max
stats.SeriesAvgChunks = seriesChunks.Avg()
stats.SeriesMinChunks = seriesChunks.min

stats.ChunkMaxSize = chunkSize.max
stats.ChunkAvgSize = chunkSize.Avg()
stats.ChunkMinSize = chunkSize.min

stats.ChunkMaxDuration = time.Duration(chunkDuration.max) * time.Millisecond
stats.ChunkAvgDuration = time.Duration(chunkDuration.Avg()) * time.Millisecond
stats.ChunkMinDuration = time.Duration(chunkDuration.min) * time.Millisecond
return stats, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
}

// Ensure all input blocks are valid.
stats, err := block.GatherIndexIssueStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
stats, err := block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", bdir)
}
Expand Down
21 changes: 9 additions & 12 deletions pkg/verifier/duplicated_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
package verifier

import (
"context"

"fmt"

"strings"
"time"

Expand All @@ -26,14 +23,14 @@ type DuplicatedCompactionBlocks struct{}

func (DuplicatedCompactionBlocks) IssueID() string { return "duplicated_compaction" }

func (DuplicatedCompactionBlocks) VerifyRepair(ctx context.Context, conf Config, idMatcher func(ulid.ULID) bool, repair bool) error {
func (DuplicatedCompactionBlocks) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool, repair bool) error {
if idMatcher != nil {
return errors.Errorf("id matching is not supported")
}

level.Info(conf.Logger).Log("msg", "started verifying issue", "with-repair", repair)
level.Info(ctx.Logger).Log("msg", "started verifying issue", "with-repair", repair)

overlaps, err := fetchOverlaps(ctx, conf.Fetcher)
overlaps, err := fetchOverlaps(ctx, ctx.Fetcher)
if err != nil {
return errors.Wrap(err, "fetch overlaps")
}
Expand All @@ -57,7 +54,7 @@ func (DuplicatedCompactionBlocks) VerifyRepair(ctx context.Context, conf Config,

// Loop over duplicates sets.
for _, d := range dups {
level.Warn(conf.Logger).Log("msg", "found duplicated blocks", "group", k, "range-min", r.Min, "range-max", r.Max, "kill", sprintMetas(d[1:]))
level.Warn(ctx.Logger).Log("msg", "found duplicated blocks", "group", k, "range-min", r.Min, "range-max", r.Max, "kill", sprintMetas(d[1:]))

for _, m := range d[1:] {
if _, ok := toKillLookup[m.ULID]; ok {
Expand All @@ -70,25 +67,25 @@ func (DuplicatedCompactionBlocks) VerifyRepair(ctx context.Context, conf Config,
}

if len(dups) == 0 {
level.Warn(conf.Logger).Log("msg", "found overlapped blocks, but all of the blocks are unique. Seems like unrelated issue. Ignoring overlap", "group", k,
level.Warn(ctx.Logger).Log("msg", "found overlapped blocks, but all of the blocks are unique. Seems like unrelated issue. Ignoring overlap", "group", k,
"range", fmt.Sprintf("%v", r), "overlap", sprintMetas(blocks))
}
}
}

level.Warn(conf.Logger).Log("msg", "Found duplicated blocks that are ok to be removed", "ULIDs", fmt.Sprintf("%v", toKill), "num", len(toKill))
level.Warn(ctx.Logger).Log("msg", "Found duplicated blocks that are ok to be removed", "ULIDs", fmt.Sprintf("%v", toKill), "num", len(toKill))
if !repair {
return nil
}

for i, id := range toKill {
if err := BackupAndDelete(ctx, conf, id); err != nil {
if err := BackupAndDelete(ctx, id); err != nil {
return err
}
level.Info(conf.Logger).Log("msg", "Removed duplicated block", "id", id, "to-be-removed", len(toKill)-(i+1), "removed", i+1)
level.Info(ctx.Logger).Log("msg", "Removed duplicated block", "id", id, "to-be-removed", len(toKill)-(i+1), "removed", i+1)
}

level.Info(conf.Logger).Log("msg", "Removed all duplicated blocks. You might want to rerun this verify to check if there is still any unrelated overlap")
level.Info(ctx.Logger).Log("msg", "Removed all duplicated blocks. You might want to rerun this verify to check if there is still any unrelated overlap")
return nil
}

Expand Down
Loading

0 comments on commit 212b89c

Please sign in to comment.