Merge #87436
87436: kv: update kvprober with quarantine pool r=Santamaura a=Santamaura

These changes update the kvprober to add ranges that
fail probing to a quarantine pool, where they are
continuously re-probed. A metric indicating how long
the oldest range has remained in the quarantine pool
has also been added.

Resolves #74407

Release justification: low risk, high benefit changes to
existing functionality.

Release note: None

Co-authored-by: Santamaura <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
3 people committed Sep 23, 2022
2 parents 2fddd36 + 165423b commit b6f8265
Showing 8 changed files with 330 additions and 22 deletions.
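Before the per-file diffs, a condensed illustration of how the new pieces interact may help: a failed write probe quarantines its range, the quarantine loop repeatedly re-probes pool members (the pool itself acts as that loop's planner), and a later successful probe releases the range. This is an editorial sketch distilled from the kvprober.go changes below, not code taken from the commit.

	// Sketch only (package kvprober); condensed from the diffs that follow.
	func sketchQuarantineCycle(ctx context.Context, p *Prober, step Step, probeErr error) {
		if probeErr != nil {
			// A failed write probe lands the range in the quarantine pool
			// (a no-op if the range is already quarantined).
			p.quarantineWritePool.maybeAdd(ctx, step)
			return
		}
		// A successful probe releases the range; a no-op if it was never quarantined.
		p.quarantineWritePool.maybeRemove(ctx, step)
	}

	// The quarantine write loop then reuses the write probe with the pool as
	// its planner, so quarantined ranges are re-probed until they recover:
	//
	//	p.quarantineProbe(ctx, p.quarantineWritePool)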
3 changes: 3 additions & 0 deletions pkg/kv/kvprober/BUILD.bazel
@@ -6,6 +6,7 @@ go_library(
srcs = [
"kvprober.go",
"planner.go",
"quarantine_pool.go",
"settings.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvprober",
@@ -22,6 +23,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
@@ -37,6 +39,7 @@ go_test(
"kvprober_test.go",
"main_test.go",
"planner_test.go",
"quarantine_pool_test.go",
],
args = ["-test.timeout=295s"],
embed = [":kvprober"],
6 changes: 3 additions & 3 deletions pkg/kv/kvprober/helpers_test.go
@@ -29,11 +29,11 @@ var (
)

func (p *Prober) ReadProbe(ctx context.Context, db *kv.DB) {
p.readProbe(ctx, db, p.readPlanner)
p.readProbe(ctx, p.readPlanner)
}

func (p *Prober) WriteProbe(ctx context.Context, db *kv.DB) {
p.writeProbe(ctx, db, p.writePlanner)
p.writeProbe(ctx, p.writePlanner)
}

type recordingPlanner struct {
@@ -50,7 +50,7 @@ func (rp *recordingPlanner) next(ctx context.Context) (Step, error) {
func (p *Prober) WriteProbeReturnLastStep(ctx context.Context, db *kv.DB) *Step {
rp := &recordingPlanner{}
rp.pl = p.writePlanner
p.writeProbe(ctx, db, rp)
p.writeProbe(ctx, rp)
return &rp.last
}

80 changes: 64 additions & 16 deletions pkg/kv/kvprober/kvprober.go
Expand Up @@ -53,6 +53,9 @@ type Prober struct {
// goal of the prober IS to populate these metrics.
metrics Metrics
tracer *tracing.Tracer
// quarantineWritePool keeps track of ranges that timed out/errored when
// probing and repeatedly probes those ranges until a probe succeeds.
quarantineWritePool *quarantinePool
}

// Opts provides knobs to control kvprober.Prober.
@@ -104,6 +107,14 @@ var (
Measurement: "Latency",
Unit: metric.Unit_NANOSECONDS,
}
metaWriteProbeQuarantineOldestDuration = metric.Metadata{
Name: "kv.prober.write.quarantine.oldest_duration",
Help: "The duration that the oldest range in the " +
"write quarantine pool has remained",
Measurement: "Seconds",
Unit: metric.Unit_SECONDS,
}

metaProbePlanAttempts = metric.Metadata{
Name: "kv.prober.planning_attempts",
Help: "Number of attempts at planning out probes made; " +
@@ -127,14 +138,15 @@ var (

// Metrics groups together the metrics that kvprober exports.
type Metrics struct {
ReadProbeAttempts *metric.Counter
ReadProbeFailures *metric.Counter
ReadProbeLatency *metric.Histogram
WriteProbeAttempts *metric.Counter
WriteProbeFailures *metric.Counter
WriteProbeLatency *metric.Histogram
ProbePlanAttempts *metric.Counter
ProbePlanFailures *metric.Counter
ReadProbeAttempts *metric.Counter
ReadProbeFailures *metric.Counter
ReadProbeLatency *metric.Histogram
WriteProbeAttempts *metric.Counter
WriteProbeFailures *metric.Counter
WriteProbeLatency *metric.Histogram
WriteProbeQuarantineOldestDuration *metric.Gauge
ProbePlanAttempts *metric.Counter
ProbePlanFailures *metric.Counter
}

// proberOpsI is an interface that the prober will use to run ops against some
@@ -206,6 +218,7 @@ func (p *proberTxnImpl) TxnRootKV(

// NewProber creates a Prober from Opts.
func NewProber(opts Opts) *Prober {
qPool := newQuarantinePool(opts.Settings)
return &Prober{
db: opts.DB,
settings: opts.Settings,
@@ -224,10 +237,15 @@ func NewProber(opts Opts) *Prober {
WriteProbeLatency: metric.NewHistogram(
metaWriteProbeLatency, opts.HistogramWindowInterval, metric.NetworkLatencyBuckets,
),
WriteProbeQuarantineOldestDuration: metric.NewFunctionalGauge(
metaWriteProbeQuarantineOldestDuration,
func() int64 { return qPool.oldestDuration().Nanoseconds() },
),
ProbePlanAttempts: metric.NewCounter(metaProbePlanAttempts),
ProbePlanFailures: metric.NewCounter(metaProbePlanFailures),
},
tracer: opts.Tracer,
tracer: opts.Tracer,
quarantineWritePool: qPool,
}
}

@@ -240,7 +258,7 @@ func (p *Prober) Metrics() Metrics {
// returns an error only if stopper.RunAsyncTask returns an error.
func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
ctx = logtags.AddTag(ctx, "kvprober", nil /* value */)
startLoop := func(ctx context.Context, opName string, probe func(context.Context, *kv.DB, planner), pl planner, interval *settings.DurationSetting) error {
startLoop := func(ctx context.Context, opName string, probe func(context.Context, planner), pl planner, interval *settings.DurationSetting) error {
return stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: opName, SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
defer logcrash.RecoverAndReportNonfatalPanic(ctx, &p.settings.SV)

@@ -266,7 +284,7 @@ func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
}

probeCtx, sp := tracing.EnsureChildSpan(ctx, p.tracer, opName+" - probe")
probe(probeCtx, p.db, pl)
probe(probeCtx, pl)
sp.Finish()
}
})
@@ -275,11 +293,19 @@ func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
if err := startLoop(ctx, "read probe loop", p.readProbe, p.readPlanner, readInterval); err != nil {
return err
}
return startLoop(ctx, "write probe loop", p.writeProbe, p.writePlanner, writeInterval)
if err := startLoop(ctx, "write probe loop", p.writeProbe, p.writePlanner, writeInterval); err != nil {
return err
}
// The purpose of the quarantine pool is to detect outages affecting a small number
// of ranges but at a high confidence. The quarantine pool does this by repeatedly
// probing ranges that are in the pool.
return startLoop(ctx, "quarantine write probe loop", p.quarantineProbe, p.quarantineWritePool, quarantineWriteInterval)
}

// Doesn't return an error. Instead increments error type specific metrics.
func (p *Prober) readProbe(ctx context.Context, db *kv.DB, pl planner) {
// Doesn't return an error. Instead, increments error type specific metrics.
//
// TODO(tbg): db parameter is unused, remove it.
func (p *Prober) readProbe(ctx context.Context, pl planner) {
p.readProbeImpl(ctx, &ProberOps{}, &proberTxnImpl{db: p.db}, pl)
}

@@ -291,6 +317,9 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOpsI, txns proberT
p.metrics.ProbePlanAttempts.Inc(1)

step, err := pl.next(ctx)
if err == nil && step.RangeID == 0 {
return
}
if err != nil {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
p.metrics.ProbePlanFailures.Inc(1)
@@ -337,7 +366,7 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOpsI, txns proberT
}

// Doesn't return an error. Instead increments error type specific metrics.
func (p *Prober) writeProbe(ctx context.Context, db *kv.DB, pl planner) {
func (p *Prober) writeProbe(ctx context.Context, pl planner) {
p.writeProbeImpl(ctx, &ProberOps{}, &proberTxnImpl{db: p.db}, pl)
}

@@ -349,6 +378,11 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober
p.metrics.ProbePlanAttempts.Inc(1)

step, err := pl.next(ctx)
// In the case where the quarantine pool is empty don't record a planning failure since
// it isn't an actual plan failure.
if err == nil && step.RangeID == 0 {
return
}
if err != nil {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
p.metrics.ProbePlanFailures.Inc(1)
@@ -370,10 +404,15 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober
return txns.TxnRootKV(ctx, f)
})
if err != nil {
log.Health.Errorf(ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v", step.Key, step.RangeID, err)
added := p.quarantineWritePool.maybeAdd(ctx, step)
log.Health.Errorf(
ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v [quarantined=%t]", step.Key, step.RangeID, err, added,
)
p.metrics.WriteProbeFailures.Inc(1)
return
}
// This will no-op if not in the quarantine pool.
p.quarantineWritePool.maybeRemove(ctx, step)

d := timeutil.Since(start)
log.Health.Infof(ctx, "kv.Txn(Put(%s); Del(-)), r=%v returned success in %v", step.Key, step.RangeID, d)
@@ -382,6 +421,15 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober
p.metrics.WriteProbeLatency.RecordValue(d.Nanoseconds())
}

// Wrapper function for probing the quarantine pool.
func (p *Prober) quarantineProbe(ctx context.Context, pl planner) {
if !quarantineWriteEnabled.Get(&p.settings.SV) {
return
}

p.writeProbe(ctx, pl)
}

// Returns a random duration pulled from the uniform distribution given below:
// [d - 0.25*d, d + 0.25*d).
func withJitter(d time.Duration, rnd *rand.Rand) time.Duration {
17 changes: 14 additions & 3 deletions pkg/kv/kvprober/kvprober_test.go
@@ -16,6 +16,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -200,6 +201,7 @@ func initTestProber(ctx context.Context, m *mock) *Prober {
})
readEnabled.Override(ctx, &p.settings.SV, m.read)
writeEnabled.Override(ctx, &p.settings.SV, m.write)
quarantineWriteEnabled.Override(ctx, &p.settings.SV, m.qWrite)
bypassAdmissionControl.Override(ctx, &p.settings.SV, m.bypass)
p.readPlanner = m
return p
@@ -210,21 +212,30 @@ type mock struct {

bypass bool

noPlan bool
planErr error
noPlan bool
emptyQPool bool
planErr error

read bool
write bool
qWrite bool
readErr error
writeErr error
txnErr error
}

func (m *mock) next(ctx context.Context) (Step, error) {
step := Step{}
if m.noPlan {
m.t.Error("plan call made but not expected")
}
return Step{}, m.planErr
if !m.emptyQPool {
step = Step{
RangeID: 1,
Key: keys.LocalMax,
}
}
return step, m.planErr
}

func (m *mock) Read(key interface{}) func(context.Context, *kv.Txn) error {
121 changes: 121 additions & 0 deletions pkg/kv/kvprober/quarantine_pool.go
@@ -0,0 +1,121 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvprober

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// quarantinePool provides a way of repeatedly probing ranges that
// have previously failed to probe. This provides confirmation of outages
// over a smaller set of ranges at a high confidence.
type quarantinePool struct {
size func() int64 // can change over time
mu struct {
syncutil.Mutex
// steps are the queue of ranges to probe in quarantine.
steps []Step
// entryTimeMap keeps track of when ranges enter the quarantine
// pool. This is used to determine the WriteProbeQuarantineOldestDuration
// metric.
entryTimeMap map[roachpb.RangeID]time.Time
}
}

func newQuarantinePool(settings *cluster.Settings) *quarantinePool {
return &quarantinePool{
size: func() int64 { return quarantinePoolSize.Get(&settings.SV) },
}
}

func (qp *quarantinePool) maybeAdd(ctx context.Context, step Step) (added bool) {
qp.mu.Lock()
defer qp.mu.Unlock()
if _, ok := qp.mu.entryTimeMap[step.RangeID]; ok {
// Already in the pool.
return false
}

size := qp.size()

if int64(len(qp.mu.steps)) >= size {
// The pool is full. Note that we don't log, as we have a full pool of
// failing ranges, and it should thus be clear that the cluster is likely
// experiencing a widespread outage.
//
// Truncate slice in case size() got lowered.
qp.mu.steps = qp.mu.steps[:size]
return false
}
qp.mu.steps = append(qp.mu.steps, step)
if qp.mu.entryTimeMap == nil {
qp.mu.entryTimeMap = map[roachpb.RangeID]time.Time{}
}
qp.mu.entryTimeMap[step.RangeID] = timeutil.Now()
return true
}

func (qp *quarantinePool) oldestDuration() time.Duration {
qp.mu.Lock()
defer qp.mu.Unlock()

now := timeutil.Now()
var max time.Duration
for _, then := range qp.mu.entryTimeMap {
dur := now.Sub(then)
if dur > max {
max = dur
}
}
return max
}

func (qp *quarantinePool) maybeRemove(ctx context.Context, step Step) {
qp.mu.Lock()
defer qp.mu.Unlock()
if _, found := qp.mu.entryTimeMap[step.RangeID]; !found {
return
}
delete(qp.mu.entryTimeMap, step.RangeID)
idx := -1
for k, v := range qp.mu.steps {
if v.RangeID == step.RangeID {
idx = k
break
}
}
if idx == -1 {
// This is a programming error. We had an entry in entryTimeMap, but can't
// find the corresponding step.
log.Health.Errorf(ctx, "inconsistent from quarantine pool: %s not found", step.RangeID)
return
}
qp.mu.steps = append(qp.mu.steps[:idx], qp.mu.steps[idx+1:]...)
}

func (qp *quarantinePool) next(ctx context.Context) (Step, error) {
qp.mu.Lock()
defer qp.mu.Unlock()

if len(qp.mu.steps) == 0 {
return Step{}, nil
}

step := qp.mu.steps[0]
return step, nil
}
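The pool's behavior can be exercised roughly as follows. This is an illustrative sketch against the API shown above, not the contents of quarantine_pool_test.go (which is not rendered on this page).

	// Sketch only (package kvprober); exercises the quarantinePool API above.
	func sketchQuarantinePoolUsage(ctx context.Context, st *cluster.Settings) {
		qp := newQuarantinePool(st)
		step := Step{RangeID: 1, Key: keys.LocalMax}

		first := qp.maybeAdd(ctx, step)  // true: the range enters the pool
		second := qp.maybeAdd(ctx, step) // false: already quarantined
		_, _ = first, second

		// Backs the kv.prober.write.quarantine.oldest_duration gauge; grows
		// for as long as the range stays quarantined.
		_ = qp.oldestDuration()

		head, _ := qp.next(ctx)   // head of the queue; zero Step if the pool is empty
		qp.maybeRemove(ctx, head) // a later successful probe clears the entry
	}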
