Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

admission: account for write-amp in disk bandwidth limiter #129005

Merged
merged 1 commit into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/util/admission/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,21 +274,21 @@ type granterWithIOTokens interface {
// negative, though that will be rare, since it is possible for tokens to be
// returned.
setAvailableTokens(
ioTokens int64, elasticIOTokens int64, elasticDiskBandwidthTokens int64,
ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskBandwidthTokensCapacity int64,
ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64,
ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskWriteTokensCapacity int64,
lastTick bool,
) (tokensUsed int64, tokensUsedByElasticWork int64)
// getDiskTokensUsedAndReset returns the disk bandwidth tokens used
// since the last such call.
getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64
// getDiskTokensUsedAndReset returns the disk bandwidth tokens used since the
// last such call.
getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]diskTokens
// setLinearModels supplies the models to use when storeWriteDone or
// storeReplicatedWorkAdmittedLocked is called, to adjust token consumption.
// Note that these models are not used for token adjustment at admission
// time -- that is handled by StoreWorkQueue and is not in scope of this
// granter. This asymmetry is due to the need to use all the functionality
// of WorkQueue at admission time. See the long explanatory comment at the
// beginning of store_token_estimation.go, regarding token estimation.
setLinearModels(l0WriteLM, l0IngestLM, ingestLM tokensLinearModel)
setLinearModels(l0WriteLM, l0IngestLM, ingestLM, writeAmpLM tokensLinearModel)
}

// granterWithStoreReplicatedWorkAdmitted is used to abstract
Expand Down
367 changes: 95 additions & 272 deletions pkg/util/admission/disk_bandwidth.go

Large diffs are not rendered by default.

72 changes: 18 additions & 54 deletions pkg/util/admission/disk_bandwidth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,89 +11,53 @@
package admission

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/redact"
)

// TestDiskLoadWatcher is a datadriven test for diskLoadWatcher. It runs the
// commands in the "disk_load_watcher" testdata file and, after each one,
// returns the watcher's redacted string form followed by its load level.
//
// Supported commands:
//   - init: resets the watcher to its zero value.
//   - interval-info: scans the integer args read-bw, write-bw and
//     provisioned-bw and passes them to the watcher via setIntervalInfo.
func TestDiskLoadWatcher(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var dlw diskLoadWatcher
// watcherToString renders the current watcher state plus its load level; it
// is the output of every recognized command.
watcherToString := func() string {
level := dlw.getLoadLevel()
return fmt.Sprintf("%s\nload-level: %s", redact.Sprint(dlw),
diskLoadLevelString(level))
}

datadriven.RunTest(t, datapathutils.TestDataPath(t, "disk_load_watcher"),
func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
// Start over from a zero-valued watcher.
dlw = diskLoadWatcher{}
return watcherToString()

case "interval-info":
// Args are scanned as int and widened to int64 for the struct fields.
var readBandwidth, writeBandwidth, provisionedBandwidth int
d.ScanArgs(t, "read-bw", &readBandwidth)
d.ScanArgs(t, "write-bw", &writeBandwidth)
d.ScanArgs(t, "provisioned-bw", &provisionedBandwidth)
dlw.setIntervalInfo(intervalDiskLoadInfo{
readBandwidth: int64(readBandwidth),
writeBandwidth: int64(writeBandwidth),
provisionedBandwidth: int64(provisionedBandwidth),
})
return watcherToString()

default:
// Unknown commands are reported in the command output.
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
})
}

func TestDiskBandwidthLimiter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var dbl diskBandwidthLimiter
var dbl *diskBandwidthLimiter
dblToString := func() string {
return string(redact.Sprint(&dbl))
return string(redact.Sprint(dbl))
}

datadriven.RunTest(t, datapathutils.TestDataPath(t, "disk_bandwidth_limiter"),
func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
dbl = makeDiskBandwidthLimiter()
dbl = newDiskBandwidthLimiter()
return dblToString()

case "compute":
var readBandwidth, writeBandwidth, provisionedBandwidth int
d.ScanArgs(t, "read-bw", &readBandwidth)
d.ScanArgs(t, "write-bw", &writeBandwidth)
d.ScanArgs(t, "provisioned-bw", &provisionedBandwidth)
var readBytes, writeBytes, intProvisionedBytes int
d.ScanArgs(t, "int-read-bytes", &readBytes)
d.ScanArgs(t, "int-write-bytes", &writeBytes)
d.ScanArgs(t, "int-provisioned-bytes", &intProvisionedBytes)
diskLoad := intervalDiskLoadInfo{
readBandwidth: int64(readBandwidth),
writeBandwidth: int64(writeBandwidth),
provisionedBandwidth: int64(provisionedBandwidth),
intReadBytes: int64(readBytes),
intWriteBytes: int64(writeBytes),
intProvisionedDiskBytes: int64(intProvisionedBytes),
elasticBandwidthMaxUtil: 0.9,
}
var incomingBytes, regularTokensUsed, elasticTokensUsed int
d.ScanArgs(t, "incoming-bytes", &incomingBytes)
var regularTokensUsed, elasticTokensUsed int64
d.ScanArgs(t, "regular-tokens-used", &regularTokensUsed)
d.ScanArgs(t, "elastic-tokens-used", &elasticTokensUsed)
lsmInfo := intervalLSMInfo{
incomingBytes: int64(incomingBytes),
regularTokensUsed: int64(regularTokensUsed),
elasticTokensUsed: int64(elasticTokensUsed),
usedTokens := [admissionpb.NumWorkClasses]diskTokens{
{writeByteTokens: regularTokensUsed}, // regular
{writeByteTokens: elasticTokensUsed}, // elastic
}
dbl.computeElasticTokens(context.Background(), diskLoad, lsmInfo)

dbl.computeElasticTokens(diskLoad, usedTokens)
return dblToString()

default:
Expand Down
8 changes: 4 additions & 4 deletions pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
}
kvg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
kvg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] = kvg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]
kvg.coordMu.elasticDiskBWTokensAvailable = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
kvg.coordMu.diskTokensAvailable.writeByteTokens = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()

opts := makeWorkQueueOptions(KVWork)
// This is IO work, so override the usesTokens value.
Expand Down Expand Up @@ -214,7 +214,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
settings: sgc.settings,
kvRequester: storeReq,
perWorkTokenEstimator: makeStorePerWorkTokenEstimator(),
diskBandwidthLimiter: makeDiskBandwidthLimiter(),
diskBandwidthLimiter: newDiskBandwidthLimiter(),
kvGranter: kvg,
l0CompactedBytes: sgc.l0CompactedBytes,
l0TokensProduced: sgc.l0TokensProduced,
Expand Down Expand Up @@ -987,9 +987,9 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, _ rune) {
case *slotGranter:
s.Printf("%s%s: used: %d, total: %d", curSep, kind, g.usedSlots, g.totalSlots)
case *kvStoreTokenGranter:
s.Printf(" io-avail: %d(%d), elastic-disk-bw-tokens-avail: %d", g.coordMu.availableIOTokens[admissionpb.RegularWorkClass],
s.Printf(" io-avail: %d(%d), disk-write-tokens-avail: %d", g.coordMu.availableIOTokens[admissionpb.RegularWorkClass],
g.coordMu.availableIOTokens[admissionpb.ElasticWorkClass],
g.coordMu.elasticDiskBWTokensAvailable)
g.coordMu.diskTokensAvailable.writeByteTokens)
}
case SQLStatementLeafStartWork, SQLStatementRootStartWork:
if coord.granters[i] != nil {
Expand Down
97 changes: 66 additions & 31 deletions pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,10 @@ type kvStoreTokenGranter struct {
// blocks if availableElasticIOTokens <= 0.
availableIOTokens [admissionpb.NumWorkClasses]int64
elasticIOTokensUsedByElastic int64
// TODO(aaditya): add support for read/IOPS tokens.
// Disk bandwidth tokens.
elasticDiskBWTokensAvailable int64

diskBWTokensUsed [admissionpb.NumWorkClasses]int64
diskTokensAvailable diskTokens
diskTokensUsed [admissionpb.NumWorkClasses]diskTokens
}

ioTokensExhaustedDurationMetric [admissionpb.NumWorkClasses]*metric.Counter
Expand All @@ -320,7 +320,7 @@ type kvStoreTokenGranter struct {
tokensTakenMetric *metric.Counter

// Estimation models.
l0WriteLM, l0IngestLM, ingestLM tokensLinearModel
l0WriteLM, l0IngestLM, ingestLM, writeAmpLM tokensLinearModel
}

var _ granterWithLockedCalls = &kvStoreTokenGranter{}
Expand Down Expand Up @@ -397,21 +397,42 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
// queue, and since the only case where tryGetLocked is called for
// elasticWorkClass is when the queue is empty, this case should be rare
// (and not cause a performance isolation failure).

// NB: For disk write tokens, we apply `writeAmpLM` to account for write
// amplification. This model is used again later in
// `storeReplicatedWorkAdmittedLocked()`. There is an obvious gap in
// accounting here, since the model could change between the two calls for the
// same request. For example:
// (1) We have a request that requests 50 tokens, and write-amp LM is
// currently 10.0x + 1. We will deduct 501 tokens for disk writes.
// (2) Before we adjust the actual write bytes used by the write request, the
// write-amp model is updated to 5.0x + 1.
// (3) In `storeReplicatedWorkAdmittedLocked()`, we learn that the request
// used 200 actual bytes. Now we will apply the new LM to the original 50
// tokens to get 251 tokens as the amount initially deducted, and apply
// it to the 200 actual bytes to get 1001 tokens actually needed. We will
// now deduct 750 additional tokens from the bucket, for a total of 1251
// tokens deducted rather than 1001.
// This can also go the other way, where we deduct fewer tokens than actually
// needed. We are generally okay with this since the model changes
// infrequently (every 15s), and the disk bandwidth limiter is designed to
// generally under admit and only pace elastic work.
adjustedDiskWriteTokens := sg.writeAmpLM.applyLinearModel(count)
switch wc {
case admissionpb.RegularWorkClass:
if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 {
sg.subtractTokensLocked(count, count, false)
sg.coordMu.diskBWTokensUsed[wc] += count
sg.coordMu.diskTokensAvailable.writeByteTokens -= adjustedDiskWriteTokens
sg.coordMu.diskTokensUsed[wc].writeByteTokens += adjustedDiskWriteTokens
return grantSuccess
}
case admissionpb.ElasticWorkClass:
if sg.coordMu.elasticDiskBWTokensAvailable > 0 &&
if sg.coordMu.diskTokensAvailable.writeByteTokens > 0 &&
sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 &&
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] > 0 {
sg.coordMu.elasticDiskBWTokensAvailable -= count
sg.subtractTokensLocked(count, count, false)
sg.coordMu.elasticIOTokensUsedByElastic += count
sg.coordMu.diskBWTokensUsed[wc] += count
sg.coordMu.diskTokensAvailable.writeByteTokens -= adjustedDiskWriteTokens
sg.coordMu.diskTokensUsed[wc].writeByteTokens += adjustedDiskWriteTokens
return grantSuccess
}
}
Expand All @@ -428,11 +449,12 @@ func (sg *kvStoreTokenGranter) returnGrantLocked(count int64, demuxHandle int8)
// Return count tokens to the "IO tokens".
sg.subtractTokensLocked(-count, -count, false)
if wc == admissionpb.ElasticWorkClass {
// Return count tokens to the elastic disk bandwidth tokens.
sg.coordMu.elasticDiskBWTokensAvailable += count
sg.coordMu.elasticIOTokensUsedByElastic -= count
}
sg.coordMu.diskBWTokensUsed[wc] -= count
// Return tokens to disk bandwidth bucket.
diskTokenCount := sg.writeAmpLM.applyLinearModel(count)
sg.coordMu.diskTokensAvailable.writeByteTokens += diskTokenCount
sg.coordMu.diskTokensUsed[wc].writeByteTokens -= diskTokenCount
}

func (sg *kvStoreTokenGranter) tookWithoutPermission(workClass admissionpb.WorkClass, count int64) {
Expand All @@ -444,10 +466,11 @@ func (sg *kvStoreTokenGranter) tookWithoutPermissionLocked(count int64, demuxHan
wc := admissionpb.WorkClass(demuxHandle)
sg.subtractTokensLocked(count, count, false)
if wc == admissionpb.ElasticWorkClass {
sg.coordMu.elasticDiskBWTokensAvailable -= count
sg.coordMu.elasticIOTokensUsedByElastic += count
}
sg.coordMu.diskBWTokensUsed[wc] += count
diskTokenCount := sg.writeAmpLM.applyLinearModel(count)
sg.coordMu.diskTokensAvailable.writeByteTokens -= diskTokenCount
sg.coordMu.diskTokensUsed[wc].writeByteTokens += diskTokenCount
}

// subtractTokensLocked is a helper function that subtracts count tokens (count
Expand Down Expand Up @@ -497,7 +520,7 @@ func (sg *kvStoreTokenGranter) requesterHasWaitingRequests() bool {
// tryGrantLocked implements granterWithLockedCalls.
func (sg *kvStoreTokenGranter) tryGrantLocked(grantChainID grantChainID) grantResult {
// First try granting to regular requester.
for wc := range sg.coordMu.diskBWTokensUsed {
for wc := range sg.coordMu.diskTokensUsed {
req := sg.regularRequester
if admissionpb.WorkClass(wc) == admissionpb.ElasticWorkClass {
req = sg.elasticRequester
Expand Down Expand Up @@ -534,10 +557,10 @@ func (sg *kvStoreTokenGranter) tryGrantLocked(grantChainID grantChainID) grantRe
func (sg *kvStoreTokenGranter) setAvailableTokens(
ioTokens int64,
elasticIOTokens int64,
elasticDiskBandwidthTokens int64,
elasticDiskWriteTokens int64,
ioTokenCapacity int64,
elasticIOTokenCapacity int64,
elasticDiskBandwidthTokensCapacity int64,
elasticDiskWriteTokensCapacity int64,
lastTick bool,
) (ioTokensUsed int64, ioTokensUsedByElasticWork int64) {
sg.coord.mu.Lock()
Expand Down Expand Up @@ -577,34 +600,40 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
}
sg.startingIOTokens = sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]

sg.coordMu.elasticDiskBWTokensAvailable += elasticDiskBandwidthTokens
if sg.coordMu.elasticDiskBWTokensAvailable > elasticDiskBandwidthTokensCapacity {
sg.coordMu.elasticDiskBWTokensAvailable = elasticDiskBandwidthTokensCapacity
sg.coordMu.diskTokensAvailable.writeByteTokens += elasticDiskWriteTokens
if sg.coordMu.diskTokensAvailable.writeByteTokens > elasticDiskWriteTokensCapacity {
sg.coordMu.diskTokensAvailable.writeByteTokens = elasticDiskWriteTokensCapacity
}

return ioTokensUsed, ioTokensUsedByElasticWork
}

// getDiskTokensUsedAndReset implements granterWithIOTokens.
func (sg *kvStoreTokenGranter) getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64 {
func (sg *kvStoreTokenGranter) getDiskTokensUsedAndReset() (
usedTokens [admissionpb.NumWorkClasses]diskTokens,
) {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
result := sg.coordMu.diskBWTokensUsed
for i := range sg.coordMu.diskBWTokensUsed {
sg.coordMu.diskBWTokensUsed[i] = 0
for i := 0; i < int(admissionpb.NumWorkClasses); i++ {
usedTokens[i] = sg.coordMu.diskTokensUsed[i]
sg.coordMu.diskTokensUsed[i] = diskTokens{}
}
return result
return usedTokens
}

// setLinearModels implements granterWithIOTokens.
func (sg *kvStoreTokenGranter) setLinearModels(
l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel,
l0WriteLM tokensLinearModel,
l0IngestLM tokensLinearModel,
ingestLM tokensLinearModel,
writeAmpLM tokensLinearModel,
) {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
sg.l0WriteLM = l0WriteLM
sg.l0IngestLM = l0IngestLM
sg.ingestLM = ingestLM
sg.writeAmpLM = writeAmpLM
}

func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
Expand All @@ -616,7 +645,7 @@ func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
// Reminder: coord.mu protects the state in the kvStoreTokenGranter.
exhaustedFunc := func() bool {
return sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] <= 0 ||
(wc == admissionpb.ElasticWorkClass && (sg.coordMu.elasticDiskBWTokensAvailable <= 0 ||
(wc == admissionpb.ElasticWorkClass && (sg.coordMu.diskTokensAvailable.writeByteTokens <= 0 ||
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] <= 0))
}
wasExhausted := exhaustedFunc()
Expand All @@ -625,14 +654,20 @@ func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
actualL0Tokens := actualL0WriteTokens + actualL0IngestTokens
additionalL0TokensNeeded := actualL0Tokens - originalTokens
sg.subtractTokensLocked(additionalL0TokensNeeded, additionalL0TokensNeeded, false)
actualIngestTokens := sg.ingestLM.applyLinearModel(admittedInfo.IngestedBytes)
additionalDiskBWTokensNeeded := (actualL0WriteTokens + actualIngestTokens) - originalTokens
if wc == admissionpb.ElasticWorkClass {
sg.coordMu.elasticDiskBWTokensAvailable -= additionalDiskBWTokensNeeded
sg.coordMu.elasticIOTokensUsedByElastic += additionalL0TokensNeeded
}
sg.coordMu.diskBWTokensUsed[wc] += additionalDiskBWTokensNeeded
if canGrantAnother && (additionalL0TokensNeeded < 0 || additionalDiskBWTokensNeeded < 0) {

// Adjust disk write tokens.
ingestIntoLSM := sg.ingestLM.applyLinearModel(admittedInfo.IngestedBytes)
totalBytesIntoLSM := actualL0WriteTokens + ingestIntoLSM
actualDiskWriteTokens := sg.writeAmpLM.applyLinearModel(totalBytesIntoLSM)
originalDiskTokens := sg.writeAmpLM.applyLinearModel(originalTokens)
additionalDiskWriteTokens := actualDiskWriteTokens - originalDiskTokens
sg.coordMu.diskTokensAvailable.writeByteTokens -= additionalDiskWriteTokens
sg.coordMu.diskTokensUsed[wc].writeByteTokens += additionalDiskWriteTokens

if canGrantAnother && (additionalL0TokensNeeded < 0) {
isExhausted := exhaustedFunc()
if (wasExhausted && !isExhausted) || sg.coord.knobs.AlwaysTryGrantWhenAdmitted {
sg.coord.tryGrantLocked()
Expand Down
Loading
Loading