Skip to content

Commit

Permalink
admission: introduce soft slots for speculative concurrency
Browse files Browse the repository at this point in the history
This is for using additional theads for Pebble compaction compression
and later for adjusting the number of concurrent compactions.

Since these slots are not acquired or released in a very fine grained
manner, we've chosen to model these in way that allows for
over-commitment. The SoftSlotGranter should be used for acquiring
and releasing such slots, instead of WorkQueue, since there is no
queueing for such slots.

Release note: None
  • Loading branch information
sumeerbhola authored and bananabrick committed May 24, 2022
1 parent e6e623f commit 9827839
Show file tree
Hide file tree
Showing 3 changed files with 511 additions and 82 deletions.
235 changes: 200 additions & 35 deletions pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,11 @@ type slotGranter struct {
workKind WorkKind
requester requester
usedSlots int
totalSlots int
usedSoftSlots int
totalHighLoadSlots int
totalModerateLoadSlots int
failedSoftSlotsGet bool
runnableEWMA float64
skipSlotEnforcement bool

// Optional. Nil for a slotGranter used for KVWork since the slots for that
Expand Down Expand Up @@ -375,7 +379,7 @@ func (sg *slotGranter) tryGetLocked(count int64) grantResult {
if sg.cpuOverload != nil && sg.cpuOverload.isOverloaded() {
return grantFailDueToSharedResource
}
if sg.usedSlots < sg.totalSlots || sg.skipSlotEnforcement {
if sg.usedSlots < sg.totalHighLoadSlots || sg.skipSlotEnforcement {
sg.usedSlots++
sg.usedSlotsMetric.Update(int64(sg.usedSlots))
return grantSuccess
Expand All @@ -390,6 +394,31 @@ func (sg *slotGranter) returnGrant(count int64) {
sg.coord.returnGrant(sg.workKind, count)
}

func (sg *slotGranter) tryGetSoftSlots(count int) int {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
spareModerateLoadSlots := sg.totalModerateLoadSlots - sg.usedSoftSlots - sg.usedSlots
if spareModerateLoadSlots <= 0 {
sg.failedSoftSlotsGet = true
return 0
}
allocatedSlots := count
if allocatedSlots > spareModerateLoadSlots {
allocatedSlots = spareModerateLoadSlots
}
sg.usedSoftSlots += allocatedSlots
return allocatedSlots
}

func (sg *slotGranter) returnSoftSlots(count int) {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
sg.usedSoftSlots -= count
if sg.usedSoftSlots < 0 {
panic("used soft slots is negative")
}
}

func (sg *slotGranter) returnGrantLocked(count int64) {
if count != 1 {
panic(errors.AssertionFailedf("unexpected count: %d", count))
Expand Down Expand Up @@ -486,17 +515,62 @@ func (tg *tokenGranter) continueGrantChain(grantChainID grantChainID) {
tg.coord.continueGrantChain(tg.workKind, grantChainID)
}

// kvStoreTokenGranter implements granterWithLockedCalls. It is used for
// grants to KVWork to a store, that is limited by IO tokens.
type kvStoreTokenGranter struct {

// For the cpu-bound slot case we have background activities (like Pebble
// compactions) that would like to utilize additional slots if available (e.g.
// to do concurrent compression of ssblocks). These activities do not want to
// wait for a slot, since they can proceed without the slot at their usual
// slower pace (e.g. without doing concurrent compression). They also are
// sensitive to small overheads in their tight loops, and cannot afford the
// overhead of interacting with admission control at a fine granularity (like
// asking for a slot when compressing each ssblock). A coarse granularity
// interaction causes a delay in returning slots to admission control, and we
// don't want that delay to cause admission delay for normal work. Hence, we
// model slots granted to background activities as "soft-slots". Think of
// regular used slots as "hard-slots", in that we assume that the holder of
// the slot is still "using" it, while a soft-slot is "squishy" and in some
// cases we can pretend that it is not being used. Say we are allowed
// to allocate up to M slots. In this scheme, when allocating a soft-slot
// one must conform to usedSoftSlots+usedSlots <= M, and when allocating
// a regular (hard) slot one must conform to usedSlots <= M.
//
// That is, soft-slots allow for over-commitment until the soft-slots are
// returned, which may mean some additional queueing in the goroutine
// scheduler.
//
// We have another wrinkle in that we do not want to maintain a single M. For
// these optional background activities we desire to do them only when the
// load is low enough. This is because at high load, all work suffers from
// additional queueing in the goroutine scheduler. So we want to make sure
// regular work does not suffer such goroutine scheduler queueing because we
// granted too many soft-slots and caused CPU utilization to be high. So we
// maintain two kinds of M, totalHighLoadSlots and totalModerateLoadSlots.
// totalHighLoadSlots are estimated so as to allow CPU utilization to be high,
// while totalModerateLoadSlots are trying to keep queuing in the goroutine
// scheduler to a lower level. So the revised equations for allocation are:
// - Allocating a soft-slot: usedSoftSlots+usedSlots <= totalModerateLoadSlots
// - Allocating a regular slot: usedSlots <= totalHighLoadSlots
//
// NB: we may in the future add other kinds of background activities that do
// not have a lag in interacting with admission control, but want to schedule
// them only under moderate load. Those activities will be counted in
// usedSlots but when granting a slot to such an activity, the equation will
// be usedSoftSlots+usedSlots <= totalModerateLoadSlots.
//
// That is, let us not confuse that moderate load slot allocation is only for
// soft-slots. Soft-slots are introduced only for squishiness.

// kvStoreTokenGranter implements granterWithLockedCalls. It is used for
// grants to KVWork to a store, that is limited by IO tokens.
type kvStoreTokenGranter struct {
coord *GrantCoordinator
requester requester
// There is no rate limiting in granting these tokens. That is, they are all
// burst tokens.
availableIOTokens int64
ioTokensExhaustedDurationMetric *metric.Counter
exhaustedStart time.Time
}
}

var _ granterWithLockedCalls = &kvStoreTokenGranter{}

Expand All @@ -514,12 +588,13 @@ func (sg *kvStoreTokenGranter) tryGet(count int64) bool {

func (sg *kvStoreTokenGranter) tryGetLocked(count int64) grantResult {
if sg.availableIOTokens > 0 {
sg.subtractTokens(count, false)
return grantSuccess
sg.subtractTokens(count, false)
return grantSuccess
}
return grantFailLocal
}


func (sg *kvStoreTokenGranter) returnGrant(count int64) {
sg.coord.returnGrant(KVWork, count)
}
Expand Down Expand Up @@ -737,11 +812,13 @@ func NewGrantCoordinators(
}

kvg := &slotGranter{
coord: coord,
workKind: KVWork,
totalSlots: opts.MinCPUSlots,
usedSlotsMetric: metrics.KVUsedSlots,
coord: coord,
workKind: KVWork,
totalHighLoadSlots: opts.MinCPUSlots,
totalModerateLoadSlots: opts.MinCPUSlots,
usedSlotsMetric: metrics.KVUsedSlots,
}

kvSlotAdjuster.granter = kvg
coord.queues[KVWork] = makeRequester(ambientCtx, KVWork, kvg, st, makeWorkQueueOptions(KVWork))
kvg.requester = coord.queues[KVWork]
Expand Down Expand Up @@ -774,7 +851,7 @@ func NewGrantCoordinators(
sg := &slotGranter{
coord: coord,
workKind: SQLStatementLeafStartWork,
totalSlots: opts.SQLStatementLeafStartWorkSlots,
totalHighLoadSlots: opts.SQLStatementLeafStartWorkSlots,
cpuOverload: kvSlotAdjuster,
usedSlotsMetric: metrics.SQLLeafStartUsedSlots,
}
Expand All @@ -786,7 +863,7 @@ func NewGrantCoordinators(
sg = &slotGranter{
coord: coord,
workKind: SQLStatementRootStartWork,
totalSlots: opts.SQLStatementRootStartWorkSlots,
totalHighLoadSlots: opts.SQLStatementRootStartWorkSlots,
cpuOverload: kvSlotAdjuster,
usedSlotsMetric: metrics.SQLRootStartUsedSlots,
}
Expand Down Expand Up @@ -865,7 +942,7 @@ func NewGrantCoordinatorSQL(
sg := &slotGranter{
coord: coord,
workKind: SQLStatementLeafStartWork,
totalSlots: opts.SQLStatementLeafStartWorkSlots,
totalHighLoadSlots: opts.SQLStatementLeafStartWorkSlots,
cpuOverload: sqlNodeCPU,
usedSlotsMetric: metrics.SQLLeafStartUsedSlots,
}
Expand All @@ -877,7 +954,7 @@ func NewGrantCoordinatorSQL(
sg = &slotGranter{
coord: coord,
workKind: SQLStatementRootStartWork,
totalSlots: opts.SQLStatementRootStartWorkSlots,
totalHighLoadSlots: opts.SQLStatementRootStartWorkSlots,
cpuOverload: sqlNodeCPU,
usedSlotsMetric: metrics.SQLRootStartUsedSlots,
}
Expand Down Expand Up @@ -1212,16 +1289,20 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, verb rune) {
switch kind {
case KVWork:
switch g := coord.granters[i].(type) {
case *slotGranter:
s.Printf("%s%s: used: %d, total: %d", curSep, workKindString(kind), g.usedSlots,
g.totalSlots)
case *kvStoreTokenGranter:
s.Printf(" io-avail: %d", g.availableIOTokens)
}
case *slotGranter:
s.Printf(
"%s%s: used: %d, high(moderate)-total: %d(%d)", curSep, workKindString(kind),
g.usedSlots, g.totalHighLoadSlots, g.totalModerateLoadSlots)
if g.usedSoftSlots > 0 {
s.Printf(" used-soft: %d", g.usedSoftSlots)
}
case *kvStoreTokenGranter:
s.Printf(" io-avail: %d", g.availableIOTokens)
}
case SQLStatementLeafStartWork, SQLStatementRootStartWork:
if coord.granters[i] != nil {
g := coord.granters[i].(*slotGranter)
s.Printf("%s%s: used: %d, total: %d", curSep, workKindString(kind), g.usedSlots, g.totalSlots)
s.Printf("%s%s: used: %d, total: %d", curSep, workKindString(kind), g.usedSlots, g.totalHighLoadSlots)
}
case SQLKVResponseWork, SQLSQLResponseWork:
if coord.granters[i] != nil {
Expand Down Expand Up @@ -1353,8 +1434,9 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoo
useGrantChains: false,
numProcs: 1,
}

kvg := &kvStoreTokenGranter{
coord: coord,
coord: coord,
ioTokensExhaustedDurationMetric: sgc.kvIOTokensExhaustedDuration,
}
opts := makeWorkQueueOptions(KVWork)
Expand Down Expand Up @@ -1466,7 +1548,8 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, _ time.Duration) {

// Simple heuristic, which worked ok in experiments. More sophisticated ones
// could be devised.
if runnable >= threshold*procs {
usedSlots := kvsa.granter.usedSlots + kvsa.granter.usedSoftSlots
tryDecreaseSlots := func(total int) int {
// Overload.
// If using some slots, and the used slots is less than the total slots,
// and total slots hasn't bottomed out at the min, decrease the total
Expand All @@ -1479,29 +1562,81 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, _ time.Duration) {
// so it is suggests that the drop in slots should not be causing cpu
// under-utilization, but one cannot be sure. Experiment with a smoothed
// signal or other ways to prevent a fast drop.
if kvsa.granter.usedSlots > 0 && kvsa.granter.totalSlots > kvsa.minCPUSlots &&
kvsa.granter.usedSlots <= kvsa.granter.totalSlots {
kvsa.granter.totalSlots--
if usedSlots > 0 && total > kvsa.minCPUSlots && usedSlots <= total {
total--
}
} else if float64(runnable) <= float64((threshold*procs)/2) {
return total
}
tryIncreaseSlots := func(total int) int {
// TODO: 0.8 is arbitrary.
closeToTotalSlots := int(float64(total) * 0.8)
// Underload.
// Used all its slots and can increase further, so additive increase.
if kvsa.granter.usedSlots >= kvsa.granter.totalSlots &&
kvsa.granter.totalSlots < kvsa.maxCPUSlots {
// Used all its slots and can increase further, so additive increase. We
// also handle the case where the used slots are a bit less than total
// slots, since callers for soft slots don't block.
if (usedSlots >= total || (usedSlots >= closeToTotalSlots && kvsa.granter.failedSoftSlotsGet)) &&
total < kvsa.maxCPUSlots {
// NB: If the workload is IO bound, the slot count here will keep
// incrementing until these slots are no longer the bottleneck for
// admission. So it is not unreasonable to see this slot count go into
// the 1000s. If the workload switches to being CPU bound, we can
// decrease by 1000 slots every second (because the CPULoad ticks are at
// 1ms intervals, and we do additive decrease).
kvsa.granter.totalSlots++
total++
}
return total
}
kvsa.totalSlotsMetric.Update(int64(kvsa.granter.totalSlots))

// TODO: the fractions below are arbitrary and subject to tuning.
if runnable >= threshold*procs {
// Very overloaded.
kvsa.granter.totalHighLoadSlots = tryDecreaseSlots(kvsa.granter.totalHighLoadSlots)
kvsa.granter.totalModerateLoadSlots = tryDecreaseSlots(kvsa.granter.totalModerateLoadSlots)
} else if float64(runnable) <= float64((threshold*procs)/4) {
// Very underloaded.
kvsa.granter.totalHighLoadSlots = tryIncreaseSlots(kvsa.granter.totalHighLoadSlots)
kvsa.granter.totalModerateLoadSlots = tryIncreaseSlots(kvsa.granter.totalModerateLoadSlots)
} else if float64(runnable) <= float64((threshold*procs)/2) {
// Moderately underloaded -- can afford to increase regular slots.
kvsa.granter.totalHighLoadSlots = tryIncreaseSlots(kvsa.granter.totalHighLoadSlots)
} else if runnable >= 3*threshold*procs/4 {
// Moderately overloaded -- should decrease moderate load slots.
//
// NB: decreasing moderate load slots may not halt the runnable growth
// since the regular traffic may be high and can use up to the high load
// slots. When usedSlots>totalModerateLoadSlots, we won't actually
// decrease totalModerateLoadSlots (see the logic in tryDecreaseSlots).
// However, that doesn't mean that totalModerateLoadSlots is accurate.
// This inaccuracy is fine since we have chosen to be in a high load
// regime, since all the work we are doing is non-optional regular work
// (not background work).
//
// Where this will help is when what is pushing us over moderate load is
// optional background work, so by decreasing totalModerateLoadSlots we will
// contain the load due to that work.
kvsa.granter.totalModerateLoadSlots = tryDecreaseSlots(kvsa.granter.totalModerateLoadSlots)
}
// Consider the following cases, when we started this method with
// totalHighLoadSlots==totalModerateLoadSlots.
// - underload such that we are able to increase totalModerateLoadSlots: in
// this case we will also be able to increase totalHighLoadSlots (since
// the used and total comparisons gating the increase in tryIncreaseSlots
// will also be true for totalHighLoadSlots).
// - overload such that we are able to decrease totalHighLoadSlots: in this
// case the logic in tryDecreaseSlots will also be able to decrease
// totalModerateLoadSlots.
// So the natural behavior of the slot adjustment itself guarantees
// totalHighLoadSlots >= totalModerateLoadSlots. But as a defensive measure
// we clamp totalModerateLoadSlots to not exceed totalHighLoadSlots.
if kvsa.granter.totalHighLoadSlots < kvsa.granter.totalModerateLoadSlots {
kvsa.granter.totalModerateLoadSlots = kvsa.granter.totalHighLoadSlots
}
kvsa.granter.failedSoftSlotsGet = false
kvsa.totalSlotsMetric.Update(int64(kvsa.granter.totalHighLoadSlots))
}

func (kvsa *kvSlotAdjuster) isOverloaded() bool {
return kvsa.granter.usedSlots >= kvsa.granter.totalSlots && !kvsa.granter.skipSlotEnforcement
return kvsa.granter.usedSlots >= kvsa.granter.totalHighLoadSlots && !kvsa.granter.skipSlotEnforcement
}

// sqlNodeCPUOverloadIndicator is the implementation of cpuOverloadIndicator
Expand Down Expand Up @@ -2067,3 +2202,33 @@ var _ = NewGrantCoordinatorSQL
// uses the term "slot" for these is that we have a completion indicator, and
// when we do have such an indicator it can be beneficial to be able to keep
// track of how many ongoing work items we have.

// SoftSlotGranter grants soft slots without queueing. See the comment with
// kvGranter.
type SoftSlotGranter struct {
kvGranter *slotGranter
}

// MakeSoftSlotGranter constructs a SoftSlotGranter given a GrantCoordinator
// that is responsible for KV and lower layers.
func MakeSoftSlotGranter(gc *GrantCoordinator) (*SoftSlotGranter, error) {
kvGranter, ok := gc.granters[KVWork].(*slotGranter)
if !ok {
return nil, errors.Errorf("GrantCoordinator does not support soft slots")
}
return &SoftSlotGranter{
kvGranter: kvGranter,
}, nil
}

// TryGetSlots attempts to acquire count slots and returns what was acquired
// (possibly 0).
// TODO: experiment with calling this when opening an sstable.Writer.
func (ssg *SoftSlotGranter) TryGetSlots(count int) int {
return ssg.kvGranter.tryGetSoftSlots(count)
}

// ReturnSlots returns count slots (count must be >= 0).
func (ssg *SoftSlotGranter) ReturnSlots(count int) {
ssg.kvGranter.returnSoftSlots(count)
}
Loading

0 comments on commit 9827839

Please sign in to comment.