Skip to content

Commit

Permalink
admission: account for write-amp in disk bandwidth limiter
Browse files Browse the repository at this point in the history
Previously, we would calculate elastic disk bandwidth tokens using
arbitrary load thresholds and an estimate on incoming bytes into the LSM
through flushes and ingestions. This calculation lacked accounting for
write amplification in the LSM.

This patch simplifies the disk bandwidth limiter to remove the disk load
watcher and simply adjust tokens using the known provisioned disk
bandwidth. For token deducation, we create a write-amp model that is a
relationship between incoming LSM bytes to actual disk writes.

The token granting semantics are as follows:
- elastic writes: deduct tokens, and wait for positive count in bucket.
- regular writes: deduct tokens, but proceed even with no tokens
  available.

The requested write bytes are "inflated" using the estimated write
amplification to account for async compactions in the LSM.

This patch also lays the framework for future integrations where we can
account for range snapshot ingestions separately as those don't incur
the same write amplification as flushed LSM bytes do.

Informs: #86857

Release note: None
  • Loading branch information
aadityasondhi committed Sep 17, 2024
1 parent 74d2173 commit 55f1edd
Show file tree
Hide file tree
Showing 16 changed files with 747 additions and 848 deletions.
12 changes: 6 additions & 6 deletions pkg/util/admission/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,21 +274,21 @@ type granterWithIOTokens interface {
// negative, though that will be rare, since it is possible for tokens to be
// returned.
setAvailableTokens(
ioTokens int64, elasticIOTokens int64, elasticDiskBandwidthTokens int64,
ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskBandwidthTokensCapacity int64,
ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64,
ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskWriteTokensCapacity int64,
lastTick bool,
) (tokensUsed int64, tokensUsedByElasticWork int64)
// getDiskTokensUsedAndReset returns the disk bandwidth tokens used
// since the last such call.
getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64
// getDiskTokensUsedAndReset returns the disk bandwidth tokens used since the
// last such call.
getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]diskTokens
// setLinearModels supplies the models to use when storeWriteDone or
// storeReplicatedWorkAdmittedLocked is called, to adjust token consumption.
// Note that these models are not used for token adjustment at admission
// time -- that is handled by StoreWorkQueue and is not in scope of this
// granter. This asymmetry is due to the need to use all the functionality
// of WorkQueue at admission time. See the long explanatory comment at the
// beginning of store_token_estimation.go, regarding token estimation.
setLinearModels(l0WriteLM, l0IngestLM, ingestLM tokensLinearModel)
setLinearModels(l0WriteLM, l0IngestLM, ingestLM, writeAmpLM tokensLinearModel)
}

// granterWithStoreReplicatedWorkAdmitted is used to abstract
Expand Down
367 changes: 95 additions & 272 deletions pkg/util/admission/disk_bandwidth.go

Large diffs are not rendered by default.

72 changes: 18 additions & 54 deletions pkg/util/admission/disk_bandwidth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,89 +11,53 @@
package admission

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/redact"
)

func TestDiskLoadWatcher(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var dlw diskLoadWatcher
watcherToString := func() string {
level := dlw.getLoadLevel()
return fmt.Sprintf("%s\nload-level: %s", redact.Sprint(dlw),
diskLoadLevelString(level))
}

datadriven.RunTest(t, datapathutils.TestDataPath(t, "disk_load_watcher"),
func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
dlw = diskLoadWatcher{}
return watcherToString()

case "interval-info":
var readBandwidth, writeBandwidth, provisionedBandwidth int
d.ScanArgs(t, "read-bw", &readBandwidth)
d.ScanArgs(t, "write-bw", &writeBandwidth)
d.ScanArgs(t, "provisioned-bw", &provisionedBandwidth)
dlw.setIntervalInfo(intervalDiskLoadInfo{
readBandwidth: int64(readBandwidth),
writeBandwidth: int64(writeBandwidth),
provisionedBandwidth: int64(provisionedBandwidth),
})
return watcherToString()

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
})
}

func TestDiskBandwidthLimiter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var dbl diskBandwidthLimiter
var dbl *diskBandwidthLimiter
dblToString := func() string {
return string(redact.Sprint(&dbl))
return string(redact.Sprint(dbl))
}

datadriven.RunTest(t, datapathutils.TestDataPath(t, "disk_bandwidth_limiter"),
func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
dbl = makeDiskBandwidthLimiter()
dbl = newDiskBandwidthLimiter()
return dblToString()

case "compute":
var readBandwidth, writeBandwidth, provisionedBandwidth int
d.ScanArgs(t, "read-bw", &readBandwidth)
d.ScanArgs(t, "write-bw", &writeBandwidth)
d.ScanArgs(t, "provisioned-bw", &provisionedBandwidth)
var readBytes, writeBytes, intProvisionedBytes int
d.ScanArgs(t, "int-read-bytes", &readBytes)
d.ScanArgs(t, "int-write-bytes", &writeBytes)
d.ScanArgs(t, "int-provisioned-bytes", &intProvisionedBytes)
diskLoad := intervalDiskLoadInfo{
readBandwidth: int64(readBandwidth),
writeBandwidth: int64(writeBandwidth),
provisionedBandwidth: int64(provisionedBandwidth),
intReadBytes: int64(readBytes),
intWriteBytes: int64(writeBytes),
intProvisionedDiskBytes: int64(intProvisionedBytes),
elasticBandwidthMaxUtil: 0.9,
}
var incomingBytes, regularTokensUsed, elasticTokensUsed int
d.ScanArgs(t, "incoming-bytes", &incomingBytes)
var regularTokensUsed, elasticTokensUsed int64
d.ScanArgs(t, "regular-tokens-used", &regularTokensUsed)
d.ScanArgs(t, "elastic-tokens-used", &elasticTokensUsed)
lsmInfo := intervalLSMInfo{
incomingBytes: int64(incomingBytes),
regularTokensUsed: int64(regularTokensUsed),
elasticTokensUsed: int64(elasticTokensUsed),
usedTokens := [admissionpb.NumWorkClasses]diskTokens{
{writeByteTokens: regularTokensUsed}, // regular
{writeByteTokens: elasticTokensUsed}, // elastic
}
dbl.computeElasticTokens(context.Background(), diskLoad, lsmInfo)

dbl.computeElasticTokens(diskLoad, usedTokens)
return dblToString()

default:
Expand Down
8 changes: 4 additions & 4 deletions pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
}
kvg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
kvg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] = kvg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]
kvg.coordMu.elasticDiskBWTokensAvailable = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
kvg.coordMu.diskTokensAvailable.writeByteTokens = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()

opts := makeWorkQueueOptions(KVWork)
// This is IO work, so override the usesTokens value.
Expand Down Expand Up @@ -214,7 +214,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
settings: sgc.settings,
kvRequester: storeReq,
perWorkTokenEstimator: makeStorePerWorkTokenEstimator(),
diskBandwidthLimiter: makeDiskBandwidthLimiter(),
diskBandwidthLimiter: newDiskBandwidthLimiter(),
kvGranter: kvg,
l0CompactedBytes: sgc.l0CompactedBytes,
l0TokensProduced: sgc.l0TokensProduced,
Expand Down Expand Up @@ -987,9 +987,9 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, _ rune) {
case *slotGranter:
s.Printf("%s%s: used: %d, total: %d", curSep, kind, g.usedSlots, g.totalSlots)
case *kvStoreTokenGranter:
s.Printf(" io-avail: %d(%d), elastic-disk-bw-tokens-avail: %d", g.coordMu.availableIOTokens[admissionpb.RegularWorkClass],
s.Printf(" io-avail: %d(%d), disk-write-tokens-avail: %d", g.coordMu.availableIOTokens[admissionpb.RegularWorkClass],
g.coordMu.availableIOTokens[admissionpb.ElasticWorkClass],
g.coordMu.elasticDiskBWTokensAvailable)
g.coordMu.diskTokensAvailable.writeByteTokens)
}
case SQLStatementLeafStartWork, SQLStatementRootStartWork:
if coord.granters[i] != nil {
Expand Down
97 changes: 66 additions & 31 deletions pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,10 @@ type kvStoreTokenGranter struct {
// blocks if availableElasticIOTokens <= 0.
availableIOTokens [admissionpb.NumWorkClasses]int64
elasticIOTokensUsedByElastic int64
// TODO(aaditya): add support for read/IOPS tokens.
// Disk bandwidth tokens.
elasticDiskBWTokensAvailable int64

diskBWTokensUsed [admissionpb.NumWorkClasses]int64
diskTokensAvailable diskTokens
diskTokensUsed [admissionpb.NumWorkClasses]diskTokens
}

ioTokensExhaustedDurationMetric [admissionpb.NumWorkClasses]*metric.Counter
Expand All @@ -320,7 +320,7 @@ type kvStoreTokenGranter struct {
tokensTakenMetric *metric.Counter

// Estimation models.
l0WriteLM, l0IngestLM, ingestLM tokensLinearModel
l0WriteLM, l0IngestLM, ingestLM, writeAmpLM tokensLinearModel
}

var _ granterWithLockedCalls = &kvStoreTokenGranter{}
Expand Down Expand Up @@ -397,21 +397,42 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
// queue, and since the only case where tryGetLocked is called for
// elasticWorkClass is when the queue is empty, this case should be rare
// (and not cause a performance isolation failure).

// NB: For disk write tokens, we apply `writeAmpLM` to account for write
// amplification. This model is used again later in
// `storeReplicatedWorkAdmittedLocked()`. There is an obvious gap in
// accounting here, since the model could change between the two calls for the
// same request. For example:
// (1) We have a request that requests 50 tokens, and write-amp LM is
// currently 10.0x + 1. We will deduct 501 tokens for disk writes.
// (2) Before we adjust the actual write bytes used by the write request, the
// write-amp model is updated to 5.0x + 1.
// (3) In `storeReplicatedWorkAdmittedLocked()`, we learn that the request
// used 200 actual bytes. Now we will apply the new LM to get 251 tokens
// initially deducted, and apply the LM for 1001 actual bytes used. We
// will now deduct 750 additional tokens from the bucket. Now we have
// deducted 1251 tokens rather than 1001.
// This can also go the other way, where we deduct fewer tokens than actually
// needed. We are generally okay with this since the model changes
// infrequently (every 15s), and the disk bandwidth limiter is designed to
// generally under admit and only pace elastic work.
adjustedDiskWriteTokens := sg.writeAmpLM.applyLinearModel(count)
switch wc {
case admissionpb.RegularWorkClass:
if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 {
sg.subtractTokensLocked(count, count, false)
sg.coordMu.diskBWTokensUsed[wc] += count
sg.coordMu.diskTokensAvailable.writeByteTokens -= adjustedDiskWriteTokens
sg.coordMu.diskTokensUsed[wc].writeByteTokens += adjustedDiskWriteTokens
return grantSuccess
}
case admissionpb.ElasticWorkClass:
if sg.coordMu.elasticDiskBWTokensAvailable > 0 &&
if sg.coordMu.diskTokensAvailable.writeByteTokens > 0 &&
sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 &&
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] > 0 {
sg.coordMu.elasticDiskBWTokensAvailable -= count
sg.subtractTokensLocked(count, count, false)
sg.coordMu.elasticIOTokensUsedByElastic += count
sg.coordMu.diskBWTokensUsed[wc] += count
sg.coordMu.diskTokensAvailable.writeByteTokens -= adjustedDiskWriteTokens
sg.coordMu.diskTokensUsed[wc].writeByteTokens += adjustedDiskWriteTokens
return grantSuccess
}
}
Expand All @@ -428,11 +449,12 @@ func (sg *kvStoreTokenGranter) returnGrantLocked(count int64, demuxHandle int8)
// Return count tokens to the "IO tokens".
sg.subtractTokensLocked(-count, -count, false)
if wc == admissionpb.ElasticWorkClass {
// Return count tokens to the elastic disk bandwidth tokens.
sg.coordMu.elasticDiskBWTokensAvailable += count
sg.coordMu.elasticIOTokensUsedByElastic -= count
}
sg.coordMu.diskBWTokensUsed[wc] -= count
// Return tokens to disk bandwidth bucket.
diskTokenCount := sg.writeAmpLM.applyLinearModel(count)
sg.coordMu.diskTokensAvailable.writeByteTokens += diskTokenCount
sg.coordMu.diskTokensUsed[wc].writeByteTokens -= diskTokenCount
}

func (sg *kvStoreTokenGranter) tookWithoutPermission(workClass admissionpb.WorkClass, count int64) {
Expand All @@ -444,10 +466,11 @@ func (sg *kvStoreTokenGranter) tookWithoutPermissionLocked(count int64, demuxHan
wc := admissionpb.WorkClass(demuxHandle)
sg.subtractTokensLocked(count, count, false)
if wc == admissionpb.ElasticWorkClass {
sg.coordMu.elasticDiskBWTokensAvailable -= count
sg.coordMu.elasticIOTokensUsedByElastic += count
}
sg.coordMu.diskBWTokensUsed[wc] += count
diskTokenCount := sg.writeAmpLM.applyLinearModel(count)
sg.coordMu.diskTokensAvailable.writeByteTokens -= diskTokenCount
sg.coordMu.diskTokensUsed[wc].writeByteTokens += diskTokenCount
}

// subtractTokensLocked is a helper function that subtracts count tokens (count
Expand Down Expand Up @@ -497,7 +520,7 @@ func (sg *kvStoreTokenGranter) requesterHasWaitingRequests() bool {
// tryGrantLocked implements granterWithLockedCalls.
func (sg *kvStoreTokenGranter) tryGrantLocked(grantChainID grantChainID) grantResult {
// First try granting to regular requester.
for wc := range sg.coordMu.diskBWTokensUsed {
for wc := range sg.coordMu.diskTokensUsed {
req := sg.regularRequester
if admissionpb.WorkClass(wc) == admissionpb.ElasticWorkClass {
req = sg.elasticRequester
Expand Down Expand Up @@ -534,10 +557,10 @@ func (sg *kvStoreTokenGranter) tryGrantLocked(grantChainID grantChainID) grantRe
func (sg *kvStoreTokenGranter) setAvailableTokens(
ioTokens int64,
elasticIOTokens int64,
elasticDiskBandwidthTokens int64,
elasticDiskWriteTokens int64,
ioTokenCapacity int64,
elasticIOTokenCapacity int64,
elasticDiskBandwidthTokensCapacity int64,
elasticDiskWriteTokensCapacity int64,
lastTick bool,
) (ioTokensUsed int64, ioTokensUsedByElasticWork int64) {
sg.coord.mu.Lock()
Expand Down Expand Up @@ -577,34 +600,40 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
}
sg.startingIOTokens = sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]

sg.coordMu.elasticDiskBWTokensAvailable += elasticDiskBandwidthTokens
if sg.coordMu.elasticDiskBWTokensAvailable > elasticDiskBandwidthTokensCapacity {
sg.coordMu.elasticDiskBWTokensAvailable = elasticDiskBandwidthTokensCapacity
sg.coordMu.diskTokensAvailable.writeByteTokens += elasticDiskWriteTokens
if sg.coordMu.diskTokensAvailable.writeByteTokens > elasticDiskWriteTokensCapacity {
sg.coordMu.diskTokensAvailable.writeByteTokens = elasticDiskWriteTokensCapacity
}

return ioTokensUsed, ioTokensUsedByElasticWork
}

// getDiskTokensUsedAndResetLocked implements granterWithIOTokens.
func (sg *kvStoreTokenGranter) getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64 {
func (sg *kvStoreTokenGranter) getDiskTokensUsedAndReset() (
usedTokens [admissionpb.NumWorkClasses]diskTokens,
) {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
result := sg.coordMu.diskBWTokensUsed
for i := range sg.coordMu.diskBWTokensUsed {
sg.coordMu.diskBWTokensUsed[i] = 0
for i := 0; i < int(admissionpb.NumWorkClasses); i++ {
usedTokens[i] = sg.coordMu.diskTokensUsed[i]
sg.coordMu.diskTokensUsed[i] = diskTokens{}
}
return result
return usedTokens
}

// setAdmittedModelsLocked implements granterWithIOTokens.
func (sg *kvStoreTokenGranter) setLinearModels(
l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel,
l0WriteLM tokensLinearModel,
l0IngestLM tokensLinearModel,
ingestLM tokensLinearModel,
writeAmpLM tokensLinearModel,
) {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
sg.l0WriteLM = l0WriteLM
sg.l0IngestLM = l0IngestLM
sg.ingestLM = ingestLM
sg.writeAmpLM = writeAmpLM
}

func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
Expand All @@ -616,7 +645,7 @@ func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
// Reminder: coord.mu protects the state in the kvStoreTokenGranter.
exhaustedFunc := func() bool {
return sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] <= 0 ||
(wc == admissionpb.ElasticWorkClass && (sg.coordMu.elasticDiskBWTokensAvailable <= 0 ||
(wc == admissionpb.ElasticWorkClass && (sg.coordMu.diskTokensAvailable.writeByteTokens <= 0 ||
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] <= 0))
}
wasExhausted := exhaustedFunc()
Expand All @@ -625,14 +654,20 @@ func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
actualL0Tokens := actualL0WriteTokens + actualL0IngestTokens
additionalL0TokensNeeded := actualL0Tokens - originalTokens
sg.subtractTokensLocked(additionalL0TokensNeeded, additionalL0TokensNeeded, false)
actualIngestTokens := sg.ingestLM.applyLinearModel(admittedInfo.IngestedBytes)
additionalDiskBWTokensNeeded := (actualL0WriteTokens + actualIngestTokens) - originalTokens
if wc == admissionpb.ElasticWorkClass {
sg.coordMu.elasticDiskBWTokensAvailable -= additionalDiskBWTokensNeeded
sg.coordMu.elasticIOTokensUsedByElastic += additionalL0TokensNeeded
}
sg.coordMu.diskBWTokensUsed[wc] += additionalDiskBWTokensNeeded
if canGrantAnother && (additionalL0TokensNeeded < 0 || additionalDiskBWTokensNeeded < 0) {

// Adjust disk write tokens.
ingestIntoLSM := sg.ingestLM.applyLinearModel(admittedInfo.IngestedBytes)
totalBytesIntoLSM := actualL0WriteTokens + ingestIntoLSM
actualDiskWriteTokens := sg.writeAmpLM.applyLinearModel(totalBytesIntoLSM)
originalDiskTokens := sg.writeAmpLM.applyLinearModel(originalTokens)
additionalDiskWriteTokens := actualDiskWriteTokens - originalDiskTokens
sg.coordMu.diskTokensAvailable.writeByteTokens -= additionalDiskWriteTokens
sg.coordMu.diskTokensUsed[wc].writeByteTokens += additionalDiskWriteTokens

if canGrantAnother && (additionalL0TokensNeeded < 0) {
isExhausted := exhaustedFunc()
if (wasExhausted && !isExhausted) || sg.coord.knobs.AlwaysTryGrantWhenAdmitted {
sg.coord.tryGrantLocked()
Expand Down
Loading

0 comments on commit 55f1edd

Please sign in to comment.