admission: account for write-amp in disk bandwidth limiter
Previously, we calculated elastic disk bandwidth tokens using arbitrary
load thresholds and an estimate of the bytes incoming into the LSM
through flushes and ingestions. This calculation did not account for
write amplification in the LSM.

This patch simplifies the disk bandwidth limiter: it removes the disk
load watcher and instead adjusts tokens using the known provisioned disk
bandwidth. For token deduction, we create a write-amp model, a linear
relationship from incoming LSM bytes to actual disk writes.
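
As a rough illustration of such a model (tokensLinearModel and
applyLinearModel are identifiers from the diffs below, but the field
names, the fitted values, and the example arithmetic here are
illustrative assumptions, not the shipped implementation):

package main

import "fmt"

// tokensLinearModel approximates a fitted y = multiplier*x + constant
// mapping from incoming LSM bytes to actual disk write bytes.
type tokensLinearModel struct {
	multiplier float64
	constant   int64
}

func (m tokensLinearModel) applyLinearModel(bytes int64) int64 {
	return int64(float64(bytes)*m.multiplier) + m.constant
}

func main() {
	// Assume a fitted write-amp of ~4x: 10 MiB of incoming LSM bytes
	// is charged as ~40 MiB of actual disk writes.
	writeAmpLM := tokensLinearModel{multiplier: 4.0, constant: 0}
	fmt.Println(writeAmpLM.applyLinearModel(10 << 20)) // 41943040
}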

The token granting semantics are as follows:
- elastic writes: deduct tokens, and wait until the bucket count is
  positive.
- regular writes: deduct tokens, but proceed even with no tokens
  available.

The requested write bytes are "inflated" using the estimated write
amplification to account for async compactions in the LSM, as sketched
below.
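
A minimal sketch of these granting semantics, assuming a single
write-token bucket and a bare multiplier in place of the fitted
write-amp model; bucket, tryGet, and workClass are simplified stand-ins
for the kvStoreTokenGranter state shown in the granter.go diff below:

package main

import "fmt"

type workClass int

const (
	regular workClass = iota
	elastic
)

// bucket stands in for the granter's disk write token state; writeAmp
// stands in for the fitted write-amp linear model.
type bucket struct {
	writeByteTokens int64
	writeAmp        float64
}

// tryGet inflates the requested bytes by the write-amp estimate, admits
// elastic work only while the bucket is positive, and always admits
// regular work, allowing the bucket to go negative.
func (b *bucket) tryGet(wc workClass, count int64) bool {
	adjusted := int64(float64(count) * b.writeAmp)
	if wc == elastic && b.writeByteTokens <= 0 {
		return false // elastic work must wait for a positive count
	}
	b.writeByteTokens -= adjusted
	return true
}

func main() {
	b := bucket{writeByteTokens: 1 << 20, writeAmp: 4.0}
	fmt.Println(b.tryGet(elastic, 512<<10)) // true; deducts ~2 MiB, bucket goes negative
	fmt.Println(b.tryGet(elastic, 1))       // false; elastic waits while bucket <= 0
	fmt.Println(b.tryGet(regular, 1))       // true; regular proceeds regardless
}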

This patch also lays the groundwork for future integrations where we can
account for range snapshot ingestions separately, since those do not
incur the same write amplification as flushed LSM bytes do.

Informs: #86857

Release note: None
aadityasondhi committed Sep 11, 2024
1 parent df4c2cd commit 65b4ad1
Showing 16 changed files with 737 additions and 847 deletions.
12 changes: 6 additions & 6 deletions pkg/util/admission/admission.go
@@ -274,21 +274,21 @@ type granterWithIOTokens interface {
// negative, though that will be rare, since it is possible for tokens to be
// returned.
setAvailableTokens(
- ioTokens int64, elasticIOTokens int64, elasticDiskBandwidthTokens int64,
- ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskBandwidthTokensCapacity int64,
+ ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64,
+ ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskWriteTokensCapacity int64,
lastTick bool,
) (tokensUsed int64, tokensUsedByElasticWork int64)
- // getDiskTokensUsedAndReset returns the disk bandwidth tokens used
- // since the last such call.
- getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64
+ // getDiskTokensUsedAndReset returns the disk bandwidth tokens used since the
+ // last such call.
+ getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]diskTokens
// setLinearModels supplies the models to use when storeWriteDone or
// storeReplicatedWorkAdmittedLocked is called, to adjust token consumption.
// Note that these models are not used for token adjustment at admission
// time -- that is handled by StoreWorkQueue and is not in scope of this
// granter. This asymmetry is due to the need to use all the functionality
// of WorkQueue at admission time. See the long explanatory comment at the
// beginning of store_token_estimation.go, regarding token estimation.
- setLinearModels(l0WriteLM, l0IngestLM, ingestLM tokensLinearModel)
+ setLinearModels(l0WriteLM, l0IngestLM, ingestLM, writeAmpLM tokensLinearModel)
}

// granterWithStoreReplicatedWorkAdmitted is used to abstract
367 changes: 95 additions & 272 deletions pkg/util/admission/disk_bandwidth.go

Large diffs are not rendered by default.
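
Since this large diff is not rendered, here is a minimal sketch of the
simplified token computation the commit message describes, assuming the
per-interval budget is provisioned bytes scaled by the max elastic
utilization, minus observed reads; the struct fields mirror the test
file below, while computeElasticTokens here is an illustrative
reduction, not the shipped implementation:

package main

import "fmt"

// intervalDiskLoadInfo mirrors the fields exercised by the test below.
type intervalDiskLoadInfo struct {
	intReadBytes            int64
	intWriteBytes           int64
	intProvisionedDiskBytes int64
	elasticBandwidthMaxUtil float64
}

// computeElasticTokens budgets a fraction of the provisioned bytes for
// the interval and reserves headroom for reads, leaving the remainder
// as elastic disk write tokens. The real limiter additionally uses the
// write-amp model and per-work-class used-token accounting.
func computeElasticTokens(id intervalDiskLoadInfo) int64 {
	budget := int64(id.elasticBandwidthMaxUtil * float64(id.intProvisionedDiskBytes))
	tokens := budget - id.intReadBytes
	if tokens < 0 {
		tokens = 0
	}
	return tokens
}

func main() {
	// 90% of a 100 MiB interval budget, minus 20 MiB of reads = 70 MiB.
	fmt.Println(computeElasticTokens(intervalDiskLoadInfo{
		intReadBytes:            20 << 20,
		intProvisionedDiskBytes: 100 << 20,
		elasticBandwidthMaxUtil: 0.9,
	}))
}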

72 changes: 18 additions & 54 deletions pkg/util/admission/disk_bandwidth_test.go
@@ -11,89 +11,53 @@
package admission

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/redact"
)

- func TestDiskLoadWatcher(t *testing.T) {
- defer leaktest.AfterTest(t)()
- defer log.Scope(t).Close(t)
-
- var dlw diskLoadWatcher
- watcherToString := func() string {
- level := dlw.getLoadLevel()
- return fmt.Sprintf("%s\nload-level: %s", redact.Sprint(dlw),
- diskLoadLevelString(level))
- }
-
- datadriven.RunTest(t, datapathutils.TestDataPath(t, "disk_load_watcher"),
- func(t *testing.T, d *datadriven.TestData) string {
- switch d.Cmd {
- case "init":
- dlw = diskLoadWatcher{}
- return watcherToString()
-
- case "interval-info":
- var readBandwidth, writeBandwidth, provisionedBandwidth int
- d.ScanArgs(t, "read-bw", &readBandwidth)
- d.ScanArgs(t, "write-bw", &writeBandwidth)
- d.ScanArgs(t, "provisioned-bw", &provisionedBandwidth)
- dlw.setIntervalInfo(intervalDiskLoadInfo{
- readBandwidth: int64(readBandwidth),
- writeBandwidth: int64(writeBandwidth),
- provisionedBandwidth: int64(provisionedBandwidth),
- })
- return watcherToString()
-
- default:
- return fmt.Sprintf("unknown command: %s", d.Cmd)
- }
- })
- }

func TestDiskBandwidthLimiter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

- var dbl diskBandwidthLimiter
+ var dbl *diskBandwidthLimiter
dblToString := func() string {
- return string(redact.Sprint(&dbl))
+ return string(redact.Sprint(dbl))
}

datadriven.RunTest(t, datapathutils.TestDataPath(t, "disk_bandwidth_limiter"),
func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
- dbl = makeDiskBandwidthLimiter()
+ dbl = newDiskBandwidthLimiter()
return dblToString()

case "compute":
- var readBandwidth, writeBandwidth, provisionedBandwidth int
- d.ScanArgs(t, "read-bw", &readBandwidth)
- d.ScanArgs(t, "write-bw", &writeBandwidth)
- d.ScanArgs(t, "provisioned-bw", &provisionedBandwidth)
+ var readBytes, writeBytes, intProvisionedBytes int
+ d.ScanArgs(t, "int-read-bytes", &readBytes)
+ d.ScanArgs(t, "int-write-bytes", &writeBytes)
+ d.ScanArgs(t, "int-provisioned-bytes", &intProvisionedBytes)
diskLoad := intervalDiskLoadInfo{
- readBandwidth: int64(readBandwidth),
- writeBandwidth: int64(writeBandwidth),
- provisionedBandwidth: int64(provisionedBandwidth),
+ intReadBytes: int64(readBytes),
+ intWriteBytes: int64(writeBytes),
+ intProvisionedDiskBytes: int64(intProvisionedBytes),
+ elasticBandwidthMaxUtil: 0.9,
}
- var incomingBytes, regularTokensUsed, elasticTokensUsed int
- d.ScanArgs(t, "incoming-bytes", &incomingBytes)
+ var regularTokensUsed, elasticTokensUsed int64
d.ScanArgs(t, "regular-tokens-used", &regularTokensUsed)
d.ScanArgs(t, "elastic-tokens-used", &elasticTokensUsed)
- lsmInfo := intervalLSMInfo{
- incomingBytes: int64(incomingBytes),
- regularTokensUsed: int64(regularTokensUsed),
- elasticTokensUsed: int64(elasticTokensUsed),
+ usedTokens := [admissionpb.NumWorkClasses]diskTokens{
+ {writeByteTokens: regularTokensUsed}, // regular
+ {writeByteTokens: elasticTokensUsed}, // elastic
}
- dbl.computeElasticTokens(context.Background(), diskLoad, lsmInfo)

+ dbl.computeElasticTokens(diskLoad, usedTokens)
return dblToString()

default:
8 changes: 4 additions & 4 deletions pkg/util/admission/grant_coordinator.go
@@ -175,7 +175,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
}
kvg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
kvg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] = kvg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]
- kvg.coordMu.elasticDiskBWTokensAvailable = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
+ kvg.coordMu.diskTokensAvailable.writeByteTokens = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()

opts := makeWorkQueueOptions(KVWork)
// This is IO work, so override the usesTokens value.
@@ -214,7 +214,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
settings: sgc.settings,
kvRequester: storeReq,
perWorkTokenEstimator: makeStorePerWorkTokenEstimator(),
- diskBandwidthLimiter: makeDiskBandwidthLimiter(),
+ diskBandwidthLimiter: newDiskBandwidthLimiter(),
kvGranter: kvg,
l0CompactedBytes: sgc.l0CompactedBytes,
l0TokensProduced: sgc.l0TokensProduced,
@@ -987,9 +987,9 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, _ rune) {
case *slotGranter:
s.Printf("%s%s: used: %d, total: %d", curSep, kind, g.usedSlots, g.totalSlots)
case *kvStoreTokenGranter:
s.Printf(" io-avail: %d(%d), elastic-disk-bw-tokens-avail: %d", g.coordMu.availableIOTokens[admissionpb.RegularWorkClass],
s.Printf(" io-avail: %d(%d), disk-write-tokens-avail: %d", g.coordMu.availableIOTokens[admissionpb.RegularWorkClass],
g.coordMu.availableIOTokens[admissionpb.ElasticWorkClass],
g.coordMu.elasticDiskBWTokensAvailable)
g.coordMu.diskTokensAvailable.writeByteTokens)
}
case SQLStatementLeafStartWork, SQLStatementRootStartWork:
if coord.granters[i] != nil {
89 changes: 58 additions & 31 deletions pkg/util/admission/granter.go
@@ -303,10 +303,10 @@ type kvStoreTokenGranter struct {
// blocks if availableElasticIOTokens <= 0.
availableIOTokens [admissionpb.NumWorkClasses]int64
elasticIOTokensUsedByElastic int64
+ // TODO(aaditya): add support for read/IOPS tokens.
// Disk bandwidth tokens.
- elasticDiskBWTokensAvailable int64

- diskBWTokensUsed [admissionpb.NumWorkClasses]int64
+ diskTokensAvailable diskTokens
+ diskTokensUsed [admissionpb.NumWorkClasses]diskTokens
}

ioTokensExhaustedDurationMetric [admissionpb.NumWorkClasses]*metric.Counter
@@ -320,7 +320,7 @@ type kvStoreTokenGranter struct {
tokensTakenMetric *metric.Counter

// Estimation models.
- l0WriteLM, l0IngestLM, ingestLM tokensLinearModel
+ l0WriteLM, l0IngestLM, ingestLM, writeAmpLM tokensLinearModel
}

var _ granterWithLockedCalls = &kvStoreTokenGranter{}
@@ -397,21 +397,31 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
// queue, and since the only case where tryGetLocked is called for
// elasticWorkClass is when the queue is empty, this case should be rare
// (and not cause a performance isolation failure).

+ // NB: For disk write tokens, we apply `writeAmpLM` to account for write
+ // amplification. This model is used again later in
+ // `storeReplicatedWorkAdmittedLocked()`. There is an obvious gap in
+ // accounting here, since the model could change between the two calls for the
+ // same request. It is acceptable to do so because the second call in
+ // `storeReplicatedWorkAdmittedLocked()` is a better estimate of actual disk
+ // writes, and we adjust tokens accordingly there.
+ adjustedDiskWriteTokens := sg.writeAmpLM.applyLinearModel(count)
switch wc {
case admissionpb.RegularWorkClass:
if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 {
sg.subtractTokensLocked(count, count, false)
- sg.coordMu.diskBWTokensUsed[wc] += count
+ sg.coordMu.diskTokensAvailable.writeByteTokens -= adjustedDiskWriteTokens
+ sg.coordMu.diskTokensUsed[wc].writeByteTokens += adjustedDiskWriteTokens
return grantSuccess
}
case admissionpb.ElasticWorkClass:
- if sg.coordMu.elasticDiskBWTokensAvailable > 0 &&
+ if sg.coordMu.diskTokensAvailable.writeByteTokens > 0 &&
sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 &&
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] > 0 {
- sg.coordMu.elasticDiskBWTokensAvailable -= count
sg.subtractTokensLocked(count, count, false)
sg.coordMu.elasticIOTokensUsedByElastic += count
- sg.coordMu.diskBWTokensUsed[wc] += count
+ sg.coordMu.diskTokensAvailable.writeByteTokens -= adjustedDiskWriteTokens
+ sg.coordMu.diskTokensUsed[wc].writeByteTokens += adjustedDiskWriteTokens
return grantSuccess
}
}
@@ -428,11 +438,12 @@ func (sg *kvStoreTokenGranter) returnGrantLocked(count int64, demuxHandle int8)
// Return count tokens to the "IO tokens".
sg.subtractTokensLocked(-count, -count, false)
if wc == admissionpb.ElasticWorkClass {
- // Return count tokens to the elastic disk bandwidth tokens.
- sg.coordMu.elasticDiskBWTokensAvailable += count
sg.coordMu.elasticIOTokensUsedByElastic -= count
}
- sg.coordMu.diskBWTokensUsed[wc] -= count
+ // Return tokens to disk bandwidth bucket.
+ diskTokenCount := sg.writeAmpLM.applyLinearModel(count)
+ sg.coordMu.diskTokensAvailable.writeByteTokens += diskTokenCount
+ sg.coordMu.diskTokensUsed[wc].writeByteTokens -= diskTokenCount
}

func (sg *kvStoreTokenGranter) tookWithoutPermission(workClass admissionpb.WorkClass, count int64) {
@@ -444,10 +455,11 @@ func (sg *kvStoreTokenGranter) tookWithoutPermissionLocked(count int64, demuxHan
wc := admissionpb.WorkClass(demuxHandle)
sg.subtractTokensLocked(count, count, false)
if wc == admissionpb.ElasticWorkClass {
- sg.coordMu.elasticDiskBWTokensAvailable -= count
sg.coordMu.elasticIOTokensUsedByElastic += count
}
- sg.coordMu.diskBWTokensUsed[wc] += count
+ diskTokenCount := sg.writeAmpLM.applyLinearModel(count)
+ sg.coordMu.diskTokensAvailable.writeByteTokens -= diskTokenCount
+ sg.coordMu.diskTokensUsed[wc].writeByteTokens += diskTokenCount
}

// subtractTokensLocked is a helper function that subtracts count tokens (count
@@ -497,7 +509,7 @@ func (sg *kvStoreTokenGranter) requesterHasWaitingRequests() bool {
// tryGrantLocked implements granterWithLockedCalls.
func (sg *kvStoreTokenGranter) tryGrantLocked(grantChainID grantChainID) grantResult {
// First try granting to regular requester.
- for wc := range sg.coordMu.diskBWTokensUsed {
+ for wc := range sg.coordMu.diskTokensUsed {
req := sg.regularRequester
if admissionpb.WorkClass(wc) == admissionpb.ElasticWorkClass {
req = sg.elasticRequester
@@ -534,10 +546,10 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
func (sg *kvStoreTokenGranter) setAvailableTokens(
ioTokens int64,
elasticIOTokens int64,
- elasticDiskBandwidthTokens int64,
+ elasticDiskWriteTokens int64,
ioTokenCapacity int64,
elasticIOTokenCapacity int64,
- elasticDiskBandwidthTokensCapacity int64,
+ elasticDiskWriteTokensCapacity int64,
lastTick bool,
) (ioTokensUsed int64, ioTokensUsedByElasticWork int64) {
sg.coord.mu.Lock()
@@ -566,6 +578,9 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
if lastTick {
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] =
max(sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass], 0)
+ // Same for disk write tokens.
+ sg.coordMu.diskTokensAvailable.writeByteTokens =
+ max(sg.coordMu.diskTokensAvailable.writeByteTokens, 0)
// It is possible that availableIOTokens[RegularWorkClass] is negative, in
// which case we want availableIOTokens[ElasticWorkClass] to not exceed it.
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] =
@@ -577,34 +592,40 @@
}
sg.startingIOTokens = sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]

- sg.coordMu.elasticDiskBWTokensAvailable += elasticDiskBandwidthTokens
- if sg.coordMu.elasticDiskBWTokensAvailable > elasticDiskBandwidthTokensCapacity {
- sg.coordMu.elasticDiskBWTokensAvailable = elasticDiskBandwidthTokensCapacity
+ sg.coordMu.diskTokensAvailable.writeByteTokens += elasticDiskWriteTokens
+ if sg.coordMu.diskTokensAvailable.writeByteTokens > elasticDiskWriteTokensCapacity {
+ sg.coordMu.diskTokensAvailable.writeByteTokens = elasticDiskWriteTokensCapacity
}

return ioTokensUsed, ioTokensUsedByElasticWork
}

// getDiskTokensUsedAndResetLocked implements granterWithIOTokens.
- func (sg *kvStoreTokenGranter) getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64 {
+ func (sg *kvStoreTokenGranter) getDiskTokensUsedAndReset() (
+ usedTokens [admissionpb.NumWorkClasses]diskTokens,
+ ) {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
- result := sg.coordMu.diskBWTokensUsed
- for i := range sg.coordMu.diskBWTokensUsed {
- sg.coordMu.diskBWTokensUsed[i] = 0
+ for i := 0; i < int(admissionpb.NumWorkClasses); i++ {
+ usedTokens[i] = sg.coordMu.diskTokensUsed[i]
+ sg.coordMu.diskTokensUsed[i] = diskTokens{}
}
- return result
+ return usedTokens
}

// setAdmittedModelsLocked implements granterWithIOTokens.
func (sg *kvStoreTokenGranter) setLinearModels(
- l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel,
+ l0WriteLM tokensLinearModel,
+ l0IngestLM tokensLinearModel,
+ ingestLM tokensLinearModel,
+ writeAmpLM tokensLinearModel,
) {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
sg.l0WriteLM = l0WriteLM
sg.l0IngestLM = l0IngestLM
sg.ingestLM = ingestLM
+ sg.writeAmpLM = writeAmpLM
}

func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
@@ -616,7 +637,7 @@ func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
// Reminder: coord.mu protects the state in the kvStoreTokenGranter.
exhaustedFunc := func() bool {
return sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] <= 0 ||
- (wc == admissionpb.ElasticWorkClass && (sg.coordMu.elasticDiskBWTokensAvailable <= 0 ||
+ (wc == admissionpb.ElasticWorkClass && (sg.coordMu.diskTokensAvailable.writeByteTokens <= 0 ||
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] <= 0))
}
wasExhausted := exhaustedFunc()
@@ -625,14 +646,20 @@ func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
actualL0Tokens := actualL0WriteTokens + actualL0IngestTokens
additionalL0TokensNeeded := actualL0Tokens - originalTokens
sg.subtractTokensLocked(additionalL0TokensNeeded, additionalL0TokensNeeded, false)
- actualIngestTokens := sg.ingestLM.applyLinearModel(admittedInfo.IngestedBytes)
- additionalDiskBWTokensNeeded := (actualL0WriteTokens + actualIngestTokens) - originalTokens
if wc == admissionpb.ElasticWorkClass {
- sg.coordMu.elasticDiskBWTokensAvailable -= additionalDiskBWTokensNeeded
sg.coordMu.elasticIOTokensUsedByElastic += additionalL0TokensNeeded
}
- sg.coordMu.diskBWTokensUsed[wc] += additionalDiskBWTokensNeeded
- if canGrantAnother && (additionalL0TokensNeeded < 0 || additionalDiskBWTokensNeeded < 0) {

+ // Adjust disk write tokens.
+ ingestIntoLSM := sg.ingestLM.applyLinearModel(admittedInfo.IngestedBytes)
+ totalBytesIntoLSM := actualL0WriteTokens + ingestIntoLSM
+ actualDiskWriteTokens := sg.writeAmpLM.applyLinearModel(totalBytesIntoLSM)
+ originalDiskTokens := sg.writeAmpLM.applyLinearModel(originalTokens)
+ additionalDiskWriteTokens := actualDiskWriteTokens - originalDiskTokens
+ sg.coordMu.diskTokensAvailable.writeByteTokens -= additionalDiskWriteTokens
+ sg.coordMu.diskTokensUsed[wc].writeByteTokens += additionalDiskWriteTokens

+ if canGrantAnother && (additionalL0TokensNeeded < 0) {
isExhausted := exhaustedFunc()
if (wasExhausted && !isExhausted) || sg.coord.knobs.AlwaysTryGrantWhenAdmitted {
sg.coord.tryGrantLocked()