Skip to content

Commit

Permalink
admission: CPU slot adjustment and utilization metrics
Browse files Browse the repository at this point in the history
Our existing metrics are gauges (total and used slots) which don't give us
insight into what is happening at smaller time scales. This creates
uncertainty when we observe admission queueing but the gauge samples show
total slots consistently greater than used slots. Additionally, if total
slots is steady during queuing, it doesn't tell us whether that was because
of roughly matching increments or decrements, or no increments/decrements.

The following metrics are added:
- admission.granter.slots_exhausted_duration.kv: cumulative duration when
  the slots were exhausted. This can give insight into how much exhaustion
  was occurring. It is insufficient to tell us whether 0.5sec/sec of
  exhaustion is due to a long 500ms of exhaustion and then non-exhaustion
  or alternating 1ms of exhaustion and non-exhaustion. But this is an
  improvement over what we have.
- admission.granter.slot_adjuster_{increments,decrements}.kv: Counts the
  increments and decrements of the total slots.
- admission.granter.cpu_load_{short,long}_period_duration.kv: cumulative
  duration of short and long ticks, as indicated by the period in the
  CPULoad callback. We don't expect long period ticks when admission
  control is active (and we explicitly disable enforcement during long
period ticks), but it helps us eliminate some hypotheses during
  incidents (e.g. long period ticks alternating with short period ticks
  causing a slow down in how fast we increment slots). Additionally, the
  sum of the rate of these two, if significantly < 1, would indicate that
  CPULoad frequency is lower than expected, say due to CPU overload.

Fixes cockroachdb#92673

Epic: none
  • Loading branch information
sumeerbhola committed Jan 10, 2023
1 parent 0cf6cc8 commit 343b76a
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 38 deletions.
1 change: 1 addition & 0 deletions pkg/testutils/lint/lint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2023,6 +2023,7 @@ func TestLint(t *testing.T) {
"../../storage",
"../../storage/enginepb",
"../../util",
"../../util/admission",
"../../util/hlc",
"../../util/intsets",
}
Expand Down
71 changes: 44 additions & 27 deletions pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,17 @@ func makeRegularGrantCoordinator(
}

kvSlotAdjuster := &kvSlotAdjuster{
settings: st,
minCPUSlots: opts.MinCPUSlots,
maxCPUSlots: opts.MaxCPUSlots,
totalSlotsMetric: metrics.KVTotalSlots,
totalModerateSlotsMetric: metrics.KVTotalModerateSlots,
moderateSlotsClamp: opts.MaxCPUSlots,
runnableAlphaOverride: opts.RunnableAlphaOverride,
settings: st,
minCPUSlots: opts.MinCPUSlots,
maxCPUSlots: opts.MaxCPUSlots,
moderateSlotsClamp: opts.MaxCPUSlots,
runnableAlphaOverride: opts.RunnableAlphaOverride,
totalSlotsMetric: metrics.KVTotalSlots,
totalModerateSlotsMetric: metrics.KVTotalModerateSlots,
cpuLoadShortPeriodDurationMetric: metrics.KVCPULoadShortPeriodDuration,
cpuLoadLongPeriodDurationMetric: metrics.KVCPULoadLongPeriodDuration,
slotAdjusterIncrementsMetric: metrics.KVSlotAdjusterIncrements,
slotAdjusterDecrementsMetric: metrics.KVSlotAdjusterDecrements,
}
coord := &GrantCoordinator{
ambientCtx: ambientCtx,
Expand All @@ -451,12 +455,13 @@ func makeRegularGrantCoordinator(
}

kvg := &slotGranter{
coord: coord,
workKind: KVWork,
totalHighLoadSlots: opts.MinCPUSlots,
totalModerateLoadSlots: opts.MinCPUSlots,
usedSlotsMetric: metrics.KVUsedSlots,
usedSoftSlotsMetric: metrics.KVUsedSoftSlots,
coord: coord,
workKind: KVWork,
totalHighLoadSlots: opts.MinCPUSlots,
totalModerateLoadSlots: opts.MinCPUSlots,
usedSlotsMetric: metrics.KVUsedSlots,
usedSoftSlotsMetric: metrics.KVUsedSoftSlots,
slotsExhaustedDurationMetric: metrics.KVSlotsExhaustedDuration,
}

kvSlotAdjuster.granter = kvg
Expand Down Expand Up @@ -946,27 +951,39 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, verb rune) {

// GrantCoordinatorMetrics are metrics associated with a GrantCoordinator.
type GrantCoordinatorMetrics struct {
KVTotalSlots *metric.Gauge
KVUsedSlots *metric.Gauge
KVTotalModerateSlots *metric.Gauge
KVUsedSoftSlots *metric.Gauge
KVIOTokensExhaustedDuration *metric.Counter
SQLLeafStartUsedSlots *metric.Gauge
SQLRootStartUsedSlots *metric.Gauge
KVTotalSlots *metric.Gauge
KVUsedSlots *metric.Gauge
KVTotalModerateSlots *metric.Gauge
KVUsedSoftSlots *metric.Gauge
KVSlotsExhaustedDuration *metric.Counter
KVCPULoadShortPeriodDuration *metric.Counter
KVCPULoadLongPeriodDuration *metric.Counter
KVSlotAdjusterIncrements *metric.Counter
KVSlotAdjusterDecrements *metric.Counter
KVIOTokensExhaustedDuration *metric.Counter
SQLLeafStartUsedSlots *metric.Gauge
SQLRootStartUsedSlots *metric.Gauge
}

// MetricStruct implements the metric.Struct interface. The method body is
// intentionally empty: it exists only as a marker so that
// GrantCoordinatorMetrics is recognized as a struct of metrics.
func (GrantCoordinatorMetrics) MetricStruct() {}

func makeGrantCoordinatorMetrics() GrantCoordinatorMetrics {
m := GrantCoordinatorMetrics{
KVTotalSlots: metric.NewGauge(totalSlots),
KVUsedSlots: metric.NewGauge(addName(workKindString(KVWork), usedSlots)),
KVTotalModerateSlots: metric.NewGauge(totalModerateSlots),
KVUsedSoftSlots: metric.NewGauge(usedSoftSlots),
KVIOTokensExhaustedDuration: metric.NewCounter(kvIOTokensExhaustedDuration),
SQLLeafStartUsedSlots: metric.NewGauge(addName(workKindString(SQLStatementLeafStartWork), usedSlots)),
SQLRootStartUsedSlots: metric.NewGauge(addName(workKindString(SQLStatementRootStartWork), usedSlots)),
KVTotalSlots: metric.NewGauge(totalSlots),
KVUsedSlots: metric.NewGauge(addName(workKindString(KVWork), usedSlots)),
// TODO(sumeer): remove moderate load slots and soft slots code and
// metrics.
KVTotalModerateSlots: metric.NewGauge(totalModerateSlots),
KVUsedSoftSlots: metric.NewGauge(usedSoftSlots),
KVSlotsExhaustedDuration: metric.NewCounter(kvSlotsExhaustedDuration),
KVCPULoadShortPeriodDuration: metric.NewCounter(kvCPULoadShortPeriodDuration),
KVCPULoadLongPeriodDuration: metric.NewCounter(kvCPULoadLongPeriodDuration),
KVSlotAdjusterIncrements: metric.NewCounter(kvSlotAdjusterIncrements),
KVSlotAdjusterDecrements: metric.NewCounter(kvSlotAdjusterDecrements),
KVIOTokensExhaustedDuration: metric.NewCounter(kvIOTokensExhaustedDuration),
SQLLeafStartUsedSlots: metric.NewGauge(addName(workKindString(SQLStatementLeafStartWork), usedSlots)),
SQLRootStartUsedSlots: metric.NewGauge(addName(workKindString(SQLStatementRootStartWork), usedSlots)),
}
return m
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ type slotGranter struct {

usedSlotsMetric *metric.Gauge
usedSoftSlotsMetric *metric.Gauge
// Non-nil for KV slots.
slotsExhaustedDurationMetric *metric.Counter
exhaustedStart time.Time
}

var _ granterWithLockedCalls = &slotGranter{}
Expand All @@ -127,6 +130,9 @@ func (sg *slotGranter) tryGetLocked(count int64, _ int8) grantResult {
}
if sg.usedSlots < sg.totalHighLoadSlots || sg.skipSlotEnforcement {
sg.usedSlots++
if sg.usedSlots == sg.totalHighLoadSlots && sg.slotsExhaustedDurationMetric != nil {
sg.exhaustedStart = timeutil.Now()
}
sg.usedSlotsMetric.Update(int64(sg.usedSlots))
return grantSuccess
}
Expand Down Expand Up @@ -172,6 +178,11 @@ func (sg *slotGranter) returnGrantLocked(count int64, _ int8) {
if count != 1 {
panic(errors.AssertionFailedf("unexpected count: %d", count))
}
if sg.usedSlots == sg.totalHighLoadSlots && sg.slotsExhaustedDurationMetric != nil {
now := timeutil.Now()
exhaustedMicros := now.Sub(sg.exhaustedStart).Microseconds()
sg.slotsExhaustedDurationMetric.Inc(exhaustedMicros)
}
sg.usedSlots--
if sg.usedSlots < 0 {
panic(errors.AssertionFailedf("used slots is negative %d", sg.usedSlots))
Expand All @@ -190,6 +201,9 @@ func (sg *slotGranter) tookWithoutPermissionLocked(count int64, _ int8) {
panic(errors.AssertionFailedf("unexpected count: %d", count))
}
sg.usedSlots++
if sg.usedSlots == sg.totalHighLoadSlots && sg.slotsExhaustedDurationMetric != nil {
sg.exhaustedStart = timeutil.Now()
}
sg.usedSlotsMetric.Update(int64(sg.usedSlots))
}

Expand Down Expand Up @@ -219,6 +233,32 @@ func (sg *slotGranter) tryGrantLocked(grantChainID grantChainID) grantResult {
return res
}

// setTotalHighLoadSlotsLocked updates totalHighLoadSlots, maintaining the
// slots-exhausted duration metric across the change. The common no-op case
// (value unchanged) is handled here so this function stays within the
// compiler's inlining budget; the rare mutating path is delegated to the
// non-inlined *Internal variant (mid-stack inlining).
//gcassert:inline
func (sg *slotGranter) setTotalHighLoadSlotsLocked(totalHighLoadSlots int) {
	// Mid-stack inlining.
	if totalHighLoadSlots == sg.totalHighLoadSlots {
		return
	}
	sg.setTotalHighLoadSlotsLockedInternal(totalHighLoadSlots)
}

// setTotalHighLoadSlotsLockedInternal is the slow path of
// setTotalHighLoadSlotsLocked: it applies a change to totalHighLoadSlots and
// keeps the slots-exhausted accounting consistent. "Exhausted" here means
// usedSlots >= totalHighLoadSlots, so changing the total can itself start or
// end an exhaustion interval even though usedSlots is unchanged.
// slotsExhaustedDurationMetric is non-nil only for KV slots (see the field
// comment on slotGranter), so the accounting is skipped for other granters.
func (sg *slotGranter) setTotalHighLoadSlotsLockedInternal(totalHighLoadSlots int) {
	if sg.slotsExhaustedDurationMetric != nil {
		if totalHighLoadSlots > sg.totalHighLoadSlots {
			// Increasing the total. If we were exhausted (old total <= used)
			// and the new total exceeds usedSlots, the exhaustion interval
			// ends now: accumulate its duration into the metric.
			if sg.totalHighLoadSlots <= sg.usedSlots && totalHighLoadSlots > sg.usedSlots {
				now := timeutil.Now()
				exhaustedMicros := now.Sub(sg.exhaustedStart).Microseconds()
				sg.slotsExhaustedDurationMetric.Inc(exhaustedMicros)
			}
		} else if totalHighLoadSlots < sg.totalHighLoadSlots {
			// Decreasing the total. If we were not exhausted (old total >
			// used) and the new total is <= usedSlots, an exhaustion interval
			// starts now: record its start time.
			if sg.totalHighLoadSlots > sg.usedSlots && totalHighLoadSlots <= sg.usedSlots {
				sg.exhaustedStart = timeutil.Now()
			}
		}
	}
	sg.totalHighLoadSlots = totalHighLoadSlots
}

// tokenGranter implements granterWithLockedCalls.
type tokenGranter struct {
coord *GrantCoordinator
Expand Down Expand Up @@ -691,6 +731,42 @@ var (
Measurement: "Slots",
Unit: metric.Unit_COUNT,
}
// NB: this metric is independent of whether slots enforcement is happening
// or not.
kvSlotsExhaustedDuration = metric.Metadata{
Name: "admission.granter.slots_exhausted_duration.kv",
Help: "Total duration when KV slots were exhausted, in micros",
Measurement: "Microseconds",
Unit: metric.Unit_COUNT,
}
// We have a metric for both short and long period. These metrics use the
// period provided in CPULoad and not wall time. So if the sum of the rate
// of these two is < 1sec/sec, the CPULoad ticks are not happening at the
// expected frequency (this could happen due to CPU overload).
kvCPULoadShortPeriodDuration = metric.Metadata{
Name: "admission.granter.cpu_load_short_period_duration.kv",
Help: "Total duration when CPULoad was being called with a short period, in micros",
Measurement: "Microseconds",
Unit: metric.Unit_COUNT,
}
kvCPULoadLongPeriodDuration = metric.Metadata{
Name: "admission.granter.cpu_load_long_period_duration.kv",
Help: "Total duration when CPULoad was being called with a long period, in micros",
Measurement: "Microseconds",
Unit: metric.Unit_COUNT,
}
kvSlotAdjusterIncrements = metric.Metadata{
Name: "admission.granter.slot_adjuster_increments.kv",
Help: "Number of increments of the total KV slots ",
Measurement: "Slots",
Unit: metric.Unit_COUNT,
}
kvSlotAdjusterDecrements = metric.Metadata{
Name: "admission.granter.slot_adjuster_decrements.kv",
Help: "Number of decrements of the total KV slots ",
Measurement: "Slots",
Unit: metric.Unit_COUNT,
}
kvIOTokensExhaustedDuration = metric.Metadata{
Name: "admission.granter.io_tokens_exhausted_duration.kv",
Help: "Total duration when IO tokens were exhausted, in micros",
Expand Down
12 changes: 11 additions & 1 deletion pkg/util/admission/granter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,17 @@ func TestGranterBasic(t *testing.T) {
samplePeriod = 250 * time.Millisecond
}
coord.CPULoad(runnable, procs, samplePeriod)
return flushAndReset()
str := flushAndReset()
kvsa := coord.cpuLoadListener.(*kvSlotAdjuster)
microsToMillis := func(micros int64) int64 {
return micros * int64(time.Microsecond) / int64(time.Millisecond)
}
return fmt.Sprintf("%sSlotAdjuster metrics: slots: %d, duration (short, long) millis: (%d, %d), inc: %d, dec: %d\n",
str, kvsa.totalSlotsMetric.Value(),
microsToMillis(kvsa.cpuLoadShortPeriodDurationMetric.Count()),
microsToMillis(kvsa.cpuLoadLongPeriodDurationMetric.Count()),
kvsa.slotAdjusterIncrementsMetric.Count(), kvsa.slotAdjusterDecrementsMetric.Count(),
)

case "set-io-tokens":
var tokens int
Expand Down
42 changes: 32 additions & 10 deletions pkg/util/admission/kv_slot_adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ type kvSlotAdjuster struct {
// A 0 value indicates that there is no override.
runnableAlphaOverride float64

totalSlotsMetric *metric.Gauge
totalModerateSlotsMetric *metric.Gauge
totalSlotsMetric *metric.Gauge
totalModerateSlotsMetric *metric.Gauge
cpuLoadShortPeriodDurationMetric *metric.Counter
cpuLoadLongPeriodDurationMetric *metric.Counter
slotAdjusterIncrementsMetric *metric.Counter
slotAdjusterDecrementsMetric *metric.Counter
}

var _ cpuOverloadIndicator = &kvSlotAdjuster{}
Expand All @@ -68,6 +72,12 @@ var _ CPULoadListener = &kvSlotAdjuster{}
func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, samplePeriod time.Duration) {
threshold := int(KVSlotAdjusterOverloadThreshold.Get(&kvsa.settings.SV))

periodDurationMicros := samplePeriod.Microseconds()
if samplePeriod > time.Millisecond {
kvsa.cpuLoadLongPeriodDurationMetric.Inc(periodDurationMicros)
} else {
kvsa.cpuLoadShortPeriodDurationMetric.Inc(periodDurationMicros)
}
// 0.009 gives weight to at least a few hundred samples at a 1ms sampling rate.
alpha := 0.009 * float64(samplePeriod/time.Millisecond)
if alpha > 0.5 {
Expand All @@ -83,7 +93,7 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, samplePeriod time.D
// Simple heuristic, which worked ok in experiments. More sophisticated ones
// could be devised.
usedSlots := kvsa.granter.usedSlots + kvsa.granter.usedSoftSlots
tryDecreaseSlots := func(total int) int {
tryDecreaseSlots := func(total int, adjustMetric bool) int {
// Overload.
// If using some slots, and the used slots is less than the total slots,
// and total slots hasn't bottomed out at the min, decrease the total
Expand All @@ -98,10 +108,13 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, samplePeriod time.D
// signal or other ways to prevent a fast drop.
if usedSlots > 0 && total > kvsa.minCPUSlots && usedSlots <= total {
total--
if adjustMetric {
kvsa.slotAdjusterDecrementsMetric.Inc(1)
}
}
return total
}
tryIncreaseSlots := func(total int) int {
tryIncreaseSlots := func(total int, adjustMetric bool) int {
// Underload.
// Used all its slots and can increase further, so additive increase. We
// also handle the case where the used slots are a bit less than total
Expand All @@ -114,21 +127,29 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, samplePeriod time.D
// decrease by 1000 slots every second (because the CPULoad ticks are at
// 1ms intervals, and we do additive decrease).
total++
if adjustMetric {
kvsa.slotAdjusterIncrementsMetric.Inc(1)
}
}
return total
}

if runnable >= threshold*procs {
// Very overloaded.
kvsa.granter.totalHighLoadSlots = tryDecreaseSlots(kvsa.granter.totalHighLoadSlots)
kvsa.granter.totalModerateLoadSlots = tryDecreaseSlots(kvsa.granter.totalModerateLoadSlots)
kvsa.granter.setTotalHighLoadSlotsLocked(
tryDecreaseSlots(kvsa.granter.totalHighLoadSlots, true))
kvsa.granter.totalModerateLoadSlots = tryDecreaseSlots(
kvsa.granter.totalModerateLoadSlots, false)
} else if float64(runnable) <= float64((threshold*procs)/4) {
// Very underloaded.
kvsa.granter.totalHighLoadSlots = tryIncreaseSlots(kvsa.granter.totalHighLoadSlots)
kvsa.granter.totalModerateLoadSlots = tryIncreaseSlots(kvsa.granter.totalModerateLoadSlots)
kvsa.granter.setTotalHighLoadSlotsLocked(
tryIncreaseSlots(kvsa.granter.totalHighLoadSlots, true))
kvsa.granter.totalModerateLoadSlots = tryIncreaseSlots(
kvsa.granter.totalModerateLoadSlots, false)
} else if float64(runnable) <= float64((threshold*procs)/2) {
// Moderately underloaded -- can afford to increase regular slots.
kvsa.granter.totalHighLoadSlots = tryIncreaseSlots(kvsa.granter.totalHighLoadSlots)
kvsa.granter.setTotalHighLoadSlotsLocked(
tryIncreaseSlots(kvsa.granter.totalHighLoadSlots, true))
} else if runnable >= 3*threshold*procs/4 {
// Moderately overloaded -- should decrease moderate load slots.
//
Expand All @@ -144,7 +165,8 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, samplePeriod time.D
// Where this will help is when what is pushing us over moderate load is
// optional background work, so by decreasing totalModerateLoadSlots we will
// contain the load due to that work.
kvsa.granter.totalModerateLoadSlots = tryDecreaseSlots(kvsa.granter.totalModerateLoadSlots)
kvsa.granter.totalModerateLoadSlots = tryDecreaseSlots(
kvsa.granter.totalModerateLoadSlots, false)
}
// Consider the following cases, when we started this method with
// totalHighLoadSlots==totalModerateLoadSlots.
Expand Down
Loading

0 comments on commit 343b76a

Please sign in to comment.