From 6d3d3fc15456e1e9eb4cccfe987fcdae79a6f8f2 Mon Sep 17 00:00:00 2001
From: Josh Carp
Date: Fri, 4 Feb 2022 15:08:01 -0500
Subject: [PATCH] kvprober: add probes for problem ranges

The kvprober provides good coverage of issues that affect many ranges,
but has a lower probability of detecting individual bad ranges. To
improve coverage of the latter case, remember ranges whose probes fail
in the existing probe loops and probe them again, more frequently, in a
second pair of probe loops dedicated to problem ranges.

Resolves #74407.

Release note: None
---
 pkg/kv/kvprober/kvprober.go | 248 +++++++++++++++++++++++++++++-------
 pkg/kv/kvprober/planner.go  |  12 ++
 pkg/server/server.go        |   4 +-
 3 files changed, 216 insertions(+), 48 deletions(-)

diff --git a/pkg/kv/kvprober/kvprober.go b/pkg/kv/kvprober/kvprober.go
index 1c4a73f82044..21110e458270 100644
--- a/pkg/kv/kvprober/kvprober.go
+++ b/pkg/kv/kvprober/kvprober.go
@@ -33,26 +33,62 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 )
 
 const putValue = "thekvproberwrotethis"
 
+// ErrPlanEmpty is returned by a planner that currently has no keys to probe.
+// This is the expected state for the problem-range planners whenever no
+// recent probes have failed.
+var ErrPlanEmpty = errors.New("no keys need to be probed")
+
 // Prober sends queries to KV in a loop. See package docstring for more.
 type Prober struct {
 	db       *kv.DB
 	settings *cluster.Settings
+	tracer   *tracing.Tracer
+	loops    []ProbeLoop
+	metrics  []Metrics
+}
+
+// ProbeLoop bundles everything a single probe loop needs: the probe function
+// to run, the planner that picks the next range, the interval setting that
+// paces the loop, an optional problemTracker into which failures are
+// recorded, and the metrics the loop reports to.
+type ProbeLoop struct {
+	name   string
+	prober func(context.Context, *kv.DB, planner, ProbeLoop)
 	// planner is an interface for selecting a range to probe. There are
 	// separate planners for the read & write probe loops, so as to achieve
 	// a balanced probing of the keyspace, regardless of differences in the rate
 	// at which Prober sends different probes. Also note that planner is
 	// NOT thread-safe.
-	readPlanner  planner
-	writePlanner planner
+	planner        planner
+	interval       *settings.DurationSetting
+	problemTracker *problemTracker
 	// metrics wraps up the set of prometheus metrics that the prober sets; the
 	// goal of the prober IS to populate these metrics.
 	metrics Metrics
-	tracer  *tracing.Tracer
+}
+
+// problemTracker remembers the most recent probe failures so that the
+// problem probe loops can re-probe those ranges. It is written to by a main
+// probe loop and read by the corresponding problem probe loop, so access is
+// serialized by a mutex.
+type problemTracker struct {
+	mu    syncutil.Mutex
+	steps []Step
+	size  int
+}
+
+func newProblemTracker() *problemTracker {
+	return &problemTracker{size: 3}
+}
+
+// add records a failed step, evicting the oldest recorded step once the
+// tracker is at capacity.
+func (pt *problemTracker) add(step Step) {
+	pt.mu.Lock()
+	defer pt.mu.Unlock()
+	if len(pt.steps) >= pt.size {
+		pt.steps = append(pt.steps[1:], step)
+	} else {
+		pt.steps = append(pt.steps, step)
+	}
+}
+
+// next returns a randomly chosen tracked step, or ErrPlanEmpty if no problem
+// ranges are currently tracked.
+func (pt *problemTracker) next() (Step, error) {
+	pt.mu.Lock()
+	defer pt.mu.Unlock()
+	if len(pt.steps) == 0 {
+		return Step{}, ErrPlanEmpty
+	}
+	return pt.steps[rand.Intn(len(pt.steps))], nil
+}
 
 // Opts provides knobs to control kvprober.Prober.
@@ -120,6 +156,61 @@ var (
 		Measurement: "Runs",
 		Unit:        metric.Unit_COUNT,
 	}
+
+	metaProblemReadProbeAttempts = metric.Metadata{
+		Name: "kv.prober.problem.read.attempts",
+		Help: "Number of attempts made to read probe problem ranges " +
+			"(ranges whose recent probes failed), regardless of outcome",
+		Measurement: "Queries",
+		Unit:        metric.Unit_COUNT,
+	}
+	metaProblemReadProbeFailures = metric.Metadata{
+		Name: "kv.prober.problem.read.failures",
+		Help: "Number of attempts made to read probe problem ranges that failed, " +
+			"whether due to error or timeout",
+		Measurement: "Queries",
+		Unit:        metric.Unit_COUNT,
+	}
+	metaProblemReadProbeLatency = metric.Metadata{
+		Name:        "kv.prober.problem.read.latency",
+		Help:        "Latency of successful KV read probes of problem ranges",
+		Measurement: "Latency",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaProblemWriteProbeAttempts = metric.Metadata{
+		Name: "kv.prober.problem.write.attempts",
+		Help: "Number of attempts made to write probe problem ranges " +
+			"(ranges whose recent probes failed), regardless of outcome",
+		Measurement: "Queries",
+		Unit:        metric.Unit_COUNT,
+	}
+	metaProblemWriteProbeFailures = metric.Metadata{
+		Name: "kv.prober.problem.write.failures",
+		Help: "Number of attempts made to write probe problem ranges that failed, " +
+			"whether due to error or timeout",
+		Measurement: "Queries",
+		Unit:        metric.Unit_COUNT,
+	}
+	metaProblemWriteProbeLatency = metric.Metadata{
+		Name:        "kv.prober.problem.write.latency",
+		Help:        "Latency of successful KV write probes of problem ranges",
+		Measurement: "Latency",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaProblemProbePlanAttempts = metric.Metadata{
+		Name: "kv.prober.problem.planning_attempts",
+		Help: "Number of attempts made to plan probes of problem ranges; " +
+			"planning selects a previously failing range to re-probe",
+		Measurement: "Runs",
+		Unit:        metric.Unit_COUNT,
+	}
+	metaProblemProbePlanFailures = metric.Metadata{
+		Name: "kv.prober.problem.planning_failures",
+		Help: "Number of attempts made to plan probes of problem ranges that " +
+			"failed; if planning fails, kvprober cannot re-probe previously " +
+			"failing ranges; consider alerting on this metric as a result",
+		Measurement: "Runs",
+		Unit:        metric.Unit_COUNT,
+	}
 	// TODO(josh): Add a histogram that captures where in the "rangespace" errors
 	// are occurring. This will allow operators to see at a glance what percentage
 	// of ranges are affected.
@@ -202,29 +293,76 @@ func (p *proberTxnImpl) TxnRootKV(
 // NewProber creates a Prober from Opts.
 func NewProber(opts Opts) *Prober {
-	return &Prober{
+	readProblemTracker := newProblemTracker()
+	writeProblemTracker := newProblemTracker()
+
+	meta2Metrics := Metrics{
+		ReadProbeAttempts:  metric.NewCounter(metaReadProbeAttempts),
+		ReadProbeFailures:  metric.NewCounter(metaReadProbeFailures),
+		ReadProbeLatency:   metric.NewLatency(metaReadProbeLatency, opts.HistogramWindowInterval),
+		WriteProbeAttempts: metric.NewCounter(metaWriteProbeAttempts),
+		WriteProbeFailures: metric.NewCounter(metaWriteProbeFailures),
+		WriteProbeLatency:  metric.NewLatency(metaWriteProbeLatency, opts.HistogramWindowInterval),
+		ProbePlanAttempts:  metric.NewCounter(metaProbePlanAttempts),
+		ProbePlanFailures:  metric.NewCounter(metaProbePlanFailures),
+	}
+	problemMetrics := Metrics{
+		ReadProbeAttempts:  metric.NewCounter(metaProblemReadProbeAttempts),
+		ReadProbeFailures:  metric.NewCounter(metaProblemReadProbeFailures),
+		ReadProbeLatency:   metric.NewLatency(metaProblemReadProbeLatency, opts.HistogramWindowInterval),
+		WriteProbeAttempts: metric.NewCounter(metaProblemWriteProbeAttempts),
+		WriteProbeFailures: metric.NewCounter(metaProblemWriteProbeFailures),
+		WriteProbeLatency:  metric.NewLatency(metaProblemWriteProbeLatency, opts.HistogramWindowInterval),
+		ProbePlanAttempts:  metric.NewCounter(metaProblemProbePlanAttempts),
+		ProbePlanFailures:  metric.NewCounter(metaProblemProbePlanFailures),
+	}
+
+	prober := Prober{
 		db:       opts.DB,
 		settings: opts.Settings,
+		tracer:   opts.Tracer,
+		metrics:  []Metrics{meta2Metrics, problemMetrics},
+	}
 
-		readPlanner:  newMeta2Planner(opts.DB, opts.Settings, func() time.Duration { return readInterval.Get(&opts.Settings.SV) }),
-		writePlanner: newMeta2Planner(opts.DB, opts.Settings, func() time.Duration { return writeInterval.Get(&opts.Settings.SV) }),
-
-		metrics: Metrics{
-			ReadProbeAttempts:  metric.NewCounter(metaReadProbeAttempts),
-			ReadProbeFailures:  metric.NewCounter(metaReadProbeFailures),
-			ReadProbeLatency:   metric.NewLatency(metaReadProbeLatency, opts.HistogramWindowInterval),
-			WriteProbeAttempts: metric.NewCounter(metaWriteProbeAttempts),
-			WriteProbeFailures: metric.NewCounter(metaWriteProbeFailures),
-			WriteProbeLatency:  metric.NewLatency(metaWriteProbeLatency, opts.HistogramWindowInterval),
-			ProbePlanAttempts:  metric.NewCounter(metaProbePlanAttempts),
-			ProbePlanFailures:  metric.NewCounter(metaProbePlanFailures),
+	// The problem loops reuse the read/write probe functions and intervals but
+	// plan over the ranges recorded by the main loops' problem trackers.
+	prober.loops = []ProbeLoop{
+		{
+			name:           "read probe loop",
+			planner:        newMeta2Planner(opts.DB, opts.Settings, func() time.Duration { return readInterval.Get(&opts.Settings.SV) }),
+			interval:       readInterval,
+			problemTracker: readProblemTracker,
+			prober:         prober.readProbe,
+			metrics:        meta2Metrics,
+		},
+		{
+			name:           "write probe loop",
+			planner:        newMeta2Planner(opts.DB, opts.Settings, func() time.Duration { return writeInterval.Get(&opts.Settings.SV) }),
+			interval:       writeInterval,
+			problemTracker: writeProblemTracker,
+			prober:         prober.writeProbe,
+			metrics:        meta2Metrics,
+		},
+		{
+			name:           "problem read probe loop",
+			planner:        newProblemTrackerPlanner(readProblemTracker),
+			interval:       readInterval,
+			problemTracker: nil,
+			prober:         prober.readProbe,
+			metrics:        problemMetrics,
+		},
+		{
+			name:           "problem write probe loop",
+			planner:        newProblemTrackerPlanner(writeProblemTracker),
+			interval:       writeInterval,
+			problemTracker: nil,
+			prober:         prober.writeProbe,
+			metrics:        problemMetrics,
 		},
-		tracer: opts.Tracer,
 	}
+
+	return &prober
 }
 
-// Metrics returns a struct which contains the kvprober metrics.
+// Metrics returns the metric structs that the prober populates: one for the
+// main probe loops and one for the problem-range probe loops.
-func (p *Prober) Metrics() Metrics {
+func (p *Prober) Metrics() []Metrics {
 	return p.metrics
 }
 
@@ -232,13 +370,13 @@ func (p *Prober) Metrics() Metrics {
 // returns an error only if stopper.RunAsyncTask returns an error.
 func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
 	ctx = logtags.AddTag(ctx, "kvprober", nil /* value */)
-	startLoop := func(ctx context.Context, opName string, probe func(context.Context, *kv.DB, planner), pl planner, interval *settings.DurationSetting) error {
-		return stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: opName, SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
+	startLoop := func(ctx context.Context, loop ProbeLoop) error {
+		return stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: loop.name, SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
 			defer logcrash.RecoverAndReportNonfatalPanic(ctx, &p.settings.SV)
 
 			rnd, _ /* seed */ := randutil.NewPseudoRand()
 			d := func() time.Duration {
-				return withJitter(interval.Get(&p.settings.SV), rnd)
+				return withJitter(loop.interval.Get(&p.settings.SV), rnd)
 			}
 			t := timeutil.NewTimer()
 			defer t.Stop()
@@ -257,36 +395,42 @@ func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
 				return
 			}
 
-			probeCtx, sp := tracing.EnsureChildSpan(ctx, p.tracer, opName+" - probe")
-			probe(probeCtx, p.db, pl)
+			probeCtx, sp := tracing.EnsureChildSpan(ctx, p.tracer, loop.name+" - probe")
+			loop.prober(probeCtx, p.db, loop.planner, loop)
 			sp.Finish()
 		}
 	})
 }
 
-	if err := startLoop(ctx, "read probe loop", p.readProbe, p.readPlanner, readInterval); err != nil {
-		return err
+	for _, loop := range p.loops {
+		if err := startLoop(ctx, loop); err != nil {
+			return err
+		}
 	}
-	return startLoop(ctx, "write probe loop", p.writeProbe, p.writePlanner, writeInterval)
+	return nil
 }
 
 // Doesn't return an error. Instead increments error type specific metrics.
-func (p *Prober) readProbe(ctx context.Context, db *kv.DB, pl planner) {
-	p.readProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl)
+func (p *Prober) readProbe(ctx context.Context, db *kv.DB, pl planner, l ProbeLoop) {
+	p.readProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl, l)
 }
 
-func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner) {
+func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner, l ProbeLoop) {
 	if !readEnabled.Get(&p.settings.SV) {
 		return
 	}
 
-	p.metrics.ProbePlanAttempts.Inc(1)
+	l.metrics.ProbePlanAttempts.Inc(1)
 
 	step, err := pl.next(ctx)
 	if err != nil {
-		log.Health.Errorf(ctx, "can't make a plan: %v", err)
-		p.metrics.ProbePlanFailures.Inc(1)
-		return
+		if errors.Is(err, ErrPlanEmpty) {
+			// An empty plan is expected for the problem probe loops whenever no
+			// recent probes have failed; skip this tick without counting a
+			// planning failure.
+			log.Health.Infof(ctx, "skipping empty plan: %v", err)
+			return
+		}
+		log.Health.Errorf(ctx, "can't make a plan: %v", err)
+		l.metrics.ProbePlanFailures.Inc(1)
+		return
 	}
 
 	// If errors above the KV scan, then this counter won't be incremented.
@@ -296,7 +440,7 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTx
 	// ProbePlanFailures. This would probably be a ticket alerting as
 	// the impact is more low visibility into possible failures than a high
 	// impact production issue.
-	p.metrics.ReadProbeAttempts.Inc(1)
+	l.metrics.ReadProbeAttempts.Inc(1)
 
 	start := timeutil.Now()
 
@@ -317,7 +461,10 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTx
 	if err != nil {
 		// TODO(josh): Write structured events with log.Structured.
 		log.Health.Errorf(ctx, "kv.Get(%s), r=%v failed with: %v", step.Key, step.RangeID, err)
-		p.metrics.ReadProbeFailures.Inc(1)
+		if l.problemTracker != nil {
+			// Remember the failing range so the problem probe loops re-probe it.
+			l.problemTracker.add(step)
+		}
+		l.metrics.ReadProbeFailures.Inc(1)
 		return
 	}
 
@@ -325,29 +472,33 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTx
 	log.Health.Infof(ctx, "kv.Get(%s), r=%v returned success in %v", step.Key, step.RangeID, d)
 
 	// Latency of failures is not recorded. They are counted as failures tho.
-	p.metrics.ReadProbeLatency.RecordValue(d.Nanoseconds())
+	l.metrics.ReadProbeLatency.RecordValue(d.Nanoseconds())
 }
 
 // Doesn't return an error. Instead increments error type specific metrics.
-func (p *Prober) writeProbe(ctx context.Context, db *kv.DB, pl planner) {
-	p.writeProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl)
+func (p *Prober) writeProbe(ctx context.Context, db *kv.DB, pl planner, l ProbeLoop) {
+	p.writeProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl, l)
 }
 
-func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner) {
+func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner, l ProbeLoop) {
 	if !writeEnabled.Get(&p.settings.SV) {
 		return
 	}
 
-	p.metrics.ProbePlanAttempts.Inc(1)
+	l.metrics.ProbePlanAttempts.Inc(1)
 
 	step, err := pl.next(ctx)
 	if err != nil {
-		log.Health.Errorf(ctx, "can't make a plan: %v", err)
-		p.metrics.ProbePlanFailures.Inc(1)
-		return
+		if errors.Is(err, ErrPlanEmpty) {
+			// An empty plan is expected for the problem probe loops whenever no
+			// recent probes have failed; skip this tick without counting a
+			// planning failure.
+			log.Health.Infof(ctx, "skipping empty plan: %v", err)
+			return
+		}
+		log.Health.Errorf(ctx, "can't make a plan: %v", err)
+		l.metrics.ProbePlanFailures.Inc(1)
+		return
 	}
 
-	p.metrics.WriteProbeAttempts.Inc(1)
+	l.metrics.WriteProbeAttempts.Inc(1)
 
 	start := timeutil.Now()
 
@@ -363,7 +514,10 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOps, txns proberT
 	})
 	if err != nil {
 		log.Health.Errorf(ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v", step.Key, step.RangeID, err)
-		p.metrics.WriteProbeFailures.Inc(1)
+		if l.problemTracker != nil {
+			// Remember the failing range so the problem probe loops re-probe it.
+			l.problemTracker.add(step)
+		}
+		l.metrics.WriteProbeFailures.Inc(1)
 		return
 	}
 
@@ -371,7 +525,7 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOps, txns proberT
 	log.Health.Infof(ctx, "kv.Txn(Put(%s); Del(-)), r=%v returned success in %v", step.Key, step.RangeID, d)
 
 	// Latency of failures is not recorded. They are counted as failures tho.
-	p.metrics.WriteProbeLatency.RecordValue(d.Nanoseconds())
+	l.metrics.WriteProbeLatency.RecordValue(d.Nanoseconds())
 }
 
 // Returns a random duration pulled from the uniform distribution given below:
diff --git a/pkg/kv/kvprober/planner.go b/pkg/kv/kvprober/planner.go
index 36042f70dd22..1d79e0de69eb 100644
--- a/pkg/kv/kvprober/planner.go
+++ b/pkg/kv/kvprober/planner.go
@@ -275,3 +275,15 @@ func meta2KVsToPlanImpl(kvs []kv.KeyValue) ([]Step, error) {
 
 	return plans, nil
 }
+
+// problemTrackerPlanner plans probes over the ranges recorded in a
+// problemTracker, rather than over the whole keyspace the way the meta2
+// planner does.
+type problemTrackerPlanner struct {
+	tracker *problemTracker
+}
+
+func newProblemTrackerPlanner(tracker *problemTracker) *problemTrackerPlanner {
+	return &problemTrackerPlanner{tracker: tracker}
+}
+
+// next returns a tracked problem range to probe, or ErrPlanEmpty if there
+// are none.
+func (p *problemTrackerPlanner) next(ctx context.Context) (Step, error) {
+	return p.tracker.next()
+}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 4a84684d58c5..caede1292745 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -683,7 +683,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		Settings:                st,
 		HistogramWindowInterval: cfg.HistogramWindowInterval(),
 	})
-	registry.AddMetricStruct(kvProber.Metrics())
+	for _, metrics := range kvProber.Metrics() {
+		registry.AddMetricStruct(metrics)
+	}
 
 	settingsWriter := newSettingsCacheWriter(engines[0], stopper)
 
 	sqlServer, err := newSQLServer(ctx, sqlServerArgs{
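
For reviewers, two optional sketches; neither is part of the patch.

First, the patch already relies on problemTrackerPlanner satisfying the package's
planner interface (it is stored in the loop's planner field and its next is called
by the probe impls). A compile-time assertion in planner.go would document that:

	// Hypothetical addition: documents that problemTrackerPlanner implements planner.
	var _ planner = &problemTrackerPlanner{}

Second, a minimal package-internal test exercising the problemTracker added above.
The file and test names are hypothetical; it assumes the unexported API shown in the
diff (newProblemTracker, add, next, a capacity of 3) and the existing Step type with
its RangeID field.

	// problem_tracker_sketch_test.go (hypothetical; package kvprober)
	package kvprober

	import (
		"testing"

		"github.com/cockroachdb/cockroach/pkg/roachpb"
		"github.com/cockroachdb/errors"
	)

	func TestProblemTrackerSketch(t *testing.T) {
		pt := newProblemTracker()

		// With nothing tracked, next reports ErrPlanEmpty so the problem probe
		// loops skip the tick instead of probing a zero-value Step.
		if _, err := pt.next(); !errors.Is(err, ErrPlanEmpty) {
			t.Fatalf("expected ErrPlanEmpty, got %v", err)
		}

		// Recording more failures than the capacity evicts the oldest entries.
		for i := 1; i <= 5; i++ {
			pt.add(Step{RangeID: roachpb.RangeID(i)})
		}
		if got := len(pt.steps); got != 3 {
			t.Fatalf("expected 3 tracked steps, got %d", got)
		}

		// next hands back one of the surviving (most recent) failures at random.
		step, err := pt.next()
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if step.RangeID < 3 {
			t.Fatalf("expected r1 and r2 to be evicted, got r%d", step.RangeID)
		}
	}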