[DNM] admission: introduce soft slots for speculative concurrency
This is for using additional threads for Pebble compaction compression,
and later for adjusting the number of concurrent compactions.

Since these slots are not acquired or released in a very fine-grained
manner, we've chosen to model them in a way that allows for
over-commitment. The SoftSlotGranter should be used for acquiring
and releasing such slots, instead of WorkQueue, since there is no
queueing for such slots.
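
A rough usage sketch (the caller below is hypothetical, and the actual
Pebble hook-up is not part of this commit): a background activity asks for
extra slots up front, proceeds with whatever it was granted, and returns
the slots when done.

  // compressBlocks is a hypothetical background activity; runCompression
  // does not exist in this commit.
  func compressBlocks(ssg *admission.SoftSlotGranter, blocks [][]byte) {
    extra := ssg.TryGetSlots(3) // best-effort; may grant 0, 1, 2, or 3
    defer ssg.ReturnSlots(extra)
    runCompression(blocks, 1+extra) // always proceed with at least one worker
  }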

Release note: None
sumeerbhola committed Mar 28, 2022
1 parent 531eee8 commit 451f7c6
Showing 3 changed files with 464 additions and 68 deletions.
158 changes: 147 additions & 11 deletions pkg/util/admission/granter.go
@@ -451,12 +451,36 @@ func (tg *tokenGranter) continueGrantChain(grantChainID grantChainID) {
// kvGranter implements granterWithLockedCalls. It is used for grants to
// KVWork, that are limited by slots (CPU bound work) and/or tokens (IO
// bound work).
//
// In production, a kvGranter is doing either slots (ioTokensEnabled=false)
// or tokens (totalSlots=MaxInt). The former is used for (typically) cpu-bound
// work (KV and storage layer) across the whole node, and the latter for the
// per-store write admission control (see StoreGrantCoordinators).
//
// For the cpu-bound slot case we have background activities (like Pebble
// compactions) that would like to utilize additional slots if available (e.g.
// to do concurrent compression of ssblocks). These activities do not want to
// wait for a slot, since they can proceed without the slot at their usual
// slower pace. They are also performance sensitive, and can't afford to
// interact with admission control at a fine granularity (like asking for a
// slot when compressing each ssblock). A coarse-granularity interaction
// causes a delay in returning slots to admission control, and we don't want
// that delay to cause admission delay for normal work. Hence, we model slots
// granted to background activities as "soft-slots". Granting a soft-slot has
// to conform to usedSoftSlots+usedSlots <= totalSlots. Granting a regular
// slot only has to conform to usedSlots <= totalSlots. That is, soft-slots
// allow for over-commitment until the soft-slots are returned, which may mean
// some additional queueing in the goroutine scheduler.
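//
// For illustration (hypothetical numbers): with totalSlots = totalSoftSlots = 4,
// usedSlots = 3 and usedSoftSlots = 2, a regular slot can still be granted
// since usedSlots < totalSlots, but a soft-slot request is refused since
// usedSlots+usedSoftSlots = 5 > 4 = totalSlots. This is the over-commitment
// mentioned above: the regular grant is allowed even though soft slots have
// already pushed the sum past totalSlots.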
type kvGranter struct {
coord *GrantCoordinator
requester requester
usedSlots int
usedSoftSlots int
totalSlots int
totalSoftSlots int
skipSlotEnforcement bool
failedSoftSlotsGet bool

ioTokensEnabled bool
// There is no rate limiting in granting these tokens. That is, they are all
@@ -555,6 +579,35 @@ func (sg *kvGranter) setAvailableIOTokensLocked(tokens int64) {
}
}

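// tryGetSoftSlots attempts to acquire up to count soft slots, without
// queueing, and returns how many were granted, which may be fewer than count
// (possibly 0). For example, with 3 spare soft slots a request for 5 is
// granted 3.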
func (sg *kvGranter) tryGetSoftSlots(count int) int {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
spareSoftSlots := sg.totalSoftSlots - sg.usedSoftSlots
spareSlots := sg.totalSlots - (sg.usedSlots + sg.usedSoftSlots)
if spareSlots < spareSoftSlots {
spareSoftSlots = spareSlots
}
if spareSoftSlots <= 0 {
sg.failedSoftSlotsGet = true
return 0
}
allocatedSlots := count
if allocatedSlots > spareSoftSlots {
allocatedSlots = spareSoftSlots
}
sg.usedSoftSlots += allocatedSlots
return allocatedSlots
}

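// returnSoftSlots returns count soft slots previously acquired via
// tryGetSoftSlots.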
func (sg *kvGranter) returnSoftSlots(count int) {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
sg.usedSoftSlots -= count
if sg.usedSoftSlots < 0 {
panic("used soft slots is negative")
}
}

// GrantCoordinator is the top-level object that coordinates grants across
// different WorkKinds (for more context see the comment in doc.go, and the
// comment where WorkKind is declared). Typically there will be one
@@ -717,6 +770,7 @@ func NewGrantCoordinators(
kvg := &kvGranter{
coord: coord,
totalSlots: opts.MinCPUSlots,
totalSoftSlots: opts.MinCPUSlots,
usedSlotsMetric: metrics.KVUsedSlots,
}
kvSlotAdjuster.granter = kvg
@@ -1175,7 +1229,11 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, verb rune) {
switch kind {
case KVWork:
g := coord.granters[i].(*kvGranter)
s.Printf("%s%s: used: %d, total: %d", curSep, workKindString(kind), g.usedSlots, g.totalSlots)
s.Printf("%s%s: used: %d, total(soft): %d(%d)", curSep, workKindString(kind),
g.usedSlots, g.totalSlots, g.totalSoftSlots)
if g.usedSoftSlots > 0 {
s.Printf(" used-soft: %d", g.usedSoftSlots)
}
if g.ioTokensEnabled {
s.Printf(" io-avail: %d", g.availableIOTokens)
}
@@ -1398,7 +1456,8 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, _ time.Duration) {

// Simple heuristic, which worked ok in experiments. More sophisticated ones
// could be devised.
if runnable >= threshold*procs {
usedSlots := kvsa.granter.usedSlots + kvsa.granter.usedSoftSlots
tryDecreaseSlots := func(used int, total int) int {
// Overload.
// If using some slots, and the used slots is less than the total slots,
// and total slots hasn't bottomed out at the min, decrease the total
@@ -1411,24 +1470,71 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, _ time.Duration) {
// so it suggests that the drop in slots should not be causing cpu
// under-utilization, but one cannot be sure. Experiment with a smoothed
// signal or other ways to prevent a fast drop.
if kvsa.granter.usedSlots > 0 && kvsa.granter.totalSlots > kvsa.minCPUSlots &&
kvsa.granter.usedSlots <= kvsa.granter.totalSlots {
kvsa.granter.totalSlots--
if used > 0 && total > kvsa.minCPUSlots && used <= total {
total--
}
} else if float64(runnable) <= float64((threshold*procs)/2) {
return total
}
tryIncreaseSlots := func(used int, total int) int {
// TODO: 0.8 is arbitrary.
closeToTotalSlots := int(float64(total) * 0.8)
// Underload.
// Used all its slots and can increase further, so additive increase.
if kvsa.granter.usedSlots >= kvsa.granter.totalSlots &&
kvsa.granter.totalSlots < kvsa.maxCPUSlots {
// Used all its slots and can increase further, so additive increase. We
// also handle the case where the used slots are a bit less than total
// slots, since callers for soft slots don't block.
if (used >= total || (used >= closeToTotalSlots && kvsa.granter.failedSoftSlotsGet)) &&
total < kvsa.maxCPUSlots {
// NB: If the workload is IO bound, the slot count here will keep
// incrementing until these slots are no longer the bottleneck for
// admission. So it is not unreasonable to see this slot count go into
// the 1000s. If the workload switches to being CPU bound, we can
// decrease by 1000 slots every second (because the CPULoad ticks are at
// 1ms intervals, and we do additive decrease).
kvsa.granter.totalSlots++
total++
}
}
return total
}
// NB: usedSlots >= kvGranter.usedSoftSlots. Consider the following cases, when
// totalSlots == totalSoftSlots.
// - underload such that we are able to increase totalSoftSlots: in this
// case we will also be able to increase totalSlots (since the used and
// total comparisons gating the increase in tryIncreaseSlots will also be
// true for totalSlots).
// - overload such that we are able to decrease totalSlots: in this case it
// is possible that we don't decrease the totalSoftSlots because of the
// used > 0 condition in tryDecreaseSlots.

// So the natural behavior of the slot adjustments does not guarantee
// totalSlots >= totalSoftSlots. We add logic to impose this on top of the
// natural adjustment.
//
// TODO: the fractions below are arbitrary and subject to tuning.
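// For illustration (hypothetical numbers): with threshold*procs = 32,
// runnable >= 32 decreases both totalSlots and totalSoftSlots,
// 24 <= runnable < 32 decreases only totalSoftSlots,
// 8 < runnable <= 16 increases only totalSlots,
// runnable <= 8 increases both, and 16 < runnable < 24 leaves both unchanged.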
if runnable >= threshold*procs {
// Very overloaded.
kvsa.granter.totalSlots = tryDecreaseSlots(usedSlots, kvsa.granter.totalSlots)
kvsa.granter.totalSoftSlots = tryDecreaseSlots(
kvsa.granter.usedSoftSlots, kvsa.granter.totalSoftSlots)
} else if float64(runnable) <= float64((threshold*procs)/4) {
// Very underloaded.
kvsa.granter.totalSlots = tryIncreaseSlots(usedSlots, kvsa.granter.totalSlots)
kvsa.granter.totalSoftSlots = tryIncreaseSlots(
kvsa.granter.usedSoftSlots, kvsa.granter.totalSoftSlots)
} else if float64(runnable) <= float64((threshold*procs)/2) {
// Moderately underloaded -- can afford to increase regular slots.
kvsa.granter.totalSlots = tryIncreaseSlots(usedSlots, kvsa.granter.totalSlots)
} else if runnable >= 3*threshold*procs/4 {
// Moderately overloaded -- should decrease soft slots.
// NB: decreasing soft slots may not halt the runnable growth since the
// regular slot traffic may be high, which means we will keep decreasing
// soft slots and undershoot. This is acceptable since soft slots are
// strictly best-effort.
kvsa.granter.totalSoftSlots = tryDecreaseSlots(
kvsa.granter.usedSoftSlots, kvsa.granter.totalSoftSlots)
}
if kvsa.granter.totalSlots < kvsa.granter.totalSoftSlots {
kvsa.granter.totalSoftSlots = kvsa.granter.totalSlots
}
kvsa.granter.failedSoftSlotsGet = false
kvsa.totalSlotsMetric.Update(int64(kvsa.granter.totalSlots))
}

@@ -1755,3 +1861,33 @@ var _ = (*GrantCoordinator)(nil).GetWorkQueue
// uses the term "slot" for these is that we have a completion indicator, and
// when we do have such an indicator it can be beneficial to be able to keep
// track of how many ongoing work items we have.

// SoftSlotGranter grants soft slots without queueing. See the comment with
// kvGranter.
type SoftSlotGranter struct {
kvGranter *kvGranter
}

// MakeSoftSlotGranter constructs a SoftSlotGranter given a GrantCoordinator
// that is responsible for KV and lower layers.
func MakeSoftSlotGranter(gc *GrantCoordinator) (*SoftSlotGranter, error) {
kvGranter, ok := gc.granters[KVWork].(*kvGranter)
if !ok {
return nil, errors.Errorf("GrantCoordinator does not support soft slots")
}
return &SoftSlotGranter{
kvGranter: kvGranter,
}, nil
}

// TryGetSlots attempts to acquire count slots and returns what was acquired
// (possibly 0).
// TODO: experiment with calling this when opening an sstable.Writer.
func (ssg *SoftSlotGranter) TryGetSlots(count int) int {
return ssg.kvGranter.tryGetSoftSlots(count)
}

// ReturnSlots returns count slots (count must be >= 0).
func (ssg *SoftSlotGranter) ReturnSlots(count int) {
ssg.kvGranter.returnSoftSlots(count)
}
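
A minimal wiring sketch (from outside the admission package; assumes opts and
ambientCtx are set up as in the test below, and is not part of this commit):
the SoftSlotGranter is obtained from the regular GrantCoordinator, which owns
the KV slots.

  coords, _ := admission.NewGrantCoordinators(ambientCtx, opts)
  ssg, err := admission.MakeSoftSlotGranter(coords.Regular)
  if err != nil {
    panic(err) // the regular coordinator registers a kvGranter for KVWork in this setup
  }
  granted := ssg.TryGetSlots(2) // best-effort; may be 0, 1, or 2
  // ... run the speculative work with the extra concurrency ...
  ssg.ReturnSlots(granted)
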
17 changes: 17 additions & 0 deletions pkg/util/admission/granter_test.go
@@ -96,6 +96,7 @@ func TestGranterBasic(t *testing.T) {
var ambientCtx log.AmbientContext
var requesters [numWorkKinds]*testRequester
var coord *GrantCoordinator
var ssg *SoftSlotGranter
var buf strings.Builder
flushAndReset := func() string {
fmt.Fprintf(&buf, "GrantCoordinator:\n%s\n", coord.String())
@@ -131,6 +132,9 @@
delayForGrantChainTermination = 0
coords, _ := NewGrantCoordinators(ambientCtx, opts)
coord = coords.Regular
var err error
ssg, err = MakeSoftSlotGranter(coord)
require.NoError(t, err)
return flushAndReset()

case "set-has-waiting-requests":
@@ -187,6 +191,19 @@
coord.testingTryGrant()
return flushAndReset()

case "try-get-soft-slots":
var slots int
d.ScanArgs(t, "slots", &slots)
granted := ssg.TryGetSlots(slots)
fmt.Fprintf(&buf, "requested: %d, granted: %d\n", slots, granted)
return flushAndReset()

case "return-soft-slots":
var slots int
d.ScanArgs(t, "slots", &slots)
ssg.ReturnSlots(slots)
return flushAndReset()

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}