admission: add intercept points for when replicated work gets admitted
In a subsequent commit, when integrating kvflowcontrol into the critical
path for replication traffic, we'll set up the return of flow tokens
from the receiver node back to the sender once log entries get
(asynchronously) admitted[^1]. So we need to intercept the exact points
at which the virtually enqueued work items get admitted, since it all
happens asynchronously[^2]. To that end we introduce the following
interface:

    // OnLogEntryAdmitted is used to observe the specific entries
    // (identified by rangeID + log position) that were admitted. Since
    // admission control for log entries is asynchronous/non-blocking,
    // this allows callers to do requisite post-admission
    // bookkeeping.
    type OnLogEntryAdmitted interface {
     AdmittedLogEntry(
       origin roachpb.NodeID, /* node where the entry originated */
       pri admissionpb.WorkPriority, /* admission priority of the entry */
       storeID roachpb.StoreID, /* store on which the entry was admitted */
       rangeID roachpb.RangeID, /* identifying range for the log entry */
       pos LogPosition, /* log position of the entry that was admitted */
     )
    }

For now we pass in a no-op implementation in production code, but this
will change shortly.
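
For reference, a minimal sketch of that no-op implementation (within the
admission package; the exact body is assumed, see the server.go hunk below
for where it gets wired in):

    // NoopOnLogEntryAdmitted is a no-op implementation of the
    // OnLogEntryAdmitted interface; it discards the post-admission
    // notification entirely.
    type NoopOnLogEntryAdmitted struct{}

    var _ OnLogEntryAdmitted = &NoopOnLogEntryAdmitted{}

    func (n *NoopOnLogEntryAdmitted) AdmittedLogEntry(
      origin roachpb.NodeID,
      pri admissionpb.WorkPriority,
      storeID roachpb.StoreID,
      rangeID roachpb.RangeID,
      pos LogPosition,
    ) {
    }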

Since the asynchronous admit interface is going to be the primary one for
IO control once we enable replication admission control by default, we no
longer need the storeWriteDone interfaces and corresponding types. They're
used by our current (and soon-to-be legacy) above-raft IO admission control
to inform granters of when the write was actually done, post-admission. For
above-raft IO control we don't have sizing info for the writes at admit
time, so by intercepting them at write-done time we're able to make any
outstanding token adjustments in the granter.
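
Sketched out (helper name illustrative; tokensLinearModel and
applyLinearModel are the existing linear-model plumbing seen in the granter
changes below), this post-hoc adjustment amounts to re-costing the work at
its actual size and charging or refunding only the difference:

    // additionalL0Tokens is an illustrative sketch: re-cost the work using
    // the fitted linear models once actual byte sizes are known, and return
    // the delta relative to what was deducted at admit time (a negative
    // delta acts as a refund).
    func additionalL0Tokens(
      originalTokens, writeBytes, ingestedBytes int64,
      l0WriteLM, l0IngestLM tokensLinearModel,
    ) int64 {
      actual := l0WriteLM.applyLinearModel(writeBytes) +
        l0IngestLM.applyLinearModel(ingestedBytes)
      return actual - originalTokens
    }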

To reflect this new world, we:
- Rename setAdmittedDoneModels to setLinearModels.
- Introduce a storeReplicatedWorkAdmittedInfo[^3]. It provides
  information about the size of replicated work once it's admitted
  (which happens asynchronously from the work itself). This lets us use
  the underlying linear models for L0 {writes,ingests} to deduct an
  appropriate number of tokens from the granter, for the admitted work
  size[^4].
- Rename the granterWithStoreWriteDone interface to
  granterWithStoreReplicatedWorkAdmitted. We'll still intercept the
  actual point of admission for some token adjustments, through the
  storeReplicatedWorkAdmittedLocked API shown below. There are two
  callstacks through which this API gets invoked: one where coord.mu
  is already held, and one where it isn't. We plumb this information
  through so the lock is acquired only if not already held. The locking
  structure is unfortunate, but this was a minimally invasive diff; a
  condensed sketch of both callstacks follows the signature below.

   storeReplicatedWorkAdmittedLocked(
    originalTokens int64,
    admittedInfo storeReplicatedWorkAdmittedInfo,
   ) (additionalTokens int64)
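
Condensed from the granter changes in this diff (child granter shown; it
simply delegates to its parent), the two paths look roughly like:

    // Legacy above-raft path: coord.mu is not yet held, so acquire it and
    // reuse the same adjustment logic as the admitted path.
    func (cg *kvStoreTokenChildGranter) storeWriteDone(
      originalTokens int64, doneInfo StoreWorkDoneInfo,
    ) (additionalTokens int64) {
      cg.parent.coord.mu.Lock()
      defer cg.parent.coord.mu.Unlock()
      return cg.parent.storeReplicatedWorkAdmittedLocked(
        cg.workClass, originalTokens, storeReplicatedWorkAdmittedInfo(doneInfo))
    }

    // Below-raft path: invoked with coord.mu already held by the caller.
    func (cg *kvStoreTokenChildGranter) storeReplicatedWorkAdmittedLocked(
      originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo,
    ) (additionalTokens int64) {
      return cg.parent.storeReplicatedWorkAdmittedLocked(
        cg.workClass, originalTokens, admittedInfo)
    }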

While here, we also export admissionpb.TestingReverseWorkPriorityDict;
at least three tests had re-invented this particular wheel.
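
For instance, a datadriven test can now resolve a priority name like so
(hypothetical snippet, mirroring the test changes in this diff):

    pri, found := admissionpb.TestingReverseWorkPriorityDict["high-pri"]
    require.True(t, found)
    require.Equal(t, admissionpb.HighPri, pri)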

[^1]: This will happen through the kvflowcontrol.Dispatch interface
      introduced back in cockroachdb#97766, after integrating it with the RaftTransport
      layer.
[^2]: Introduced in cockroachdb#97599, for replicated write work.
[^3]: Identical to the previous StoreWorkDoneInfo.
[^4]: There's a peculiarity here in that at enqueuing-time we actually
      know the size of the write, so we could have deducted the right
      number of tokens upfront and avoided this post-admit granter token
      adjustment. We inherit this structure from earlier work, and just
      leave a TODO for now.

Release note: None
irfansharif committed Apr 28, 2023
1 parent e59abeb commit 40d1ab5
Showing 15 changed files with 222 additions and 118 deletions.
@@ -31,11 +31,6 @@ func TestDispatch(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority)
for k, v := range admissionpb.WorkPriorityDict {
reverseWorkPriorityDict[v] = k
}

datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
var dispatch *Dispatch
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
@@ -82,7 +77,7 @@ func TestDispatch(t *testing.T) {

case strings.HasPrefix(parts[i], "pri="):
// Parse pri=<string>.
pri, found := reverseWorkPriorityDict[arg]
pri, found := admissionpb.TestingReverseWorkPriorityDict[arg]
require.True(t, found)
entries.AdmissionPriority = int32(pri)

15 changes: 3 additions & 12 deletions pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go
@@ -32,19 +32,10 @@ func TestTracker(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority)
for k, v := range admissionpb.WorkPriorityDict {
reverseWorkPriorityDict[v] = k
}

ctx := context.Background()
datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
var tracker *Tracker
knobs := &kvflowcontrol.TestingKnobs{
UntrackTokensInterceptor: func(tokens kvflowcontrol.Tokens, pos kvflowcontrolpb.RaftLogPosition) {

},
}
knobs := &kvflowcontrol.TestingKnobs{}
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
@@ -73,7 +64,7 @@ func TestTracker(t *testing.T) {
switch {
case strings.HasPrefix(parts[i], "pri="):
var found bool
pri, found = reverseWorkPriorityDict[arg]
pri, found = admissionpb.TestingReverseWorkPriorityDict[arg]
require.True(t, found)

case strings.HasPrefix(parts[i], "tokens="):
@@ -103,7 +94,7 @@ func TestTracker(t *testing.T) {
var priStr, logPositionStr string
d.ScanArgs(t, "pri", &priStr)
d.ScanArgs(t, "up-to-log-position", &logPositionStr)
pri, found := reverseWorkPriorityDict[priStr]
pri, found := admissionpb.TestingReverseWorkPriorityDict[priStr]
require.True(t, found)
logPosition := parseLogPosition(t, logPositionStr)

2 changes: 1 addition & 1 deletion pkg/server/server.go
@@ -270,7 +270,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
if opts, ok := cfg.TestingKnobs.AdmissionControl.(*admission.Options); ok {
admissionOptions.Override(opts)
}
gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry)
gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry, &admission.NoopOnLogEntryAdmitted{})

engines, err := cfg.CreateEngines(ctx)
if err != nil {
40 changes: 25 additions & 15 deletions pkg/util/admission/admission.go
@@ -178,7 +178,7 @@ type granter interface {
// is a possibility that that raced with cancellation.
//
// Do not use this for doing store IO-related token adjustments when work is
// done -- that should be done via granterWithStoreWriteDone.storeWriteDone.
// done -- that should be done via granterWithStoreReplicatedWorkAdmitted.storeWriteDone.
//
// REQUIRES: count > 0. count == 1 for slots.
returnGrant(count int64)
@@ -195,7 +195,7 @@ type granter interface {
// work turned out to be an underestimate.
//
// Do not use this for doing store IO-related token adjustments when work is
// done -- that should be done via granterWithStoreWriteDone.storeWriteDone.
// done -- that should be done via granterWithStoreReplicatedWorkAdmitted.storeWriteDone.
//
// REQUIRES: count > 0. count == 1 for slots.
tookWithoutPermission(count int64)
@@ -274,23 +274,33 @@ type granterWithIOTokens interface {
// getDiskTokensUsedAndReset returns the disk bandwidth tokens used
// since the last such call.
getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64
// setAdmittedDoneModelsLocked supplies the models to use when
// storeWriteDone is called, to adjust token consumption. Note that these
// models are not used for token adjustment at admission time -- that is
// handled by StoreWorkQueue and is not in scope of this granter. This
// asymmetry is due to the need to use all the functionality of WorkQueue at
// admission time. See the long explanatory comment at the beginning of
// store_token_estimation.go, regarding token estimation.
setAdmittedDoneModels(l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel,
ingestLM tokensLinearModel)
// setLinearModels supplies the models to use when storeWriteDone or
// storeReplicatedWorkAdmittedLocked is called, to adjust token consumption.
// Note that these models are not used for token adjustment at admission
// time -- that is handled by StoreWorkQueue and is not in scope of this
// granter. This asymmetry is due to the need to use all the functionality
// of WorkQueue at admission time. See the long explanatory comment at the
// beginning of store_token_estimation.go, regarding token estimation.
setLinearModels(l0WriteLM, l0IngestLM, ingestLM tokensLinearModel)
}

// granterWithStoreWriteDone is used to abstract kvStoreTokenGranter for
// testing. The interface is used by StoreWorkQueue to pass on sizing
// information provided when the work was completed.
type granterWithStoreWriteDone interface {
// granterWithStoreReplicatedWorkAdmitted is used to abstract
// kvStoreTokenGranter for testing. The interface is used by StoreWorkQueue to
// pass on sizing information provided when the work is either done (for legacy,
// above-raft IO admission) or admitted (for below-raft, asynchronous admission
// control).
type granterWithStoreReplicatedWorkAdmitted interface {
granter
// storeWriteDone is used by legacy, above-raft IO admission control to
// inform granters of when the write was actually done, post-admission. At
// admit-time we did not have sizing info for these writes, so by
// intercepting these writes at write-done time we're able to make any
// outstanding token adjustments in the granter.
storeWriteDone(originalTokens int64, doneInfo StoreWorkDoneInfo) (additionalTokens int64)
// storeReplicatedWorkAdmittedLocked is used by below-raft admission control
// to inform granters of work being admitted in order for them to make any
// outstanding token adjustments. It's invoked with the coord.mu held.
storeReplicatedWorkAdmittedLocked(originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo) (additionalTokens int64)
}

// cpuOverloadIndicator is meant to be an instantaneous indicator of cpu
13 changes: 12 additions & 1 deletion pkg/util/admission/admissionpb/admissionpb.go
@@ -53,7 +53,7 @@ func (w WorkPriority) SafeFormat(p redact.SafePrinter, verb rune) {
p.Print(s)
return
}
p.Printf("custom-pri=%d", w)
p.Printf("custom-pri=%d", int8(w))
}

// WorkPriorityDict is a mapping of the priorities to a short string name. The
@@ -69,6 +69,17 @@ var WorkPriorityDict = map[WorkPriority]string{
HighPri: "high-pri",
}

// TestingReverseWorkPriorityDict is the reverse-lookup dictionary for
// WorkPriorityDict, for use in tests.
var TestingReverseWorkPriorityDict map[string]WorkPriority

func init() {
TestingReverseWorkPriorityDict = make(map[string]WorkPriority)
for k, v := range WorkPriorityDict {
TestingReverseWorkPriorityDict[v] = k
}
}

// WorkClass represents the class of work, which is defined entirely by its
// WorkPriority. Namely, everything less than NormalPri is defined to be
// "Elastic", while everything above and including NormalPri is considered
5 changes: 2 additions & 3 deletions pkg/util/admission/elastic_cpu_work_handle.go
@@ -151,9 +151,8 @@ func TestingNewElasticCPUHandle() *ElasticCPUWorkHandle {
return newElasticCPUWorkHandle(420 * time.Hour) // use a very high allotment
}

// TestingNewElasticCPUWithCallback constructs an
// ElascticCPUWorkHandle with a testing override for the behaviour of
// OverLimit().
// TestingNewElasticCPUHandleWithCallback constructs an ElasticCPUWorkHandle
// with a testing override for the behaviour of OverLimit().
func TestingNewElasticCPUHandleWithCallback(cb func() (bool, time.Duration)) *ElasticCPUWorkHandle {
h := TestingNewElasticCPUHandle()
h.testingOverrideOverLimit = cb
28 changes: 23 additions & 5 deletions pkg/util/admission/grant_coordinator.go
@@ -61,6 +61,7 @@ type StoreGrantCoordinators struct {
// api.
numStores int
pebbleMetricsProvider PebbleMetricsProvider
onLogEntryAdmitted OnLogEntryAdmitted
closeCh chan struct{}

disableTickerForTesting bool
@@ -157,7 +158,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
// This is IO work, so override the usesTokens value.
opts.usesTokens = true
// TODO(sumeer): add per-store WorkQueue state for debug.zip and db console.
granters := [admissionpb.NumWorkClasses]granterWithStoreWriteDone{
granters := [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{
&kvStoreTokenChildGranter{
workClass: admissionpb.RegularWorkClass,
parent: kvg,
Expand All @@ -168,7 +169,17 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
},
}

storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, storeID, granters, sgc.settings, sgc.workQueueMetrics, opts, nil)
storeReq := sgc.makeStoreRequesterFunc(
sgc.ambientCtx,
storeID,
granters,
sgc.settings,
sgc.workQueueMetrics,
opts,
nil, /* knobs */
sgc.onLogEntryAdmitted,
&coord.mu.Mutex,
)
coord.queues[KVWork] = storeReq
requesters := storeReq.getRequesters()
kvg.regularRequester = requesters[admissionpb.RegularWorkClass]
@@ -336,8 +347,9 @@ type makeRequesterFunc func(
metrics *WorkQueueMetrics, opts workQueueOptions) requester

type makeStoreRequesterFunc func(
_ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone,
_ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted,
settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs,
onLogEntryAdmitted OnLogEntryAdmitted, coordMu *syncutil.Mutex,
) storeRequester

// NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a
@@ -356,13 +368,17 @@ type makeStoreRequesterFunc func(
// GrantCoordinators since they are not trying to control CPU usage, so we turn
// off grant chaining in those coordinators.
func NewGrantCoordinators(
ambientCtx log.AmbientContext, st *cluster.Settings, opts Options, registry *metric.Registry,
ambientCtx log.AmbientContext,
st *cluster.Settings,
opts Options,
registry *metric.Registry,
onLogEntryAdmitted OnLogEntryAdmitted,
) GrantCoordinators {
metrics := makeGrantCoordinatorMetrics()
registry.AddMetricStruct(metrics)

return GrantCoordinators{
Stores: makeStoresGrantCoordinators(ambientCtx, opts, st, metrics, registry),
Stores: makeStoresGrantCoordinators(ambientCtx, opts, st, metrics, registry, onLogEntryAdmitted),
Regular: makeRegularGrantCoordinator(ambientCtx, opts, st, metrics, registry),
Elastic: makeElasticGrantCoordinator(ambientCtx, st, registry),
}
@@ -399,6 +415,7 @@ func makeStoresGrantCoordinators(
st *cluster.Settings,
metrics GrantCoordinatorMetrics,
registry *metric.Registry,
onLogEntryAdmitted OnLogEntryAdmitted,
) *StoreGrantCoordinators {
// These metrics are shared across all stores and broken down by priority for
// the common priorities.
Expand All @@ -417,6 +434,7 @@ func makeStoresGrantCoordinators(
makeStoreRequesterFunc: makeStoreRequester,
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
workQueueMetrics: storeWorkQueueMetrics,
onLogEntryAdmitted: onLogEntryAdmitted,
}
return storeCoordinators
}
47 changes: 21 additions & 26 deletions pkg/util/admission/granter.go
@@ -324,7 +324,7 @@ type kvStoreTokenChildGranter struct {
parent *kvStoreTokenGranter
}

var _ granterWithStoreWriteDone = &kvStoreTokenChildGranter{}
var _ granterWithStoreReplicatedWorkAdmitted = &kvStoreTokenChildGranter{}
var _ granter = &kvStoreTokenChildGranter{}

// grantKind implements granter.
@@ -352,11 +352,23 @@ func (cg *kvStoreTokenChildGranter) continueGrantChain(grantChainID grantChainID
// Ignore since grant chains are not used for store tokens.
}

// storeWriteDone implements granterWithStoreWriteDone.
// storeWriteDone implements granterWithStoreReplicatedWorkAdmitted.
func (cg *kvStoreTokenChildGranter) storeWriteDone(
originalTokens int64, doneInfo StoreWorkDoneInfo,
) (additionalTokens int64) {
return cg.parent.storeWriteDone(cg.workClass, originalTokens, doneInfo)
cg.parent.coord.mu.Lock()
defer cg.parent.coord.mu.Unlock()
// NB: the token/metric adjustments we want to make here are the same as we
// want to make through the storeReplicatedWorkAdmittedLocked, so we (ab)use it.
return cg.parent.storeReplicatedWorkAdmittedLocked(
cg.workClass, originalTokens, storeReplicatedWorkAdmittedInfo(doneInfo))
}

// storeReplicatedWorkAdmittedLocked implements granterWithStoreReplicatedWorkAdmitted.
func (cg *kvStoreTokenChildGranter) storeReplicatedWorkAdmittedLocked(
originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo,
) (additionalTokens int64) {
return cg.parent.storeReplicatedWorkAdmittedLocked(cg.workClass, originalTokens, admittedInfo)
}

func (sg *kvStoreTokenGranter) tryGet(workClass admissionpb.WorkClass, count int64) bool {
Expand Down Expand Up @@ -522,7 +534,7 @@ func (sg *kvStoreTokenGranter) getDiskTokensUsedAndReset() [admissionpb.NumWorkC
}

// setLinearModels implements granterWithIOTokens.
func (sg *kvStoreTokenGranter) setAdmittedDoneModels(
func (sg *kvStoreTokenGranter) setLinearModels(
l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel,
) {
sg.coord.mu.Lock()
@@ -532,37 +544,21 @@ func (sg *kvStoreTokenGranter) setAdmittedDoneModels(
sg.ingestLM = ingestLM
}

// storeWriteDone implements granterWithStoreWriteDone.
func (sg *kvStoreTokenGranter) storeWriteDone(
wc admissionpb.WorkClass, originalTokens int64, doneInfo StoreWorkDoneInfo,
func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
wc admissionpb.WorkClass, originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo,
) (additionalTokens int64) {
// Normally, we follow the structure of a foo() method calling into a foo()
// method on the GrantCoordinator, which then calls fooLocked() on the
// kvStoreTokenGranter. For example, returnGrant follows this structure.
// This allows the GrantCoordinator to do two things (a) acquire the mu
// before calling into kvStoreTokenGranter, (b) do side-effects, like
// terminating grant chains and doing more grants after the call into the
// fooLocked() method.
// For storeWriteDone we don't bother with this structure involving the
// GrantCoordinator (which has served us well across various methods and
// various granter implementations), since the decision on when the
// GrantCoordinator should call tryGrantLocked is more complicated. And since this
// storeWriteDone is unique to the kvStoreTokenGranter (and not implemented
// by other granters) this approach seems acceptable.

// Reminder: coord.mu protects the state in the kvStoreTokenGranter.
sg.coord.mu.Lock()
exhaustedFunc := func() bool {
return sg.coordMu.availableIOTokens <= 0 ||
(wc == admissionpb.ElasticWorkClass && sg.coordMu.elasticDiskBWTokensAvailable <= 0)
}
wasExhausted := exhaustedFunc()
actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(doneInfo.WriteBytes)
actualL0IngestTokens := sg.l0IngestLM.applyLinearModel(doneInfo.IngestedBytes)
actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(admittedInfo.WriteBytes)
actualL0IngestTokens := sg.l0IngestLM.applyLinearModel(admittedInfo.IngestedBytes)
actualL0Tokens := actualL0WriteTokens + actualL0IngestTokens
additionalL0TokensNeeded := actualL0Tokens - originalTokens
sg.subtractTokensLocked(additionalL0TokensNeeded, false)
actualIngestTokens := sg.ingestLM.applyLinearModel(doneInfo.IngestedBytes)
actualIngestTokens := sg.ingestLM.applyLinearModel(admittedInfo.IngestedBytes)
additionalDiskBWTokensNeeded := (actualL0WriteTokens + actualIngestTokens) - originalTokens
if wc == admissionpb.ElasticWorkClass {
sg.coordMu.elasticDiskBWTokensAvailable -= additionalDiskBWTokensNeeded
@@ -574,7 +570,6 @@ func (sg *kvStoreTokenGranter) storeWriteDone(
sg.coord.tryGrantLocked()
}
}
sg.coord.mu.Unlock()
// For multi-tenant fairness accounting, we choose to ignore disk bandwidth
// tokens. Ideally, we'd have multiple resource dimensions for the fairness
// decisions, but we don't necessarily need something more sophisticated