[DNM] admission: introduce soft slots for speculative concurrency

This is for using additional threads for Pebble compaction compression
and later for adjusting the number of concurrent compactions.

Since these slots are not acquired or released in a very fine-grained
manner, we've chosen to model them in a way that allows for
over-commitment. The SoftSlotGranter should be used for acquiring
and releasing such slots, instead of WorkQueue, since there is no
queueing for such slots.

Release note: None
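
A minimal sketch of the intended calling pattern (the compressBlocks
function, its argument, and the package it lives in are hypothetical and
only for illustration; SoftSlotGranter, TryGetSlots and ReturnSlots are the
pieces introduced by this commit):

package storagesketch

import "github.com/cockroachdb/cockroach/pkg/util/admission"

// compressBlocks illustrates the ask-don't-wait pattern: the background
// activity requests extra slots, proceeds with whatever was granted
// (possibly zero), and returns the grant once the coarse unit of work is
// done.
func compressBlocks(ssg *admission.SoftSlotGranter, extraWanted int) {
	// TryGetSlots never queues, so a grant of zero simply means we proceed
	// at the usual serial pace.
	granted := ssg.TryGetSlots(extraWanted)
	defer ssg.ReturnSlots(granted)
	workers := 1 + granted
	// ... compress with `workers` goroutines (compression itself omitted).
	_ = workers
}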
sumeerbhola committed Apr 12, 2022
1 parent 531eee8 commit 1c6b211
Showing 3 changed files with 513 additions and 82 deletions.
211 changes: 189 additions & 22 deletions pkg/util/admission/granter.go
@@ -451,12 +451,66 @@ func (tg *tokenGranter) continueGrantChain(grantChainID grantChainID) {
// kvGranter implements granterWithLockedCalls. It is used for grants to
// KVWork, that are limited by slots (CPU bound work) and/or tokens (IO
// bound work).
//
// In production, a kvGranter is used either for slots (ioTokensEnabled=false)
// or for tokens (totalHighLoadSlots=MaxInt). The former is used for
// (typically) cpu-bound work (KV and storage layer) across the whole node,
// and the latter for the per-store write admission control (see
// StoreGrantCoordinators).
//
// For the cpu-bound slot case we have background activities (like Pebble
// compactions) that would like to utilize additional slots if available (e.g.
// to do concurrent compression of ssblocks). These activities do not want to
// wait for a slot, since they can proceed without the slot at their usual
// slower pace (e.g. without doing concurrent compression). They are also
// sensitive to small overheads in their tight loops, and cannot afford the
// overhead of interacting with admission control at a fine granularity (like
// asking for a slot when compressing each ssblock). A coarse granularity
// interaction causes a delay in returning slots to admission control, and we
// don't want that delay to cause admission delay for normal work. Hence, we
// model slots granted to background activities as "soft-slots". Think of
// regular used slots as "hard-slots", in that we assume that the holder of
// the slot is still "using" it, while a soft-slot is "squishy" and in some
// cases we can pretend that it is not being used. Say we are allowed
// to allocate up to M slots. In this scheme, when allocating a soft-slot
// one must conform to usedSoftSlots+usedSlots <= M, and when allocating
// a regular (hard) slot one must conform to usedSlots <= M.
//
// That is, soft-slots allow for over-commitment until the soft-slots are
// returned, which may mean some additional queueing in the goroutine
// scheduler.
//
// We have another wrinkle in that we do not want to maintain a single M. For
// these optional background activities we desire to do them only when the
// load is low enough. This is because at high load, all work suffers from
// additional queueing in the goroutine scheduler. So we want to make sure
// regular work does not suffer such goroutine scheduler queueing because we
// granted too many soft-slots and caused CPU utilization to be high. So we
// maintain two kinds of M, totalHighLoadSlots and totalModerateLoadSlots.
// totalHighLoadSlots is estimated so as to allow CPU utilization to be high,
// while totalModerateLoadSlots tries to keep queueing in the goroutine
// scheduler at a lower level. So the revised equations for allocation are:
// - Allocating a soft-slot: usedSoftSlots+usedSlots <= totalModerateLoadSlots
// - Allocating a regular slot: usedSlots <= totalHighLoadSlots
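//
// As a concrete instance of these equations (the numbers are chosen purely
// for illustration): with totalModerateLoadSlots=6 and totalHighLoadSlots=8,
// if usedSlots=5 and usedSoftSlots=0, a soft-slot request for 3 slots is
// granted only 1 slot (6-5-0), while a regular slot request still succeeds
// since usedSlots=5 < 8. If regular work subsequently pushes usedSlots to 8,
// the node is over-committed by that one outstanding soft-slot until it is
// returned.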
//
// NB: we may in the future add other kinds of background activities that do
// not have a lag in interacting with admission control, but want to schedule
// them only under moderate load. Those activities will be counted in
// usedSlots but when granting a slot to such an activity, the equation will
// be usedSoftSlots+usedSlots <= totalModerateLoadSlots.
//
// That is, one should not assume that moderate load slot allocation is only
// for soft-slots. Soft-slots are introduced only for squishiness.
type kvGranter struct {
coord *GrantCoordinator
requester requester
usedSlots int
totalSlots int
skipSlotEnforcement bool
coord *GrantCoordinator
requester requester
usedSlots int
usedSoftSlots int
totalHighLoadSlots int
totalModerateLoadSlots int
skipSlotEnforcement bool
failedSoftSlotsGet bool

ioTokensEnabled bool
// There is no rate limiting in granting these tokens. That is, they are all
@@ -486,7 +540,7 @@ func (sg *kvGranter) tryGet() bool {
}

func (sg *kvGranter) tryGetLocked() grantResult {
if sg.usedSlots < sg.totalSlots || sg.skipSlotEnforcement {
if sg.usedSlots < sg.totalHighLoadSlots || sg.skipSlotEnforcement {
if !sg.ioTokensEnabled || sg.availableIOTokens > 0 {
sg.usedSlots++
if sg.usedSlotsMetric != nil {
@@ -555,6 +609,31 @@ func (sg *kvGranter) setAvailableIOTokensLocked(tokens int64) {
}
}

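// tryGetSoftSlots grants up to count soft slots, subject to
// usedSoftSlots+usedSlots <= totalModerateLoadSlots, and returns how many
// slots were granted (possibly 0). It never queues; a grant of zero is
// recorded in failedSoftSlotsGet, which the kvSlotAdjuster consults (and
// resets) on the next CPULoad tick.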
func (sg *kvGranter) tryGetSoftSlots(count int) int {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
spareModerateLoadSlots := sg.totalModerateLoadSlots - sg.usedSoftSlots - sg.usedSlots
if spareModerateLoadSlots <= 0 {
sg.failedSoftSlotsGet = true
return 0
}
allocatedSlots := count
if allocatedSlots > spareModerateLoadSlots {
allocatedSlots = spareModerateLoadSlots
}
sg.usedSoftSlots += allocatedSlots
return allocatedSlots
}

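// returnSoftSlots returns count soft slots previously acquired via
// tryGetSoftSlots. Returning more than was acquired panics, since
// usedSoftSlots must never go negative.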
func (sg *kvGranter) returnSoftSlots(count int) {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
sg.usedSoftSlots -= count
if sg.usedSoftSlots < 0 {
panic("used soft slots is negative")
}
}

// GrantCoordinator is the top-level object that coordinates grants across
// different WorkKinds (for more context see the comment in doc.go, and the
// comment where WorkKind is declared). Typically there will one
@@ -715,9 +794,10 @@ func NewGrantCoordinators(
}

kvg := &kvGranter{
coord: coord,
totalSlots: opts.MinCPUSlots,
usedSlotsMetric: metrics.KVUsedSlots,
coord: coord,
totalHighLoadSlots: opts.MinCPUSlots,
totalModerateLoadSlots: opts.MinCPUSlots,
usedSlotsMetric: metrics.KVUsedSlots,
}
kvSlotAdjuster.granter = kvg
coord.queues[KVWork] = makeRequester(ambientCtx, KVWork, kvg, st, makeWorkQueueOptions(KVWork))
@@ -1175,7 +1255,11 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, verb rune) {
switch kind {
case KVWork:
g := coord.granters[i].(*kvGranter)
s.Printf("%s%s: used: %d, total: %d", curSep, workKindString(kind), g.usedSlots, g.totalSlots)
s.Printf("%s%s: used: %d, high(moderate)-total: %d(%d)", curSep, workKindString(kind),
g.usedSlots, g.totalHighLoadSlots, g.totalModerateLoadSlots)
if g.usedSoftSlots > 0 {
s.Printf(" used-soft: %d", g.usedSoftSlots)
}
if g.ioTokensEnabled {
s.Printf(" io-avail: %d", g.availableIOTokens)
}
@@ -1295,7 +1379,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoo
kvg := &kvGranter{
coord: coord,
// Unlimited slots since not constrained by CPU.
totalSlots: math.MaxInt32,
totalHighLoadSlots: math.MaxInt32,
ioTokensExhaustedDurationMetric: sgc.kvIOTokensExhaustedDuration,
}
opts := makeWorkQueueOptions(KVWork)
@@ -1398,7 +1482,8 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, _ time.Duration) {

// Simple heuristic, which worked ok in experiments. More sophisticated ones
// could be devised.
if runnable >= threshold*procs {
usedSlots := kvsa.granter.usedSlots + kvsa.granter.usedSoftSlots
tryDecreaseSlots := func(total int) int {
// Overload.
// If using some slots, and the used slots is less than the total slots,
// and total slots hasn't bottomed out at the min, decrease the total
@@ -1411,29 +1496,81 @@
// so it suggests that the drop in slots should not be causing cpu
// under-utilization, but one cannot be sure. Experiment with a smoothed
// signal or other ways to prevent a fast drop.
if kvsa.granter.usedSlots > 0 && kvsa.granter.totalSlots > kvsa.minCPUSlots &&
kvsa.granter.usedSlots <= kvsa.granter.totalSlots {
kvsa.granter.totalSlots--
if usedSlots > 0 && total > kvsa.minCPUSlots && usedSlots <= total {
total--
}
} else if float64(runnable) <= float64((threshold*procs)/2) {
return total
}
tryIncreaseSlots := func(total int) int {
// TODO: 0.8 is arbitrary.
closeToTotalSlots := int(float64(total) * 0.8)
// Underload.
// Used all its slots and can increase further, so additive increase.
if kvsa.granter.usedSlots >= kvsa.granter.totalSlots &&
kvsa.granter.totalSlots < kvsa.maxCPUSlots {
// Used all its slots and can increase further, so additive increase. We
// also handle the case where the used slots are a bit less than total
// slots, since callers for soft slots don't block.
if (usedSlots >= total || (usedSlots >= closeToTotalSlots && kvsa.granter.failedSoftSlotsGet)) &&
total < kvsa.maxCPUSlots {
// NB: If the workload is IO bound, the slot count here will keep
// incrementing until these slots are no longer the bottleneck for
// admission. So it is not unreasonable to see this slot count go into
// the 1000s. If the workload switches to being CPU bound, we can
// decrease by 1000 slots every second (because the CPULoad ticks are at
// 1ms intervals, and we do additive decrease).
kvsa.granter.totalSlots++
total++
}
return total
}
kvsa.totalSlotsMetric.Update(int64(kvsa.granter.totalSlots))

// TODO: the fractions below are arbitrary and subject to tuning.
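// With G = threshold*procs, the branches below are evaluated in order:
// - runnable >= G: very overloaded, decrease both slot totals.
// - runnable <= G/4: very underloaded, increase both slot totals.
// - runnable <= G/2: moderately underloaded, increase only totalHighLoadSlots.
// - runnable >= 3G/4: moderately overloaded, decrease only totalModerateLoadSlots.
// Otherwise (between G/2 and 3G/4) neither total is adjusted.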
if runnable >= threshold*procs {
// Very overloaded.
kvsa.granter.totalHighLoadSlots = tryDecreaseSlots(kvsa.granter.totalHighLoadSlots)
kvsa.granter.totalModerateLoadSlots = tryDecreaseSlots(kvsa.granter.totalModerateLoadSlots)
} else if float64(runnable) <= float64((threshold*procs)/4) {
// Very underloaded.
kvsa.granter.totalHighLoadSlots = tryIncreaseSlots(kvsa.granter.totalHighLoadSlots)
kvsa.granter.totalModerateLoadSlots = tryIncreaseSlots(kvsa.granter.totalModerateLoadSlots)
} else if float64(runnable) <= float64((threshold*procs)/2) {
// Moderately underloaded -- can afford to increase regular slots.
kvsa.granter.totalHighLoadSlots = tryIncreaseSlots(kvsa.granter.totalHighLoadSlots)
} else if runnable >= 3*threshold*procs/4 {
// Moderately overloaded -- should decrease moderate load slots.
//
// NB: decreasing moderate load slots may not halt the runnable growth
// since the regular traffic may be high and can use up to the high load
// slots. When usedSlots>totalModerateLoadSlots, we won't actually
// decrease totalModerateLoadSlots (see the logic in tryDecreaseSlots).
// However, that doesn't mean that totalModerateLoadSlots is accurate.
// This inaccuracy is fine: we have chosen to be in a high load regime,
// because all the work we are doing is non-optional regular work (not
// background work).
//
// Where this decrease does help is when the push over moderate load comes
// from optional background work: by decreasing totalModerateLoadSlots we
// contain the load due to that work.
kvsa.granter.totalModerateLoadSlots = tryDecreaseSlots(kvsa.granter.totalModerateLoadSlots)
}
// Consider the following cases, when we started this method with
// totalHighLoadSlots==totalModerateLoadSlots.
// - underload such that we are able to increase totalModerateLoadSlots: in
// this case we will also be able to increase totalHighLoadSlots (since
// the used and total comparisons gating the increase in tryIncreaseSlots
// will also be true for totalHighLoadSlots).
// - overload such that we are able to decrease totalHighLoadSlots: in this
// case the logic in tryDecreaseSlots will also be able to decrease
// totalModerateLoadSlots.
// So the natural behavior of the slot adjustment itself guarantees
// totalHighLoadSlots >= totalModerateLoadSlots. But as a defensive measure
// we clamp totalModerateLoadSlots to not exceed totalHighLoadSlots.
if kvsa.granter.totalHighLoadSlots < kvsa.granter.totalModerateLoadSlots {
kvsa.granter.totalModerateLoadSlots = kvsa.granter.totalHighLoadSlots
}
kvsa.granter.failedSoftSlotsGet = false
kvsa.totalSlotsMetric.Update(int64(kvsa.granter.totalHighLoadSlots))
}

func (kvsa *kvSlotAdjuster) isOverloaded() bool {
return kvsa.granter.usedSlots >= kvsa.granter.totalSlots && !kvsa.granter.skipSlotEnforcement
return kvsa.granter.usedSlots >= kvsa.granter.totalHighLoadSlots && !kvsa.granter.skipSlotEnforcement
}

// sqlNodeCPUOverloadIndicator is the implementation of cpuOverloadIndicator
@@ -1755,3 +1892,33 @@ var _ = (*GrantCoordinator)(nil).GetWorkQueue
// uses the term "slot" for these is that we have a completion indicator, and
// when we do have such an indicator it can be beneficial to be able to keep
// track of how many ongoing work items we have.

// SoftSlotGranter grants soft slots without queueing. See the comment with
// kvGranter.
type SoftSlotGranter struct {
kvGranter *kvGranter
}

// MakeSoftSlotGranter constructs a SoftSlotGranter given a GrantCoordinator
// that is responsible for KV and lower layers.
func MakeSoftSlotGranter(gc *GrantCoordinator) (*SoftSlotGranter, error) {
kvGranter, ok := gc.granters[KVWork].(*kvGranter)
if !ok {
return nil, errors.Errorf("GrantCoordinator does not support soft slots")
}
return &SoftSlotGranter{
kvGranter: kvGranter,
}, nil
}

// TryGetSlots attempts to acquire count slots and returns what was acquired
// (possibly 0).
// TODO: experiment with calling this when opening an sstable.Writer.
func (ssg *SoftSlotGranter) TryGetSlots(count int) int {
return ssg.kvGranter.tryGetSoftSlots(count)
}

// ReturnSlots returns count slots (count must be >= 0).
func (ssg *SoftSlotGranter) ReturnSlots(count int) {
ssg.kvGranter.returnSoftSlots(count)
}
17 changes: 17 additions & 0 deletions pkg/util/admission/granter_test.go
@@ -96,6 +96,7 @@ func TestGranterBasic(t *testing.T) {
var ambientCtx log.AmbientContext
var requesters [numWorkKinds]*testRequester
var coord *GrantCoordinator
var ssg *SoftSlotGranter
var buf strings.Builder
flushAndReset := func() string {
fmt.Fprintf(&buf, "GrantCoordinator:\n%s\n", coord.String())
@@ -131,6 +132,9 @@ func TestGranterBasic(t *testing.T) {
delayForGrantChainTermination = 0
coords, _ := NewGrantCoordinators(ambientCtx, opts)
coord = coords.Regular
var err error
ssg, err = MakeSoftSlotGranter(coord)
require.NoError(t, err)
return flushAndReset()

case "set-has-waiting-requests":
@@ -187,6 +191,19 @@ func TestGranterBasic(t *testing.T) {
coord.testingTryGrant()
return flushAndReset()

case "try-get-soft-slots":
var slots int
d.ScanArgs(t, "slots", &slots)
granted := ssg.TryGetSlots(slots)
fmt.Fprintf(&buf, "requested: %d, granted: %d\n", slots, granted)
return flushAndReset()

case "return-soft-slots":
var slots int
d.ScanArgs(t, "slots", &slots)
ssg.ReturnSlots(slots)
return flushAndReset()

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}