Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compact: accept malformed index #953

Merged
merged 8 commits into from
Mar 25, 2019
8 changes: 7 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri

haltOnError := cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected.").
Hidden().Default("true").Bool()
acceptMalformedIndex := cmd.Flag("debug.accept-malformed-index",
"Compaction index verification will ignore out of order label names.").
Hidden().Default("false").Bool()

httpAddr := regHTTPAddrFlag(cmd)

Expand Down Expand Up @@ -102,6 +105,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
objStoreConfig,
time.Duration(*syncDelay),
*haltOnError,
*acceptMalformedIndex,
*wait,
map[compact.ResolutionLevel]time.Duration{
compact.ResolutionLevelRaw: time.Duration(*retentionRaw),
Expand All @@ -125,6 +129,7 @@ func runCompact(
objStoreConfig *pathOrContent,
syncDelay time.Duration,
haltOnError bool,
acceptMalformedIndex bool,
wait bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component string,
Expand Down Expand Up @@ -162,7 +167,8 @@ func runCompact(
}
}()

sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay, blockSyncConcurrency)
sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay,
blockSyncConcurrency, acceptMalformedIndex)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down
26 changes: 25 additions & 1 deletion pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/tsdb/fileutil"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -248,6 +249,20 @@ type Stats struct {
// Specifically we mean here chunks with minTime == block.maxTime and maxTime > block.MaxTime. These
// are segregated into separate counters. These chunks are safe to be deleted, since they are duplicated across 2 blocks.
Issue347OutsideChunks int
// OutOfOrderLabels represents the number of postings that contained out
// of order labels, a bug present in Prometheus 2.8.0 and below.
OutOfOrderLabels int
}

// PrometheusIssue5372Err reports postings whose labels are out of order, as
// counted in OutOfOrderLabels. It returns nil when that counter is not
// positive; otherwise it returns an error stating how many such postings the
// index contains. The condition is tracked as Prometheus issue #5372 and
// affects Prometheus versions 2.8.0 and below.
func (i Stats) PrometheusIssue5372Err() error {
	// Nothing to report when no out-of-order label sets were counted.
	if i.OutOfOrderLabels <= 0 {
		return nil
	}
	return errors.Errorf("index contains %d postings with out of order labels", i.OutOfOrderLabels)
}

// Issue347OutsideChunksErr returns error if stats indicates issue347 block issue, that is repaired explicitly before compaction (on plan block).
Expand Down Expand Up @@ -301,6 +316,10 @@ func (i Stats) AnyErr() error {
errMsg = append(errMsg, err.Error())
}

if err := i.PrometheusIssue5372Err(); err != nil {
errMsg = append(errMsg, err.Error())
}

if len(errMsg) > 0 {
return errors.New(strings.Join(errMsg, ", "))
}
Expand Down Expand Up @@ -348,7 +367,12 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
l0 := lset[0]
for _, l := range lset[1:] {
if l.Name < l0.Name {
return stats, errors.Errorf("out-of-order label set %s for series %d", lset, id)
stats.OutOfOrderLabels++
level.Warn(logger).Log("msg",
"out-of-order label set: known bug in Prometheus 2.8.0 and below",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should suggest here that users enable --debug.accept-malformed-index? I mean, we would only tell users about a known bug now but wouldn't tell them how to fix it, so that's weird 😄 We could be more user-friendly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I addressed this in pkg/compact/compact.go, where the error that would otherwise halt execution of the compact component is generated, rather than here, where it just becomes noise when the CLI option is enabled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, that's even better 👍

"labelset", fmt.Sprintf("%s", lset),
"series", fmt.Sprintf("%d", id),
)
}
l0 = l
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Syncer struct {
blocksMtx sync.Mutex
blockSyncConcurrency int
metrics *syncerMetrics
acceptMalformedIndex bool
}

type syncerMetrics struct {
Expand Down Expand Up @@ -128,7 +129,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -140,6 +141,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
bkt: bkt,
metrics: newSyncerMetrics(reg),
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
}, nil
}

Expand Down Expand Up @@ -291,6 +293,7 @@ func (c *Syncer) Groups() (res []*Group, err error) {
c.bkt,
labels.FromMap(m.Thanos.Labels),
m.Thanos.Downsample.Resolution,
c.acceptMalformedIndex,
c.metrics.compactions.WithLabelValues(GroupKey(*m)),
c.metrics.compactionFailures.WithLabelValues(GroupKey(*m)),
c.metrics.garbageCollectedBlocks,
Expand Down Expand Up @@ -436,6 +439,7 @@ type Group struct {
resolution int64
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
acceptMalformedIndex bool
compactions prometheus.Counter
compactionFailures prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
Expand All @@ -447,6 +451,7 @@ func newGroup(
bkt objstore.Bucket,
lset labels.Labels,
resolution int64,
acceptMalformedIndex bool,
compactions prometheus.Counter,
compactionFailures prometheus.Counter,
groupGarbageCollectedBlocks prometheus.Counter,
Expand All @@ -460,6 +465,7 @@ func newGroup(
labels: lset,
resolution: resolution,
blocks: map[ulid.ULID]*metadata.Meta{},
acceptMalformedIndex: acceptMalformedIndex,
compactions: compactions,
compactionFailures: compactionFailures,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
Expand Down Expand Up @@ -769,6 +775,11 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if err := stats.Issue347OutsideChunksErr(); err != nil {
return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
}

if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, errors.Wrapf(err,
"block id %s, try running with --debug.accept-malformed-index", id)
}
}
level.Debug(cg.logger).Log("msg", "downloaded and verified blocks",
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
Expand Down Expand Up @@ -816,7 +827,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
}

// Ensure the output block is valid.
if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil {
if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

sy, err := NewSyncer(nil, nil, bkt, 0, 1)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false)
testutil.Ok(t, err)

// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

// Do one initial synchronization with the bucket.
sy, err := NewSyncer(nil, nil, bkt, 0, 1)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false)
testutil.Ok(t, err)
testutil.Ok(t, sy.SyncMetas(ctx))

Expand Down Expand Up @@ -244,6 +244,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
bkt,
extLset,
124,
false,
metrics.compactions.WithLabelValues(""),
metrics.compactionFailures.WithLabelValues(""),
metrics.garbageCollectedBlocks,
Expand Down