Skip to content

Commit

Permalink
kv: update kvprober with quarantine pool
Browse files Browse the repository at this point in the history
These changes update the kvprober to add ranges that
fail probing into a quarantine pool where they are
continuously probed. A metric which indicates the
duration of the longest tenured range has also been
added.

Resolves #74407

Release justification: low risk, high benefit changes to
existing functionality.

Release note: None
  • Loading branch information
Santamaura committed Sep 1, 2022
1 parent 34089c8 commit b3f9b08
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 12 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvprober/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
srcs = [
"kvprober.go",
"planner.go",
"quarantine_pool.go",
"settings.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvprober",
Expand Down
61 changes: 49 additions & 12 deletions pkg/kv/kvprober/kvprober.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Prober struct {
// goal of the prober IS to populate these metrics.
metrics Metrics
tracer *tracing.Tracer
// quarantine pools keep track of ranges that timed/errored when probing and
// continually probe those ranges until killed/probe is succesful.
quarantineWritePool *quarantinePool
}

// Opts provides knobs to control kvprober.Prober.
Expand Down Expand Up @@ -120,21 +123,29 @@ var (
Measurement: "Runs",
Unit: metric.Unit_COUNT,
}
metaWriteProbeQuarantineOldestDuration = metric.Metadata{
Name: "kv.prober.write.quarantine.oldest_duration",
Help: "The duration that the oldest range in the " +
"write quarantine pool has remained",
Measurement: "Seconds",
Unit: metric.Unit_SECONDS,
}
// TODO(josh): Add a histogram that captures where in the "rangespace" errors
// are occurring. This will allow operators to see at a glance what percentage
// of ranges are affected.
)

// Metrics groups together the metrics that kvprober exports.
type Metrics struct {
ReadProbeAttempts *metric.Counter
ReadProbeFailures *metric.Counter
ReadProbeLatency *metric.Histogram
WriteProbeAttempts *metric.Counter
WriteProbeFailures *metric.Counter
WriteProbeLatency *metric.Histogram
ProbePlanAttempts *metric.Counter
ProbePlanFailures *metric.Counter
ReadProbeAttempts *metric.Counter
ReadProbeFailures *metric.Counter
ReadProbeLatency *metric.Histogram
WriteProbeAttempts *metric.Counter
WriteProbeFailures *metric.Counter
WriteProbeLatency *metric.Histogram
ProbePlanAttempts *metric.Counter
ProbePlanFailures *metric.Counter
WriteProbeQuarantineOldestDuration *metric.Gauge
}

// proberOpsI is an interface that the prober will use to run ops against some
Expand Down Expand Up @@ -224,10 +235,12 @@ func NewProber(opts Opts) *Prober {
WriteProbeLatency: metric.NewHistogram(
metaWriteProbeLatency, opts.HistogramWindowInterval, metric.NetworkLatencyBuckets,
),
ProbePlanAttempts: metric.NewCounter(metaProbePlanAttempts),
ProbePlanFailures: metric.NewCounter(metaProbePlanFailures),
ProbePlanAttempts: metric.NewCounter(metaProbePlanAttempts),
ProbePlanFailures: metric.NewCounter(metaProbePlanFailures),
WriteProbeQuarantineOldestDuration: metric.NewGauge(metaWriteProbeQuarantineOldestDuration),
},
tracer: opts.Tracer,
tracer: opts.Tracer,
quarantineWritePool: newQuarantinePool(opts.Settings),
}
}

Expand Down Expand Up @@ -275,7 +288,10 @@ func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
if err := startLoop(ctx, "read probe loop", p.readProbe, p.readPlanner, readInterval); err != nil {
return err
}
return startLoop(ctx, "write probe loop", p.writeProbe, p.writePlanner, writeInterval)
if err := startLoop(ctx, "write probe loop", p.writeProbe, p.writePlanner, writeInterval); err != nil {
return err
}
return startLoop(ctx, "quarantine write probe loop", p.quarantineProbe, p.quarantineWritePool, quarantineWriteInterval)
}

// Doesn't return an error. Instead increments error type specific metrics.
Expand Down Expand Up @@ -372,8 +388,20 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober
if err != nil {
log.Health.Errorf(ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v", step.Key, step.RangeID, err)
p.metrics.WriteProbeFailures.Inc(1)
// Add to quarantine pool if not in the entry map.
if _, ok := p.quarantineWritePool.entryTimeMap[step.RangeID]; !ok {
p.quarantineWritePool.add(ctx, step)
} else {
duration := int64(timeutil.Since(p.quarantineWritePool.entryTimeMap[step.RangeID]).Seconds())
longestDuration := p.metrics.WriteProbeQuarantineOldestDuration.Value()
if duration > longestDuration {
p.metrics.WriteProbeQuarantineOldestDuration.Update(duration)
}
}
return
}
// This will no-op if not in the quarantine pool.
p.quarantineWritePool.remove(ctx, step)

d := timeutil.Since(start)
log.Health.Infof(ctx, "kv.Txn(Put(%s); Del(-)), r=%v returned success in %v", step.Key, step.RangeID, d)
Expand All @@ -382,6 +410,15 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober
p.metrics.WriteProbeLatency.RecordValue(d.Nanoseconds())
}

// Wrapper function for probing the quarantine pool.
func (p *Prober) quarantineProbe(ctx context.Context, db *kv.DB, pl planner) {
if !quarantineWriteEnabled.Get(&p.settings.SV) {
return
}

p.writeProbe(ctx, db, pl)
}

// Returns a random duration pulled from the uniform distribution given below:
// [d - 0.25*d, d + 0.25*d).
func withJitter(d time.Duration, rnd *rand.Rand) time.Duration {
Expand Down
82 changes: 82 additions & 0 deletions pkg/kv/kvprober/quarantine_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// Package kvprober sends queries to KV in a loop, with configurable sleep
// times, in order to generate data about the healthiness or unhealthiness of
// kvclient & below.
//
// Prober increments metrics that SRE & other operators can use as alerting
// signals. It also writes to logs to help narrow down the problem (e.g. which
// range(s) are acting up).
package kvprober

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

type quarantinePool struct {
steps []Step
size int64
entryTimeMap map[roachpb.RangeID]time.Time
}

func newQuarantinePool(settings *cluster.Settings) *quarantinePool {
poolSize := quarantinePoolSize.Get(&settings.SV)
return &quarantinePool{
size: poolSize,
entryTimeMap: make(map[roachpb.RangeID]time.Time),
steps: make([]Step, poolSize),
}
}

func (qp *quarantinePool) add(ctx context.Context, step Step) {
if int64(len(qp.steps)) >= qp.size-1 {
log.Health.Errorf(ctx, "cannot add range %s to quarantine pool, at capacity", step.RangeID.String())
} else {
qp.steps = append(qp.steps, step)
qp.entryTimeMap[step.RangeID] = timeutil.Now()
}
}

func (qp *quarantinePool) remove(ctx context.Context, step Step) {
if len(qp.steps) < 1 {
log.Health.Errorf(ctx, "cannot remove range %s from quarantine pool, pool is empty", step.RangeID.String())
return
}
idx := -1
for k, v := range qp.steps {
if v.RangeID == step.RangeID {
idx = k
break
}
}
if idx == -1 {
log.Health.Errorf(ctx, "cannot remove range %s from quarantine pool, not found", step.RangeID.String())
return
}
// Expensive op if pool size is very large.
qp.steps = append(qp.steps[:idx], qp.steps[idx+1:]...)
delete(qp.entryTimeMap, step.RangeID)
}

func (qp *quarantinePool) next(ctx context.Context) (Step, error) {
if len(qp.steps) > 0 {
step := qp.steps[0]
return step, nil
}
return Step{}, errors.New("there are no keys in quarantine")
}
34 changes: 34 additions & 0 deletions pkg/kv/kvprober/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,37 @@ var numStepsToPlanAtOnce = settings.RegisterIntSetting(
}
return nil
})

var quarantinePoolSize = settings.RegisterIntSetting(
settings.TenantWritable,
"kv.prober.quarantine_pool_size",
"the maximum size of the kv prober quarantine pool, where the quarantine "+
"pool holds Steps for ranges that have been probed and timed out; If "+
"the quarantine pool is full, probes that fail will not be added to "+
" the pool",
100, func(i int64) error {
if i <= 0 {
return errors.New("param must be >0")
}
return nil
})

var quarantineWriteEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"kv.prober.quarantine.write.enabled",
"whether the KV write prober is enabled for the quaranatine pool",
false)

var quarantineWriteInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
"kv.prober.quarantine.write.interval",
"how often each node sends a write probe for the quarantine pool to the KV layer "+
"on average (jitter is added); "+
"note that a very slow read can block kvprober from sending additional probes; "+
"kv.prober.write.timeout controls the max time kvprober can be blocked",
10*time.Second, func(duration time.Duration) error {
if duration <= 0 {
return errors.New("param must be >0")
}
return nil
})

0 comments on commit b3f9b08

Please sign in to comment.