kv: update kvprober with quarantine pool
These changes update kvprober so that ranges which
fail probing are added to a quarantine pool, where
they are continuously re-probed. A metric reporting
how long the longest-tenured range has been in the
pool has also been added.

Resolves cockroachdb#74407

Release justification: low risk, high benefit changes to
existing functionality.

Release note: None
Santamaura committed Sep 8, 2022
1 parent d33e93f commit 74cdeff
Showing 2 changed files with 255 additions and 15 deletions.
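
As context for the diff below, here is a rough, self-contained sketch of the quarantine-pool mechanism the commit message describes. The names below (rangeID, oldestDuration, the pool methods) are simplified stand-ins for illustration, not the kvprober API added by this commit; the real code stores planner Steps keyed by roachpb.RangeID and reports the oldest tenure through a metric gauge.

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// rangeID stands in for roachpb.RangeID.
type rangeID int64

// quarantinePool models the new pool: ranges whose probes failed are kept here
// and re-probed until a probe succeeds; entryTime records when each range was
// quarantined so the oldest tenure can be reported as a metric.
type quarantinePool struct {
	ranges    []rangeID
	entryTime map[rangeID]time.Time
	maxSize   int
}

func newQuarantinePool(maxSize int) *quarantinePool {
	return &quarantinePool{
		entryTime: make(map[rangeID]time.Time),
		maxSize:   maxSize,
	}
}

// add quarantines a range after a failed probe; when the pool is full the
// failure is dropped, mirroring the capacity check in the diff below.
func (qp *quarantinePool) add(r rangeID) {
	if len(qp.ranges) >= qp.maxSize {
		return
	}
	if _, ok := qp.entryTime[r]; ok {
		return // already quarantined
	}
	qp.ranges = append(qp.ranges, r)
	qp.entryTime[r] = time.Now()
}

// remove releases a range from quarantine once a probe against it succeeds.
func (qp *quarantinePool) remove(r rangeID) {
	for i, cur := range qp.ranges {
		if cur == r {
			qp.ranges = append(qp.ranges[:i], qp.ranges[i+1:]...)
			break
		}
	}
	delete(qp.entryTime, r)
}

// next picks a random quarantined range to re-probe.
func (qp *quarantinePool) next() (rangeID, error) {
	if len(qp.ranges) == 0 {
		return 0, errors.New("quarantine pool is empty")
	}
	return qp.ranges[rand.Intn(len(qp.ranges))], nil
}

// oldestDuration is the quantity the new gauge reports: how long the
// longest-tenured range has been in the pool.
func (qp *quarantinePool) oldestDuration(now time.Time) time.Duration {
	var oldest time.Duration
	for _, t := range qp.entryTime {
		if d := now.Sub(t); d > oldest {
			oldest = d
		}
	}
	return oldest
}

func main() {
	qp := newQuarantinePool(100)
	qp.add(42) // a probe of r42 failed, so it enters quarantine
	if r, err := qp.next(); err == nil {
		fmt.Println("re-probing", r) // on success the range leaves quarantine
		qp.remove(r)
	}
	fmt.Println("oldest tenure:", qp.oldestDuration(time.Now()))
}

The committed implementation differs in its details — for example, it picks the next range to re-probe at random inside a dedicated probe loop and only ratchets the oldest-duration gauge upward while a quarantine probe keeps failing — but the bookkeeping has the same shape.
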
186 changes: 171 additions & 15 deletions pkg/kv/kvprober/kvprober.go
@@ -23,6 +23,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
@@ -33,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

@@ -53,6 +55,54 @@ type Prober struct {
// goal of the prober IS to populate these metrics.
metrics Metrics
tracer *tracing.Tracer
// quarantine pools keep track of ranges that timed out or errored during
// probing; those ranges are continually probed until the prober is stopped
// or a probe succeeds.
quarantineReadPool *quarantinePool
quarantineWritePool *quarantinePool
}

type quarantinePool struct {
steps []Step
size int64
entryTimeMap map[roachpb.RangeID]time.Time
}

func newQuarantinePool(settings *cluster.Settings) *quarantinePool {
return &quarantinePool{
size: quarantinePoolSize.Get(&settings.SV),
// Initialize the map so entry times can be recorded without a nil-map panic.
entryTimeMap: make(map[roachpb.RangeID]time.Time),
}
}

func (qp *quarantinePool) add(ctx context.Context, step Step) {
if int64(len(qp.steps)) >= qp.size {
log.Health.Errorf(ctx, "cannot add range %s to quarantine pool, at capacity", step.RangeID.String())
} else {
qp.steps = append(qp.steps, step)
}
}

func (qp *quarantinePool) remove(ctx context.Context, step Step) {
if len(qp.steps) < 1 {
log.Health.Errorf(ctx, "cannot remove range %s from quarantine pool, pool is empty", step.RangeID.String())
return
}
idx := -1
for k, v := range qp.steps {
if v.RangeID == step.RangeID {
idx = k
break
}
}
if idx == -1 {
log.Health.Errorf(ctx, "cannot remove range %s from quarantine pool, not found", step.RangeID.String())
return
}
// This is an expensive operation if the pool size is very large.
qp.steps = append(qp.steps[:idx], qp.steps[idx+1:]...)
}

func (qp *quarantinePool) next() (Step, error) {
if len(qp.steps) > 0 {
return qp.steps[rand.Intn(len(qp.steps))], nil
} else {
return Step{}, errors.New("there are no keys in quarantine")
}
}

// Opts provides knobs to control kvprober.Prober.
@@ -120,21 +170,37 @@ var (
Measurement: "Runs",
Unit: metric.Unit_COUNT,
}
metaReadProbeQuarantineOldestDuration = metric.Metadata{
Name: "kv.prober.read.quarantine.oldest_duration",
Help: "The duration that the oldest range in the " +
"read quarantine pool has remained",
Measurement: "Seconds",
Unit: metric.Unit_SECONDS,
}
metaWriteProbeQuarantineOldestDuration = metric.Metadata{
Name: "kv.prober.write.quarantine.oldest_duration",
Help: "The duration that the oldest range in the " +
"write quarantine pool has remained",
Measurement: "Seconds",
Unit: metric.Unit_SECONDS,
}
// TODO(josh): Add a histogram that captures where in the "rangespace" errors
// are occurring. This will allow operators to see at a glance what percentage
// of ranges are affected.
)

// Metrics groups together the metrics that kvprober exports.
type Metrics struct {
ReadProbeAttempts *metric.Counter
ReadProbeFailures *metric.Counter
ReadProbeLatency *metric.Histogram
WriteProbeAttempts *metric.Counter
WriteProbeFailures *metric.Counter
WriteProbeLatency *metric.Histogram
ProbePlanAttempts *metric.Counter
ProbePlanFailures *metric.Counter
ReadProbeAttempts *metric.Counter
ReadProbeFailures *metric.Counter
ReadProbeLatency *metric.Histogram
WriteProbeAttempts *metric.Counter
WriteProbeFailures *metric.Counter
WriteProbeLatency *metric.Histogram
ProbePlanAttempts *metric.Counter
ProbePlanFailures *metric.Counter
ReadProbeQuarantineOldestDuration *metric.Gauge
WriteProbeQuarantineOldestDuration *metric.Gauge
}

// proberOpsI is an interface that the prober will use to run ops against some
@@ -224,10 +290,14 @@ func NewProber(opts Opts) *Prober {
WriteProbeLatency: metric.NewHistogram(
metaWriteProbeLatency, opts.HistogramWindowInterval, metric.NetworkLatencyBuckets,
),
ProbePlanAttempts: metric.NewCounter(metaProbePlanAttempts),
ProbePlanFailures: metric.NewCounter(metaProbePlanFailures),
ProbePlanAttempts: metric.NewCounter(metaProbePlanAttempts),
ProbePlanFailures: metric.NewCounter(metaProbePlanFailures),
ReadProbeQuarantineOldestDuration: metric.NewGauge(metaReadProbeQuarantineOldestDuration),
WriteProbeQuarantineOldestDuration: metric.NewGauge(metaWriteProbeQuarantineOldestDuration),
},
tracer: opts.Tracer,
tracer: opts.Tracer,
quarantineReadPool: newQuarantinePool(opts.Settings),
quarantineWritePool: newQuarantinePool(opts.Settings),
}
}

@@ -240,7 +310,7 @@ func (p *Prober) Metrics() Metrics {
// returns an error only if stopper.RunAsyncTask returns an error.
func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
ctx = logtags.AddTag(ctx, "kvprober", nil /* value */)
startLoop := func(ctx context.Context, opName string, probe func(context.Context, *kv.DB, planner), pl planner, interval *settings.DurationSetting) error {
startLoop := func(ctx context.Context, opName string, probe func(context.Context, *kv.DB, planner), qProbe func(context.Context, *kv.DB, bool), pl planner, interval *settings.DurationSetting, isQuarantine bool, isWrite bool) error {
return stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: opName, SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
defer logcrash.RecoverAndReportNonfatalPanic(ctx, &p.settings.SV)

@@ -266,16 +336,26 @@ func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
}

probeCtx, sp := tracing.EnsureChildSpan(ctx, p.tracer, opName+" - probe")
probe(probeCtx, p.db, pl)
if isQuarantine {
qProbe(probeCtx, p.db, isWrite)
} else {
probe(probeCtx, p.db, pl)
}
sp.Finish()
}
})
}

if err := startLoop(ctx, "read probe loop", p.readProbe, p.readPlanner, readInterval); err != nil {
if err := startLoop(ctx, "read probe loop", p.readProbe, nil, p.readPlanner, readInterval, false, false); err != nil {
return err
}
return startLoop(ctx, "write probe loop", p.writeProbe, p.writePlanner, writeInterval)
if err := startLoop(ctx, "write probe loop", p.writeProbe, nil, p.writePlanner, writeInterval, false, false); err != nil {
return err
}
if err := startLoop(ctx, "quarantine read probe loop", nil, p.quarantineProbe, nil, quarantineReadInterval, true, false); err != nil {
return err
}
return startLoop(ctx, "quarantine write probe loop", nil, p.quarantineProbe, nil, quarantineWriteInterval, true, true)
}

// Doesn't return an error. Instead increments error type specific metrics.
@@ -326,6 +406,7 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOpsI, txns proberT
// TODO(josh): Write structured events with log.Structured.
log.Health.Errorf(ctx, "kv.Get(%s), r=%v failed with: %v", step.Key, step.RangeID, err)
p.metrics.ReadProbeFailures.Inc(1)
p.quarantineReadPool.add(ctx, step)
return
}

@@ -372,6 +453,7 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober
if err != nil {
log.Health.Errorf(ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v", step.Key, step.RangeID, err)
p.metrics.WriteProbeFailures.Inc(1)
p.quarantineWritePool.add(ctx, step)
return
}

@@ -382,6 +464,80 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober
p.metrics.WriteProbeLatency.RecordValue(d.Nanoseconds())
}

func (p *Prober) quarantineProbe(ctx context.Context, db *kv.DB, isWrite bool) {
p.quarantineProbeImpl(ctx, &ProberOps{}, &proberTxnImpl{db: p.db}, isWrite)
}

func (p *Prober) quarantineProbeImpl(ctx context.Context, ops proberOpsI, txns proberTxn, isWrite bool) {
if !isWrite && !quarantineReadEnabled.Get(&p.settings.SV) {
return
} else if isWrite && !quarantineWriteEnabled.Get(&p.settings.SV) {
return
}

// Set quarantine pool to use.
quarantinePool := p.quarantineReadPool
timeout := quarantineReadTimeout.Get(&p.settings.SV)
opName := "quarantine read probe"
if isWrite {
quarantinePool = p.quarantineWritePool
timeout = quarantineWriteTimeout.Get(&p.settings.SV)
opName = "quarantine write probe"
}
step, err := quarantinePool.next()
if err != nil {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
return
}

// If an entry time is already recorded for this range, it is already being tracked; no need to continue.
if _, ok := quarantinePool.entryTimeMap[step.RangeID]; ok {
return
}

quarantinePool.entryTimeMap[step.RangeID] = timeutil.Now()
start := timeutil.Now()
// Probe the same range until a probe succeeds; a cap on retries could be added in the future.
for {
err = contextutil.RunWithTimeout(ctx, opName, timeout, func(ctx context.Context) error {
var f func(context.Context, *kv.Txn) error
if isWrite {
f = ops.Write(step.Key)
} else {
f = ops.Read(step.Key)
}
if bypassAdmissionControl.Get(&p.settings.SV) {
return txns.TxnRootKV(ctx, f)
}
return txns.Txn(ctx, f)
})
if err != nil {
// Set longest time in quarantine metric if needed.
log.Health.Errorf(ctx, "quarantine pool: kv.Txn(Put(%s); Del(-)), r=%v failed with: %v", step.Key, step.RangeID, err)
duration := int64(timeutil.Since(quarantinePool.entryTimeMap[step.RangeID]).Seconds())
if isWrite {
longestDuration := p.metrics.WriteProbeQuarantineOldestDuration.Value()
if duration > longestDuration {
p.metrics.WriteProbeQuarantineOldestDuration.Update(duration)
}
} else {
longestDuration := p.metrics.ReadProbeQuarantineOldestDuration.Value()
if duration > longestDuration {
p.metrics.ReadProbeQuarantineOldestDuration.Update(duration)
}
}

} else {
quarantinePool.remove(ctx, step)
delete(quarantinePool.entryTimeMap, step.RangeID)
break
}
}
d := timeutil.Since(start)
log.Health.Infof(ctx, "quarantine pool: kv.Txn(Put(%s); Del(-)), r=%v returned success in %v. Removed from pool.", step.Key, step.RangeID, d)

}

// Returns a random duration pulled from the uniform distribution given below:
// [d - 0.25*d, d + 0.25*d).
func withJitter(d time.Duration, rnd *rand.Rand) time.Duration {
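
The body of withJitter is truncated in the diff above. A minimal implementation consistent with the comment — a sketch under the stated distribution, not necessarily the committed body — could look like this:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// withJitter returns a duration drawn roughly uniformly from
// [d - 0.25*d, d + 0.25*d), so probe intervals are spread out rather than
// synchronized. This is a sketch matching the comment above, not necessarily
// the committed body.
func withJitter(d time.Duration, rnd *rand.Rand) time.Duration {
	half := int64(d) / 2
	if half <= 0 {
		return d // too small (or non-positive) to jitter meaningfully
	}
	// Start at 0.75*d and add a uniform offset in [0, 0.5*d).
	return d - d/4 + time.Duration(rnd.Int63n(half))
}

func main() {
	rnd := rand.New(rand.NewSource(1))
	fmt.Println(withJitter(time.Minute, rnd)) // somewhere in [45s, 75s)
}
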
84 changes: 84 additions & 0 deletions pkg/kv/kvprober/settings.go
@@ -126,3 +126,87 @@ var numStepsToPlanAtOnce = settings.RegisterIntSetting(
}
return nil
})

var quarantinePoolSize = settings.RegisterIntSetting(
settings.TenantWritable,
"kv.prober.quarantine_pool_size",
"the maximum size of the kv prober quarantine pool, where the quarantine "+
"pool holds Steps for ranges that have been probed and timed out; If "+
"the quarantine pool is full, probes that fail will not be added to "+
" the pool",
100, func(i int64) error {
if i <= 0 {
return errors.New("param must be >0")
}
return nil
})

var quarantineReadEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"kv.prober.quarantine.read.enabled",
"whether the KV read prober is enabled for the quarantine pool",
false)

var quarantineReadInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
"kv.prober.quarantine.read.interval",
"how often each node sends a read probe for the quarantine pool to the KV layer "+
"on average (jitter is added); "+
"note that a very slow read can block kvprober from sending additional probes; "+
"kv.prober.read.timeout controls the max time kvprober can be blocked",
1*time.Minute, func(duration time.Duration) error {
if duration <= 0 {
return errors.New("param must be >0")
}
return nil
})

var quarantineReadTimeout = settings.RegisterDurationSetting(
settings.TenantWritable,
"kv.prober.quarantine.read.timeout",
// Slow enough response times are not different than errors from the
// perspective of the user.
"if this much time elapses without success, a KV read probe for the quarantine pool will be treated as an error; "+
"note that a very slow read can block kvprober from sending additional probes"+
"this setting controls the max time kvprober can be blocked",
2*time.Second, func(duration time.Duration) error {
if duration <= 0 {
return errors.New("param must be >0")
}
return nil
})

var quarantineWriteEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"kv.prober.quarantine.write.enabled",
"whether the KV write prober is enabled for the quaranatine pool",
false)

var quarantineWriteInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
"kv.prober.quarantine.write.interval",
"how often each node sends a write probe for the quarantine pool to the KV layer "+
"on average (jitter is added); "+
"note that a very slow read can block kvprober from sending additional probes; "+
"kv.prober.write.timeout controls the max time kvprober can be blocked",
10*time.Second, func(duration time.Duration) error {
if duration <= 0 {
return errors.New("param must be >0")
}
return nil
})

var quarantineWriteTimeout = settings.RegisterDurationSetting(
settings.TenantWritable,
"kv.prober.quarantine.write.timeout",
// Slow enough response times are not different than errors from the
// perspective of the user.
"if this much time elapses without success, a KV write probe for the quarantine pool will be treated as an error; "+
"note that a very slow read can block kvprober from sending additional probes"+
"this setting controls the max time kvprober can be blocked",
4*time.Second, func(duration time.Duration) error {
if duration <= 0 {
return errors.New("param must be >0")
}
return nil
})
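
Both quarantine probes default to off, so an operator has to enable them explicitly through cluster settings. Below is a minimal sketch of doing that from Go over the SQL interface; the connection string, driver choice, and interval value are illustrative assumptions, and only the setting names come from the registrations above.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	// Hypothetical connection string; adjust host, port, user, and TLS for your cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	stmts := []string{
		// Enable the quarantine read and write probes registered above.
		`SET CLUSTER SETTING kv.prober.quarantine.read.enabled = true`,
		`SET CLUSTER SETTING kv.prober.quarantine.write.enabled = true`,
		// Optionally re-probe quarantined ranges more often than the default.
		`SET CLUSTER SETTING kv.prober.quarantine.read.interval = '30s'`,
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatalf("%s: %v", stmt, err)
		}
	}
}

Note that ranges only ever enter quarantine when the regular kvprober read and write probes are enabled and failing, so the base prober settings need to be on as well.
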
