kvprober: add probes for problem ranges
The kvprober provides good coverage of issues that affect many ranges,
but has a lower probability of detecting individual bad ranges. To
improve coverage of the latter case, remember failed ranges from the
existing prober and probe them more frequently in a second pair of probe
loops.

Resolves cockroachdb#74407.

Release note: None
jmcarp committed Feb 4, 2022
1 parent 7499c27 commit 6d3d3fc
Showing 3 changed files with 216 additions and 48 deletions.
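
In outline, the commit's two-tier strategy looks like this toy sketch (illustrative Go only — the names and the fake probe function here are invented for this example and do not appear in the commit):

package main

import (
    "fmt"
    "math/rand"
)

func main() {
    // Pretend every seventh range is bad.
    probe := func(rangeID int) bool { return rangeID%7 != 0 }

    // Main loop: wide but shallow coverage — probe random ranges and
    // remember the ones that fail.
    var problems []int
    for i := 0; i < 20; i++ {
        if r := rand.Intn(100); !probe(r) {
            problems = append(problems, r)
        }
    }

    // Problem loop: narrow but frequent coverage — re-probe only the
    // remembered ranges to confirm whether they are still failing.
    for _, r := range problems {
        fmt.Printf("re-probing suspect range r%d: ok=%v\n", r, probe(r))
    }
}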
248 changes: 201 additions & 47 deletions pkg/kv/kvprober/kvprober.go
@@ -33,26 +33,62 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

const putValue = "thekvproberwrotethis"

var ErrPlanEmpty = errors.New("no keys need to be probed")

// Prober sends queries to KV in a loop. See package docstring for more.
type Prober struct {
db *kv.DB
settings *cluster.Settings
tracer *tracing.Tracer
loops []ProbeLoop
metrics []Metrics
}

// ProbeLoop bundles everything a single probe loop needs: the probe to
// run, a planner to pick ranges, an interval to pace itself by, and the
// metrics it populates.
type ProbeLoop struct {
    name   string
    prober func(context.Context, *kv.DB, planner, ProbeLoop)
    // planner is an interface for selecting a range to probe. There are
    // separate planners for the read & write probe loops, so as to achieve
    // a balanced probing of the keyspace, regardless of differences in the rate
    // at which Prober sends different probes. Also note that planner is
    // NOT thread-safe.
    planner  planner
    interval *settings.DurationSetting
    // problemTracker remembers ranges whose probes failed, so that the
    // problem probe loops can re-probe them. It is nil for the problem
    // loops themselves.
    problemTracker *problemTracker
    // metrics wraps up the set of prometheus metrics that the prober sets; the
    // goal of the prober IS to populate these metrics.
    metrics Metrics
}

// problemTracker remembers a bounded number of recently failed probe
// steps. It is written by the main probe loops and read by the problem
// probe loops, which run in separate goroutines, so access is guarded by
// a mutex.
type problemTracker struct {
    mu    syncutil.Mutex
    steps []Step
    size  int
}

func newProblemTracker() *problemTracker {
    return &problemTracker{size: 3}
}

// add records a failed step, evicting the oldest entry once the tracker
// holds size entries.
func (pt *problemTracker) add(step Step) {
    pt.mu.Lock()
    defer pt.mu.Unlock()
    if len(pt.steps) >= pt.size {
        pt.steps = append(pt.steps[1:], step)
    } else {
        pt.steps = append(pt.steps, step)
    }
}

// next returns a random problem range, if any is available.
func (pt *problemTracker) next() (Step, error) {
    pt.mu.Lock()
    defer pt.mu.Unlock()
    if len(pt.steps) == 0 {
        return Step{}, ErrPlanEmpty
    }
    return pt.steps[rand.Intn(len(pt.steps))], nil
}
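
// Example of the eviction behavior (an illustrative comment, not part of
// this commit; Step's field types are assumed from their usage in the log
// lines below):
//
//	pt := newProblemTracker()   // capacity 3
//	for i := 1; i <= 5; i++ {
//		pt.add(Step{RangeID: roachpb.RangeID(i)})
//	}
//	// pt.steps now holds ranges 3, 4, and 5; the two oldest entries were
//	// evicted. next() returns one of the three at random; before any add,
//	// it returns ErrPlanEmpty.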

// Opts provides knobs to control kvprober.Prober.
@@ -120,6 +156,61 @@ var (
Measurement: "Runs",
Unit: metric.Unit_COUNT,
}

metaProblemReadProbeAttempts = metric.Metadata{
Name: "kv.prober.problem.read.attempts",
Help: "Number of attempts made to read probe KV, regardless of outcome",
Measurement: "Queries",
Unit: metric.Unit_COUNT,
}
metaProblemReadProbeFailures = metric.Metadata{
Name: "kv.prober.problem.read.failures",
Help: "Number of attempts made to read probe KV that failed, " +
"whether due to error or timeout",
Measurement: "Queries",
Unit: metric.Unit_COUNT,
}
metaProblemReadProbeLatency = metric.Metadata{
Name: "kv.prober.problem.read.latency",
Help: "Latency of successful KV read probes",
Measurement: "Latency",
Unit: metric.Unit_NANOSECONDS,
}
metaProblemWriteProbeAttempts = metric.Metadata{
Name: "kv.prober.problem.write.attempts",
Help: "Number of attempts made to write probe KV, regardless of outcome",
Measurement: "Queries",
Unit: metric.Unit_COUNT,
}
metaProblemWriteProbeFailures = metric.Metadata{
Name: "kv.prober.problem.write.failures",
Help: "Number of attempts made to write probe KV that failed, " +
"whether due to error or timeout",
Measurement: "Queries",
Unit: metric.Unit_COUNT,
}
metaProblemWriteProbeLatency = metric.Metadata{
Name: "kv.prober.problem.write.latency",
Help: "Latency of successful KV write probes",
Measurement: "Latency",
Unit: metric.Unit_NANOSECONDS,
}
metaProblemProbePlanAttempts = metric.Metadata{
Name: "kv.prober.problem.planning_attempts",
Help: "Number of attempts at planning out probes made; " +
"in order to probe KV we need to plan out which ranges to probe;",
Measurement: "Runs",
Unit: metric.Unit_COUNT,
}
metaProblemProbePlanFailures = metric.Metadata{
Name: "kv.prober.problem.planning_failures",
Help: "Number of attempts at planning out probes that failed; " +
"in order to probe KV we need to plan out which ranges to probe; " +
"if planning fails, then kvprober is not able to send probes to " +
"all ranges; consider alerting on this metric as a result",
Measurement: "Runs",
Unit: metric.Unit_COUNT,
}
// TODO(josh): Add a histogram that captures where in the "rangespace" errors
// are occurring. This will allow operators to see at a glance what percentage
// of ranges are affected.
@@ -202,43 +293,90 @@ func (p *proberTxnImpl) TxnRootKV(

// NewProber creates a Prober from Opts.
func NewProber(opts Opts) *Prober {
readProblemTracker := newProblemTracker()
writeProblemTracker := newProblemTracker()

meta2Metrics := Metrics{
    ReadProbeAttempts:  metric.NewCounter(metaReadProbeAttempts),
    ReadProbeFailures:  metric.NewCounter(metaReadProbeFailures),
    ReadProbeLatency:   metric.NewLatency(metaReadProbeLatency, opts.HistogramWindowInterval),
    WriteProbeAttempts: metric.NewCounter(metaWriteProbeAttempts),
    WriteProbeFailures: metric.NewCounter(metaWriteProbeFailures),
    WriteProbeLatency:  metric.NewLatency(metaWriteProbeLatency, opts.HistogramWindowInterval),
    ProbePlanAttempts:  metric.NewCounter(metaProbePlanAttempts),
    ProbePlanFailures:  metric.NewCounter(metaProbePlanFailures),
}
problemMetrics := Metrics{
    ReadProbeAttempts:  metric.NewCounter(metaProblemReadProbeAttempts),
    ReadProbeFailures:  metric.NewCounter(metaProblemReadProbeFailures),
    ReadProbeLatency:   metric.NewLatency(metaProblemReadProbeLatency, opts.HistogramWindowInterval),
    WriteProbeAttempts: metric.NewCounter(metaProblemWriteProbeAttempts),
    WriteProbeFailures: metric.NewCounter(metaProblemWriteProbeFailures),
    WriteProbeLatency:  metric.NewLatency(metaProblemWriteProbeLatency, opts.HistogramWindowInterval),
    ProbePlanAttempts:  metric.NewCounter(metaProblemProbePlanAttempts),
    ProbePlanFailures:  metric.NewCounter(metaProblemProbePlanFailures),
}

prober := Prober{
    db:       opts.DB,
    settings: opts.Settings,
    tracer:   opts.Tracer,
    // metrics is exposed via Metrics() for registration with a metric
    // registry; it carries both the meta2 and the problem metric sets.
    metrics: []Metrics{meta2Metrics, problemMetrics},
}

prober.loops = []ProbeLoop{
    {
        name:           "read probe loop",
        planner:        newMeta2Planner(opts.DB, opts.Settings, func() time.Duration { return readInterval.Get(&opts.Settings.SV) }),
        interval:       readInterval,
        problemTracker: readProblemTracker,
        prober:         prober.readProbe,
        metrics:        meta2Metrics,
    },
    {
        name:           "write probe loop",
        planner:        newMeta2Planner(opts.DB, opts.Settings, func() time.Duration { return writeInterval.Get(&opts.Settings.SV) }),
        interval:       writeInterval,
        problemTracker: writeProblemTracker,
        prober:         prober.writeProbe,
        metrics:        meta2Metrics,
    },
    {
        name:           "problem read probe loop",
        planner:        newProblemTrackerPlanner(readProblemTracker),
        interval:       readInterval,
        problemTracker: nil,
        prober:         prober.readProbe,
        metrics:        problemMetrics,
    },
    {
        name:           "problem write probe loop",
        planner:        newProblemTrackerPlanner(writeProblemTracker),
        interval:       writeInterval,
        problemTracker: nil,
        prober:         prober.writeProbe,
        metrics:        problemMetrics,
    },
}

return &prober
}

// Metrics returns the metrics structs populated by the prober's loops:
// one for the meta2 probe loops and one for the problem probe loops.
func (p *Prober) Metrics() []Metrics {
return p.metrics
}

// Start causes kvprober to start probing KV. Start returns immediately. Start
// returns an error only if stopper.RunAsyncTask returns an error.
func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
ctx = logtags.AddTag(ctx, "kvprober", nil /* value */)
startLoop := func(ctx context.Context, loop ProbeLoop) error {
return stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: loop.name, SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
defer logcrash.RecoverAndReportNonfatalPanic(ctx, &p.settings.SV)

rnd, _ /* seed */ := randutil.NewPseudoRand()
d := func() time.Duration {
return withJitter(loop.interval.Get(&p.settings.SV), rnd)
}
t := timeutil.NewTimer()
defer t.Stop()
@@ -257,36 +395,42 @@
return
}

probeCtx, sp := tracing.EnsureChildSpan(ctx, p.tracer, loop.name+" - probe")
loop.prober(probeCtx, p.db, loop.planner, loop)
sp.Finish()
}
})
}

for _, loop := range p.loops {
    if err := startLoop(ctx, loop); err != nil {
        return err
    }
}
return nil
}
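
// Example wiring (an illustrative sketch, not code from this commit; it
// assumes the caller already holds a *kv.DB, cluster settings, a tracer,
// and a stopper):
//
//	p := NewProber(Opts{
//		DB:                      db,
//		Settings:                st,
//		Tracer:                  tracer,
//		HistogramWindowInterval: time.Minute,
//	})
//	// Register p.Metrics() with the node's metric registry, then:
//	if err := p.Start(ctx, stopper); err != nil {
//		return err
//	}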

// Doesn't return an error. Instead increments error type specific metrics.
func (p *Prober) readProbe(ctx context.Context, db *kv.DB, pl planner, l ProbeLoop) {
p.readProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl, l)
}

func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner, l ProbeLoop) {
if !readEnabled.Get(&p.settings.SV) {
return
}

l.metrics.ProbePlanAttempts.Inc(1)

step, err := pl.next(ctx)
if err != nil {
    if errors.Is(err, ErrPlanEmpty) {
        // An empty plan is expected when the problem tracker has nothing
        // to re-probe yet; it is not a planning failure, but there is
        // also nothing to do, so return early.
        log.Health.Infof(ctx, "skipping empty plan: %v", err)
        return
    }
    log.Health.Errorf(ctx, "can't make a plan: %v", err)
    l.metrics.ProbePlanFailures.Inc(1)
    return
}

// If errors occur above the KV scan, then this counter won't be incremented.
Expand All @@ -296,7 +440,7 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTx
// ProbePlanFailures. This would probably be a ticket alerting as
// the impact is more low visibility into possible failures than a high
// impact production issue.
l.metrics.ReadProbeAttempts.Inc(1)

start := timeutil.Now()

@@ -317,37 +461,44 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTx
if err != nil {
// TODO(josh): Write structured events with log.Structured.
log.Health.Errorf(ctx, "kv.Get(%s), r=%v failed with: %v", step.Key, step.RangeID, err)
if l.problemTracker != nil {
l.problemTracker.add(step)
}
l.metrics.ReadProbeFailures.Inc(1)
return
}

d := timeutil.Since(start)
log.Health.Infof(ctx, "kv.Get(%s), r=%v returned success in %v", step.Key, step.RangeID, d)

// Latency of failures is not recorded. They are counted as failures though.
l.metrics.ReadProbeLatency.RecordValue(d.Nanoseconds())
}

// Doesn't return an error. Instead increments error type specific metrics.
func (p *Prober) writeProbe(ctx context.Context, db *kv.DB, pl planner, l ProbeLoop) {
p.writeProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl, l)
}

func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner, l ProbeLoop) {
if !writeEnabled.Get(&p.settings.SV) {
return
}

l.metrics.ProbePlanAttempts.Inc(1)

step, err := pl.next(ctx)
if err != nil {
    if errors.Is(err, ErrPlanEmpty) {
        // As in the read path: an empty plan just means the problem
        // tracker has nothing to re-probe yet, so return early.
        log.Health.Infof(ctx, "skipping empty plan: %v", err)
        return
    }
    log.Health.Errorf(ctx, "can't make a plan: %v", err)
    l.metrics.ProbePlanFailures.Inc(1)
    return
}

l.metrics.WriteProbeAttempts.Inc(1)

start := timeutil.Now()

@@ -363,15 +514,18 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOps, txns proberT
})
if err != nil {
log.Health.Errorf(ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v", step.Key, step.RangeID, err)
if l.problemTracker != nil {
l.problemTracker.add(step)
}
l.metrics.WriteProbeFailures.Inc(1)
return
}

d := timeutil.Since(start)
log.Health.Infof(ctx, "kv.Txn(Put(%s); Del(-)), r=%v returned success in %v", step.Key, step.RangeID, d)

// Latency of failures is not recorded. They are counted as failures though.
l.metrics.WriteProbeLatency.RecordValue(d.Nanoseconds())
}

// Returns a random duration pulled from the uniform distribution given below:
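
The body of withJitter is folded out of the diff view; a plausible shape for such a helper (an assumption for illustration, not the actual CockroachDB code) draws uniformly from [d - d/4, d + d/4]:

// withJitterSketch is a hypothetical stand-in for the elided withJitter:
// spreading each loop's wake-ups over [d - d/4, d + d/4] keeps probe loops
// on different nodes from ticking in lockstep. Assumes d is positive.
func withJitterSketch(d time.Duration, rnd *rand.Rand) time.Duration {
    return d - d/4 + time.Duration(rnd.Int63n(int64(d/2)))
}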
12 changes: 12 additions & 0 deletions pkg/kv/kvprober/planner.go
@@ -275,3 +275,15 @@ func meta2KVsToPlanImpl(kvs []kv.KeyValue) ([]Step, error) {

return plans, nil
}

type problemTrackerPlanner struct {
tracker *problemTracker
}

func newProblemTrackerPlanner(tracker *problemTracker) *problemTrackerPlanner {
return &problemTrackerPlanner{tracker: tracker}
}

// next implements the planner interface by delegating to the underlying
// problemTracker; ctx is accepted to satisfy the interface but is unused.
func (p *problemTrackerPlanner) next(ctx context.Context) (Step, error) {
return p.tracker.next()
}
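
Both meta2Planner and problemTrackerPlanner are used through the same small interface. It is defined elsewhere in this package; inferred from the call sites above (an assumption), it is presumably just:

type planner interface {
    // next returns the Step for the next range to be probed.
    next(ctx context.Context) (Step, error)
}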
