From 8e307e02b176264df7b730e882a8485f722d0914 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Tue, 8 Aug 2023 11:58:47 -0400 Subject: [PATCH] admission,kvadmission: use tenant cpu consumption for inter-tenant fairness Previously, we were using the instantaneous slots consumed, since that code predated the grunning instrumentation for cpu consumption. The reset logic for tenantInfo.used is now the same for WorkQueues that use slots and tokens. Additionally, there was a bug in WorkQueue.adjustTenantTokens in that it forgot to fix the heap -- this is fixed and tested. Fixes #91533 Epic: none Release note: None --- .../admission_control_multitenant_fairness.go | 4 + pkg/kv/kvserver/kvadmission/BUILD.bazel | 1 + pkg/kv/kvserver/kvadmission/kvadmission.go | 17 +- pkg/util/admission/elastic_cpu_work_handle.go | 10 +- .../admission/elastic_cpu_work_handle_test.go | 5 +- pkg/util/admission/elastic_cpu_work_queue.go | 5 +- .../admission/elastic_cpu_work_queue_test.go | 14 +- .../admission/testdata/elastic_cpu_work_queue | 4 +- pkg/util/admission/testdata/work_queue | 98 ++++++++-- pkg/util/admission/work_queue.go | 170 +++++++++--------- pkg/util/admission/work_queue_test.go | 13 +- 11 files changed, 231 insertions(+), 110 deletions(-) diff --git a/pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go b/pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go index 80608b448682..96809bc28f74 100644 --- a/pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go +++ b/pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go @@ -40,6 +40,10 @@ import ( // [1]: Co-locating the SQL pod and the workload generator is a bit funky, but // it works fine enough as written and saves us from using another 4 nodes // per test. +// +// TODO(sumeer): Now that we are counting actual CPU for inter-tenant +// fairness, alter the read-heavy workloads to perform different sized work, +// and evaluate fairness. 
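Reviewer note on the second part of the commit message (the adjustTenantTokens bug that "forgot to fix the heap"): this is the standard container/heap pitfall -- mutating the key of an element already in the heap without calling heap.Fix leaves the ordering stale, so the wrong tenant can stay at the top. The following is a minimal, self-contained Go sketch of that invariant, not the CockroachDB types; tenant, tenantHeap, and adjustUsed are invented stand-ins for tenantInfo, tenantHeap, and adjustTenantUsed.

// Sketch (assumed names, not CockroachDB code): a heap of tenants ordered by
// weighted usage. Any in-place change to a tenant's `used` value must be
// followed by heap.Fix, which is the step the fixed bug was missing.
package main

import (
	"container/heap"
	"fmt"
)

type tenant struct {
	id        uint64
	used      uint64 // cpu nanos charged in the current interval (assumed unit)
	weight    uint32
	heapIndex int
}

type tenantHeap []*tenant

func (h tenantHeap) Len() int { return len(h) }
func (h tenantHeap) Less(i, j int) bool {
	// Lower weighted usage should be admitted first.
	return h[i].used*uint64(h[j].weight) < h[j].used*uint64(h[i].weight)
}
func (h tenantHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heapIndex, h[j].heapIndex = i, j
}
func (h *tenantHeap) Push(x any) {
	t := x.(*tenant)
	t.heapIndex = len(*h)
	*h = append(*h, t)
}
func (h *tenantHeap) Pop() any {
	old := *h
	t := old[len(old)-1]
	*h = old[:len(old)-1]
	return t
}

// adjustUsed applies a (possibly negative) correction to a tenant's usage,
// clamping at zero, and then restores its heap position.
func adjustUsed(h *tenantHeap, t *tenant, delta int64) {
	if delta < 0 && uint64(-delta) > t.used {
		t.used = 0 // clamp instead of underflowing
	} else {
		t.used = uint64(int64(t.used) + delta)
	}
	heap.Fix(h, t.heapIndex) // without this, the heap ordering goes stale
}

func main() {
	h := &tenantHeap{}
	heap.Push(h, &tenant{id: 53, used: 20, weight: 1})
	heap.Push(h, &tenant{id: 71, used: 1, weight: 1})
	fmt.Println("top tenant before:", (*h)[0].id) // 71, the lightest user
	adjustUsed(h, (*h)[0], 100)                   // charge 100ns of cpu to tenant 71
	fmt.Println("top tenant after:", (*h)[0].id)  // 53, because heap.Fix re-sorted
}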
func registerMultiTenantFairness(r registry.Registry) { specs := []multiTenantFairnessSpec{ { diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel index 093e5e0a7f50..f1f2095ee8ce 100644 --- a/pkg/kv/kvserver/kvadmission/BUILD.bazel +++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/util/admission", "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", + "//pkg/util/grunning", "//pkg/util/log", "//pkg/util/stop", "//pkg/util/timeutil", diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index 8ad8dbbafcdc..922354f2b62e 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -222,6 +223,7 @@ type Handle struct { raftAdmissionMeta *kvflowcontrolpb.RaftAdmissionMeta callAdmittedWorkDoneOnKVAdmissionQ bool + cpuStart time.Duration } // AnnotateCtx annotates the given context with request-scoped admission @@ -403,6 +405,11 @@ func (n *controllerImpl) AdmitKVWork( if err != nil { return Handle{}, err } + if callAdmittedWorkDoneOnKVAdmissionQ { + // We include the time to do other activities like intent resolution, + // since it is acceptable to charge them to the tenant. + ah.cpuStart = grunning.Time() + } ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ } } @@ -413,7 +420,15 @@ func (n *controllerImpl) AdmitKVWork( func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) { n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.elasticCPUWorkHandle) if ah.callAdmittedWorkDoneOnKVAdmissionQ { - n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID) + cpuTime := grunning.Time() - ah.cpuStart + if cpuTime < 0 { + // See https://github.com/cockroachdb/cockroach/issues/95529. Count 1 + // nanosecond, arbitrarily. + // + // TODO(sumeer): remove this hack when that bug is fixed. + cpuTime = 1 + } + n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID, cpuTime) } if ah.storeAdmissionQ != nil { var doneInfo admission.StoreWorkDoneInfo diff --git a/pkg/util/admission/elastic_cpu_work_handle.go b/pkg/util/admission/elastic_cpu_work_handle.go index cbb15858dbfe..2b377690e7b5 100644 --- a/pkg/util/admission/elastic_cpu_work_handle.go +++ b/pkg/util/admission/elastic_cpu_work_handle.go @@ -14,6 +14,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/grunning" ) @@ -21,6 +22,7 @@ import ( // specifically how much on-CPU time a request is allowed to make use of (used // for cooperative scheduling with elastic CPU granters). type ElasticCPUWorkHandle struct { + tenantID roachpb.TenantID // cpuStart captures the running time of the calling goroutine when this // handle is constructed. 
cpuStart time.Duration @@ -51,8 +53,10 @@ type ElasticCPUWorkHandle struct { testingOverrideOverLimit func() (bool, time.Duration) } -func newElasticCPUWorkHandle(allotted time.Duration) *ElasticCPUWorkHandle { - h := &ElasticCPUWorkHandle{allotted: allotted} +func newElasticCPUWorkHandle( + tenantID roachpb.TenantID, allotted time.Duration, +) *ElasticCPUWorkHandle { + h := &ElasticCPUWorkHandle{tenantID: tenantID, allotted: allotted} h.cpuStart = grunning.Time() return h } @@ -180,7 +184,7 @@ func ElasticCPUWorkHandleFromContext(ctx context.Context) *ElasticCPUWorkHandle // TestingNewElasticCPUHandle exports the ElasticCPUWorkHandle constructor for // testing purposes. func TestingNewElasticCPUHandle() *ElasticCPUWorkHandle { - return newElasticCPUWorkHandle(420 * time.Hour) // use a very high allotment + return newElasticCPUWorkHandle(roachpb.SystemTenantID, 420*time.Hour) // use a very high allotment } // TestingNewElasticCPUHandleWithCallback constructs an ElasticCPUWorkHandle diff --git a/pkg/util/admission/elastic_cpu_work_handle_test.go b/pkg/util/admission/elastic_cpu_work_handle_test.go index 29043703d378..71fd6d0756f9 100644 --- a/pkg/util/admission/elastic_cpu_work_handle_test.go +++ b/pkg/util/admission/elastic_cpu_work_handle_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/stretchr/testify/require" ) @@ -34,7 +35,7 @@ func TestElasticCPUWorkHandle(t *testing.T) { setRunning(zero) - handle := newElasticCPUWorkHandle(allotment) + handle := newElasticCPUWorkHandle(roachpb.SystemTenantID, allotment) handle.testingOverrideRunningTime = func() time.Duration { overrideMu.Lock() defer overrideMu.Unlock() @@ -176,7 +177,7 @@ func TestElasticCPUWorkHandlePreWork(t *testing.T) { setRunning(zero) - handle := newElasticCPUWorkHandle(allotment) + handle := newElasticCPUWorkHandle(roachpb.SystemTenantID, allotment) handle.testingOverrideRunningTime = func() time.Duration { overrideMu.Lock() defer overrideMu.Unlock() diff --git a/pkg/util/admission/elastic_cpu_work_queue.go b/pkg/util/admission/elastic_cpu_work_queue.go index bd46f08befad..53bce09edb31 100644 --- a/pkg/util/admission/elastic_cpu_work_queue.go +++ b/pkg/util/admission/elastic_cpu_work_queue.go @@ -14,6 +14,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" ) @@ -52,6 +53,7 @@ type elasticCPUInternalWorkQueue interface { requester Admit(ctx context.Context, info WorkInfo) (enabled bool, err error) SetTenantWeights(tenantWeights map[uint64]uint32) + adjustTenantUsed(tenantID roachpb.TenantID, additionalUsed int64) } func makeElasticCPUWorkQueue( @@ -90,7 +92,7 @@ func (e *ElasticCPUWorkQueue) Admit( return nil, nil } e.metrics.AcquiredNanos.Inc(duration.Nanoseconds()) - return newElasticCPUWorkHandle(duration), nil + return newElasticCPUWorkHandle(info.TenantID, duration), nil } // AdmittedWorkDone indicates to the queue that the admitted work has @@ -102,6 +104,7 @@ func (e *ElasticCPUWorkQueue) AdmittedWorkDone(h *ElasticCPUWorkHandle) { e.metrics.PreWorkNanos.Inc(h.preWork.Nanoseconds()) _, difference := h.OverLimit() + e.workQueue.adjustTenantUsed(h.tenantID, difference.Nanoseconds()) if difference > 0 { // We've used up our allotted slice, which we've already deducted tokens // for. 
But we've gone over by difference, which we now need to deduct diff --git a/pkg/util/admission/elastic_cpu_work_queue_test.go b/pkg/util/admission/elastic_cpu_work_queue_test.go index 291791b43c13..99eeb27be352 100644 --- a/pkg/util/admission/elastic_cpu_work_queue_test.go +++ b/pkg/util/admission/elastic_cpu_work_queue_test.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/datadriven" @@ -64,7 +65,7 @@ func TestElasticCPUWorkQueue(t *testing.T) { d.ScanArgs(t, "disabled", &elasticCPUInternalWorkQueue.disabled) } - handle, err := elasticWorkQ.Admit(ctx, duration, WorkInfo{}) + handle, err := elasticWorkQ.Admit(ctx, duration, WorkInfo{TenantID: roachpb.SystemTenantID}) require.NoError(t, err) var buf strings.Builder @@ -97,7 +98,7 @@ func TestElasticCPUWorkQueue(t *testing.T) { allotted, err := time.ParseDuration(allottedStr) require.NoError(t, err) - handle := &ElasticCPUWorkHandle{} + handle := &ElasticCPUWorkHandle{tenantID: roachpb.SystemTenantID} handle.testingOverrideRunningTime = func() time.Duration { return running } @@ -170,6 +171,15 @@ func (t *testElasticCPUInternalWorkQueue) SetTenantWeights(tenantWeights map[uin panic("unimplemented") } +func (t *testElasticCPUInternalWorkQueue) adjustTenantUsed( + tenantID roachpb.TenantID, additionalUsed int64, +) { + if !t.disabled { + fmt.Fprintf(&t.buf, "adjust-tenant-used: tenant=%s additional-used=%s", + tenantID.String(), time.Duration(additionalUsed).String()) + } +} + func (t *testElasticCPUInternalWorkQueue) hasWaitingRequests() bool { panic("unimplemented") } diff --git a/pkg/util/admission/testdata/elastic_cpu_work_queue b/pkg/util/admission/testdata/elastic_cpu_work_queue index a08f58468d77..eb8bc6e84e3c 100644 --- a/pkg/util/admission/testdata/elastic_cpu_work_queue +++ b/pkg/util/admission/testdata/elastic_cpu_work_queue @@ -63,7 +63,7 @@ handle: 50ms admitted-work-done running=10ms allotted=50ms ---- granter: return-grant=40ms -work-queue: +work-queue: adjust-tenant-used: tenant=system additional-used=-40ms metrics: acquired=50ms returned=40ms max-available=8s # Repeat the same but this time simulate what happens if we've taken less than @@ -83,7 +83,7 @@ handle: 50ms admitted-work-done running=70ms allotted=50ms ---- granter: took-without-permission=20ms -work-queue: +work-queue: adjust-tenant-used: tenant=system additional-used=20ms metrics: acquired=70ms returned=0s max-available=8s # vim:ft=sh diff --git a/pkg/util/admission/testdata/work_queue b/pkg/util/admission/testdata/work_queue index a625fea3755c..fac51ea21884 100644 --- a/pkg/util/admission/testdata/work_queue +++ b/pkg/util/admission/testdata/work_queue @@ -47,7 +47,7 @@ admit id=4 tenant=71 priority=-128 create-time-millis=4 bypass=false admit id=5 tenant=71 priority=0 create-time-millis=5 bypass=false ---- -# Tenant 71 is the top of the heap since not using any slots. +# Tenant 71 is the top of the heap since has not used any cpu time. print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 71 @@ -60,8 +60,8 @@ continueGrantChain 5 id 5: admit succeeded granted: returned 1 -# Both tenants are using 1 slot. The tie is broken arbitrarily in favor of -# tenant 71. +# Both tenants have used 1 cpu nano time. The tie is broken arbitrarily in +# favor of tenant 71. 
print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 71 @@ -79,12 +79,12 @@ closed epoch: 0 tenantHeap len: 2 top tenant: 71 tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] -# The work admitted for tenant 53 is done. -work-done id=1 +# The work admitted for tenant 53 is done and consumed no cpu-time +work-done id=1 cpu-time=0 ---- returnGrant 1 -# Tenant 53 now using fewer slots so it becomes the top of the heap. +# Tenant 53 has used no cpu, so it becomes the top of the heap. print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 53 @@ -105,25 +105,103 @@ closed epoch: 0 tenantHeap len: 2 top tenant: 53 tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] +# The system tenant work is done and consumed 10 cpu nanos. +work-done id=6 cpu-time=10 +---- +returnGrant 1 + +print +---- +closed epoch: 0 tenantHeap len: 2 top tenant: 53 + tenant-id: 1 used: 10, w: 1, fifo: -128 + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] + +# Another request from tenant 53, which is behind the existing request in the heap. +admit id=7 tenant=53 priority=0 create-time-millis=5 bypass=false +---- + +print +---- +closed epoch: 0 tenantHeap len: 2 top tenant: 53 + tenant-id: 1 used: 10, w: 1, fifo: -128 + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] [1: pri: normal-pri, ct: 5, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] + granted chain-id=7 ---- continueGrantChain 7 id 2: admit succeeded granted: returned 1 +# Both tenants have used 1 cpu nano time. The tie is broken arbitrarily in +# favor of tenant 53. +print +---- +closed epoch: 0 tenantHeap len: 2 top tenant: 53 + tenant-id: 1 used: 10, w: 1, fifo: -128 + tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] + +# The work admitted for tenant 53 is done and has consumed 20 cpu nanos, so +# tenant 71 moves to the top of the heap. +work-done id=2 cpu-time=20 +---- +returnGrant 1 + +print +---- +closed epoch: 0 tenantHeap len: 2 top tenant: 71 + tenant-id: 1 used: 10, w: 1, fifo: -128 + tenant-id: 53 used: 20, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] + +gc-tenants-and-reset-used +---- +closed epoch: 0 tenantHeap len: 2 top tenant: 71 + tenant-id: 1 used: 0, w: 1, fifo: -128 + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100] + tenant-id: 71 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] + granted chain-id=9 ---- continueGrantChain 9 id 4: admit succeeded granted: returned 1 +# Tenant 71 has used 1 cpu nano. 
+print +---- +closed epoch: 0 tenantHeap len: 1 top tenant: 53 + tenant-id: 1 used: 0, w: 1, fifo: -128 + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 + +# Try to return more cpu than used, to check that there is no overflow. +work-done id=4 cpu-time=-5 +---- +returnGrant 1 + +print +---- +closed epoch: 0 tenantHeap len: 1 top tenant: 53 + tenant-id: 1 used: 0, w: 1, fifo: -128 + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100] + tenant-id: 71 used: 0, w: 1, fifo: -128 + +granted chain-id=10 +---- +continueGrantChain 10 +id 7: admit succeeded +granted: returned 1 + # No more waiting requests. print ---- closed epoch: 0 tenantHeap len: 0 - tenant-id: 1 used: 1, w: 1, fifo: -128 + tenant-id: 1 used: 0, w: 1, fifo: -128 tenant-id: 53 used: 1, w: 1, fifo: -128 - tenant-id: 71 used: 2, w: 1, fifo: -128 + tenant-id: 71 used: 0, w: 1, fifo: -128 # Granted returns false. granted chain-id=10 @@ -133,9 +211,9 @@ granted: returned 0 print ---- closed epoch: 0 tenantHeap len: 0 - tenant-id: 1 used: 1, w: 1, fifo: -128 + tenant-id: 1 used: 0, w: 1, fifo: -128 tenant-id: 53 used: 1, w: 1, fifo: -128 - tenant-id: 71 used: 2, w: 1, fifo: -128 + tenant-id: 71 used: 0, w: 1, fifo: -128 init ---- diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 2dc6f6dfda93..98da8a399e18 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -333,6 +333,9 @@ type workQueueOptions struct { timeSource timeutil.TimeSource // The epoch closing goroutine can be disabled for tests. disableEpochClosingGoroutine bool + // The background resetting of used and GC'ing of tenants can be disabled + // for tests. + disableGCTenantsAndResetUsed bool } func makeWorkQueueOptions(workKind WorkKind) workQueueOptions { @@ -403,18 +406,20 @@ func initWorkQueue( q.mu.tenants = make(map[uint64]*tenantInfo) q.sampleEpochLIFOSettingsLocked() }() - go func() { - ticker := time.NewTicker(time.Second) - for { - select { - case <-ticker.C: - q.gcTenantsAndResetTokens() - case <-stopCh: - // Channel closed. - return + if !opts.disableGCTenantsAndResetUsed { + go func() { + ticker := time.NewTicker(time.Second) + for { + select { + case <-ticker.C: + q.gcTenantsAndResetUsed() + case <-stopCh: + // Channel closed. + return + } } - } - }() + }() + } q.tryCloseEpoch(q.timeNow()) if !opts.disableEpochClosingGoroutine { q.startClosingEpochs() @@ -677,29 +682,19 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err // the state of the requesters to see if there is any queued work that // can be granted admission. q.mu.Lock() - prevTenant := tenant - // The tenant could have been removed when using tokens. See the comment - // where the tenantInfo struct is declared. + // The tenant could have been removed. See the comment where the + // tenantInfo struct is declared. tenant, ok = q.mu.tenants[tenantID] - if !q.usesTokens { - if !ok || prevTenant != tenant { - panic("prev tenantInfo no longer in map") - } - if tenant.used < uint64(info.RequestedCount) { - panic(errors.AssertionFailedf("tenant.used %d < info.RequestedCount %d", - tenant.used, info.RequestedCount)) - } + if !ok { + tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID)) + q.mu.tenants[tenantID] = tenant + } + // Don't want to overflow tenant.used if it has decreased because of being + // reset to 0 by the GC goroutine. 
+ if tenant.used >= uint64(info.RequestedCount) { tenant.used -= uint64(info.RequestedCount) } else { - if !ok { - tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID)) - q.mu.tenants[tenantID] = tenant - } - // Don't want to overflow tenant.used if it is already 0 because of - // being reset to 0 by the GC goroutine. - if tenant.used >= uint64(info.RequestedCount) { - tenant.used -= uint64(info.RequestedCount) - } + tenant.used = 0 } } @@ -755,7 +750,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err ) } - return // return without waiting (admission is asynchronous) + return false, nil // return without waiting (admission is asynchronous) } // Start waiting for admission. @@ -772,16 +767,11 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err // are too short, we could underestimate the actual wait time. tenant.priorityStates.updateDelayLocked(work.priority, waitDur, true /* canceled */) if work.heapIndex == -1 { - // No longer in heap. Raced with token/slot grant. - if !q.usesTokens { - if tenant.used < uint64(info.RequestedCount) { - panic(errors.AssertionFailedf("tenant.used %d < info.RequestedCount %d", - tenant.used, info.RequestedCount)) - } - tenant.used -= uint64(info.RequestedCount) - } - // Else, we don't decrement tenant.used since we don't want to race with - // the gc goroutine that will set used=0. + // No longer in heap. Raced with token/slot grant. Don't bother + // decrementing tenant.used since we don't want to race with the gc + // goroutine that sets used=0 and could have GC'd tenant and returned it + // to the sync.Pool. We can fix this if needed by calling + // adjustTenantUsedLocked. q.mu.Unlock() q.granter.returnGrant(info.RequestedCount) // The channel is sent to after releasing mu, so we don't need to hold @@ -828,22 +818,18 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err // AdmittedWorkDone is used to inform the WorkQueue that some admitted work is // finished. It must be called iff the WorkKind of this WorkQueue uses slots // (not tokens), i.e., KVWork, SQLStatementLeafStartWork, -// SQLStatementRootStartWork. -func (q *WorkQueue) AdmittedWorkDone(tenantID roachpb.TenantID) { +// SQLStatementRootStartWork. Note, there is no support for SQLStatementLeafStartWork, +// SQLStatementRootStartWork in the code yet. +func (q *WorkQueue) AdmittedWorkDone(tenantID roachpb.TenantID, cpuTime time.Duration) { if q.usesTokens { panic(errors.AssertionFailedf("tokens should not be returned")) } - // Single slot is allocated for the work. - q.mu.Lock() - tenant, ok := q.mu.tenants[tenantID.ToUint64()] - if !ok { - panic(errors.AssertionFailedf("tenant not found")) - } - tenant.used-- - if isInTenantHeap(tenant) { - q.mu.tenantHeap.fix(tenant) + // Single slot is allocated for the work in the granter, and tenant.used was + // incremented by 1. 
+ additionalUsed := cpuTime - 1 + if additionalUsed != 0 { + q.adjustTenantUsed(tenantID, additionalUsed.Nanoseconds()) } - q.mu.Unlock() q.granter.returnGrant(1) } @@ -922,7 +908,7 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 { return requestedCount } -func (q *WorkQueue) gcTenantsAndResetTokens() { +func (q *WorkQueue) gcTenantsAndResetUsed() { q.mu.Lock() defer q.mu.Unlock() // With large numbers of active tenants, this iteration could hold the lock @@ -932,7 +918,7 @@ func (q *WorkQueue) gcTenantsAndResetTokens() { if info.used == 0 && !isInTenantHeap(info) { delete(q.mu.tenants, id) releaseTenantInfo(info) - } else if q.usesTokens { + } else { info.used = 0 // All the heap members will reset used=0, so no need to change heap // ordering. @@ -940,26 +926,30 @@ func (q *WorkQueue) gcTenantsAndResetTokens() { } } -// adjustTenantTokens is used internally by StoreWorkQueue. The -// additionalTokensNeeded count can be negative, in which case it is returning -// tokens. This is only for WorkQueue's own accounting -- it should not call -// into granter. -func (q *WorkQueue) adjustTenantTokens(tenantID roachpb.TenantID, additionalTokensNeeded int64) { +// adjustTenantUsed is used internally by StoreWorkQueue, and by the KV queue +// in AdmittedWorkDone. The additionalUsed count can be negative, in which +// case it is returning unused resources. This is only for WorkQueue's own +// accounting -- it should not call into granter. +func (q *WorkQueue) adjustTenantUsed(tenantID roachpb.TenantID, additionalUsed int64) { tid := tenantID.ToUint64() q.mu.Lock() defer q.mu.Unlock() tenant, ok := q.mu.tenants[tid] - if ok { - if additionalTokensNeeded < 0 { - toReturn := uint64(-additionalTokensNeeded) - if tenant.used < toReturn { - tenant.used = 0 - } else { - tenant.used -= toReturn - } + if !ok { + return + } + if additionalUsed < 0 { + toReturn := uint64(-additionalUsed) + if tenant.used < toReturn { + tenant.used = 0 } else { - tenant.used += uint64(additionalTokensNeeded) + tenant.used -= toReturn } + } else { + tenant.used += uint64(additionalUsed) + } + if isInTenantHeap(tenant) { + q.mu.tenantHeap.fix(tenant) } } @@ -1264,31 +1254,37 @@ type tenantInfo struct { id uint64 // The weight assigned to the tenant. Must be > 0. weight uint32 - // used can be the currently used slots, or the tokens granted within the last - // interval. + // used is computed over an interval and periodically reset. Ordering + // between tenants, for fair sharing, utilizes this value. + // + // - For slots, used represents cpu time duration consumed by the tenant. It + // is incremented by 1 (for non-elastic work) or some prediction of cpu + // time (for elastic work) when the work is admitted. A correction is + // applied when the work is done based on the actual cpu time consumed. + // - For tokens, used represents the tokens consumed. A prediction of tokens + // that will be consumed is deducted at admission time, and a correction + // is applied later. // // tenantInfo will not be GC'd until both used==0 and // len(waitingWorkHeap)==0. // - // Note that used can be reset to 0 periodically, iff the WorkQueue is using - // tokens (not slots). This creates a risk since callers of Admit hold - // references to tenantInfo. We do not want a race condition where the - // tenantInfo held in Admit is returned to the sync.Pool. 
Note that this - // race is almost impossible to reproduce in practice since GC loop runs at - // 1s intervals and needs two iterations to GC a tenantInfo -- first to - // reset used=0 and then the next time to GC it. We fix this by being - // careful in the code of Admit by not reusing a reference to tenantInfo, - // and instead grab a new reference from the map. + // The used value is reset to 0 periodically. This creates a risk since + // callers of Admit hold references to tenantInfo. We do not want a race + // condition where the tenantInfo held in Admit is returned to the + // sync.Pool. Note that this race is almost impossible to reproduce in + // practice since GC loop runs at 1s intervals and needs two iterations to + // GC a tenantInfo -- first to reset used=0 and then the next time to GC it. + // We fix this by being careful in the code of Admit by not reusing a + // reference to tenantInfo, and instead grab a new reference from the map. // // The above fix for the GC race condition is insufficient to prevent // overflow of the used field if the reset to used=0 happens between used++ // and used-- within Admit. Properly fixing that would need to track the // count of used==0 resets and gate the used-- on the count not having // changed. This was considered unnecessarily complicated and instead we - // simply (a) do not do used-- for the tokens case, if used is already zero, - // or (b) do not do used-- for the tokens case if the request was canceled. - // This does imply some inaccuracy in token counting -- it can be fixed if - // needed. + // simply (a) do not do used--, if used is already zero, or (b) do not do + // used-- if the request was canceled. This does imply some inaccuracy in + // accounting -- it can be fixed if needed. used uint64 waitingWorkHeap waitingWorkHeap openEpochsHeap openEpochsHeap @@ -1997,7 +1993,7 @@ func (q *StoreWorkQueue) admittedReplicatedWork( if !coordMuLocked { q.coordMu.Unlock() } - q.q[wc].adjustTenantTokens(tenantID, additionalTokensNeeded) + q.q[wc].adjustTenantUsed(tenantID, additionalTokensNeeded) // Inform callers of the entry we just admitted. 
// @@ -2042,7 +2038,7 @@ func (q *StoreWorkQueue) AdmittedWorkDone(h StoreWorkHandle, doneInfo StoreWorkD } q.updateStoreStatsAfterWorkDone(1, doneInfo, false) additionalTokens := q.granters[h.workClass].storeWriteDone(h.writeTokens, doneInfo) - q.q[h.workClass].adjustTenantTokens(h.tenantID, additionalTokens) + q.q[h.workClass].adjustTenantUsed(h.tenantID, additionalTokens) return nil } diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go index bdb287c81a6f..1c30b9c5a3f5 100644 --- a/pkg/util/admission/work_queue_test.go +++ b/pkg/util/admission/work_queue_test.go @@ -208,6 +208,7 @@ func TestWorkQueueBasic(t *testing.T) { timeSource = timeutil.NewManualTime(initialTime) opts.timeSource = timeSource opts.disableEpochClosingGoroutine = true + opts.disableGCTenantsAndResetUsed = true st = cluster.MakeTestingClusterSettings() q = makeWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), KVWork, tg, st, metrics, opts).(*WorkQueue) @@ -289,7 +290,11 @@ func TestWorkQueueBasic(t *testing.T) { if !work.admitted { return fmt.Sprintf("id not admitted: %d\n", id) } - q.AdmittedWorkDone(work.tenantID) + cpuTime := int64(1) + if d.HasArg("cpu-time") { + d.ScanArgs(t, "cpu-time", &cpuTime) + } + q.AdmittedWorkDone(work.tenantID, time.Duration(cpuTime)) wrkMap.delete(id) return buf.stringAndReset() @@ -324,6 +329,10 @@ func TestWorkQueueBasic(t *testing.T) { q.tryCloseEpoch(timeSource.Now()) return q.String() + case "gc-tenants-and-reset-used": + q.gcTenantsAndResetUsed() + return q.String() + default: return fmt.Sprintf("unknown command: %s", d.Cmd) } @@ -406,7 +415,7 @@ func TestWorkQueueTokenResetRace(t *testing.T) { // This hot loop with GC calls is able to trigger the previously buggy // code by squeezing in multiple times between the token grant and // cancellation. - q.gcTenantsAndResetTokens() + q.gcTenantsAndResetUsed() } } }()
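To summarize the end-to-end flow this patch establishes, here is a condensed Go sketch of the new accounting path: on-CPU time is captured at admission and the measured delta is charged to the tenant on completion, so inter-tenant ordering reflects actual cpu consumption rather than instantaneous slot counts. All names below (onCPUTime, handle, workQueue, admitKVWork, admittedKVWorkDone) are stand-ins for illustration -- onCPUTime in particular uses wall-clock time in place of grunning.Time(), and the real code also handles queueing, grants, and the heap.

// Condensed sketch under stated assumptions; not the kvadmission API.
package main

import (
	"fmt"
	"time"
)

var processStart = time.Now()

// onCPUTime stands in for grunning.Time(), which reports the calling
// goroutine's on-CPU time; wall-clock time is used here only to keep the
// sketch runnable.
func onCPUTime() time.Duration { return time.Since(processStart) }

type handle struct {
	tenantID uint64
	cpuStart time.Duration
}

type workQueue struct{ usedNanos map[uint64]int64 }

// admitKVWork would normally block for admission; here it only charges one
// nano up front and records the cpu clock, to be corrected on completion.
func (q *workQueue) admitKVWork(tenantID uint64) handle {
	q.usedNanos[tenantID]++
	return handle{tenantID: tenantID, cpuStart: onCPUTime()}
}

// admittedKVWorkDone mirrors AdmittedKVWorkDone feeding AdmittedWorkDone: the
// measured cpu time replaces the single unit charged at admission.
func (q *workQueue) admittedKVWorkDone(h handle) {
	cpuTime := onCPUTime() - h.cpuStart
	if cpuTime < 1 {
		cpuTime = 1 // clamp, mirroring the workaround for negative grunning deltas
	}
	q.usedNanos[h.tenantID] += cpuTime.Nanoseconds() - 1
}

func main() {
	q := &workQueue{usedNanos: map[uint64]int64{}}
	h := q.admitKVWork(53)
	time.Sleep(2 * time.Millisecond) // the admitted KV work happens here
	q.admittedKVWorkDone(h)
	fmt.Println("tenant 53 used nanos:", q.usedNanos[53])
}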