diff --git a/pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go b/pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go
index 80608b448682..96809bc28f74 100644
--- a/pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go
+++ b/pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go
@@ -40,6 +40,10 @@ import (
 // [1]: Co-locating the SQL pod and the workload generator is a bit funky, but
 // it works fine enough as written and saves us from using another 4 nodes
 // per test.
+//
+// TODO(sumeer): Now that we are counting actual CPU for inter-tenant
+// fairness, alter the read-heavy workloads to perform different sized work,
+// and evaluate fairness.
 func registerMultiTenantFairness(r registry.Registry) {
     specs := []multiTenantFairnessSpec{
         {
diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel
index 093e5e0a7f50..f1f2095ee8ce 100644
--- a/pkg/kv/kvserver/kvadmission/BUILD.bazel
+++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
         "//pkg/util/admission",
         "//pkg/util/admission/admissionpb",
         "//pkg/util/buildutil",
+        "//pkg/util/grunning",
         "//pkg/util/log",
         "//pkg/util/stop",
         "//pkg/util/timeutil",
diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go
index 8ad8dbbafcdc..922354f2b62e 100644
--- a/pkg/kv/kvserver/kvadmission/kvadmission.go
+++ b/pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -29,6 +29,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/admission"
     "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
     "github.com/cockroachdb/cockroach/pkg/util/buildutil"
+    "github.com/cockroachdb/cockroach/pkg/util/grunning"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/stop"
     "github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -222,6 +223,7 @@ type Handle struct {
     raftAdmissionMeta *kvflowcontrolpb.RaftAdmissionMeta

     callAdmittedWorkDoneOnKVAdmissionQ bool
+    cpuStart                           time.Duration
 }

 // AnnotateCtx annotates the given context with request-scoped admission
@@ -403,6 +405,11 @@ func (n *controllerImpl) AdmitKVWork(
             if err != nil {
                 return Handle{}, err
             }
+            if callAdmittedWorkDoneOnKVAdmissionQ {
+                // We include the time to do other activities like intent resolution,
+                // since it is acceptable to charge them to the tenant.
+                ah.cpuStart = grunning.Time()
+            }
             ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ
         }
     }
@@ -413,7 +420,15 @@ func (n *controllerImpl) AdmitKVWork(
 func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) {
     n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.elasticCPUWorkHandle)
     if ah.callAdmittedWorkDoneOnKVAdmissionQ {
-        n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID)
+        cpuTime := grunning.Time() - ah.cpuStart
+        if cpuTime < 0 {
+            // See https://github.com/cockroachdb/cockroach/issues/95529. Count 1
+            // nanosecond, arbitrarily.
+            //
+            // TODO(sumeer): remove this hack when that bug is fixed.
+            cpuTime = 1
+        }
+        n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID, cpuTime)
     }
     if ah.storeAdmissionQ != nil {
         var doneInfo admission.StoreWorkDoneInfo
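For context on the kvadmission hunks above: pkg/util/grunning reports the calling goroutine's cumulative on-CPU time, so bracketing the admitted work with two grunning.Time() calls yields the CPU consumed by that request on the goroutine. A minimal sketch of that pattern, assuming only that grunning.Time() returns a time.Duration; measureOnCPU is a hypothetical helper and not part of the patch:

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/grunning"
)

// measureOnCPU mirrors the pattern above: capture grunning.Time() when the
// work is admitted (ah.cpuStart) and take the delta when the work is done.
func measureOnCPU(work func()) time.Duration {
	start := grunning.Time()
	work()
	cpuTime := grunning.Time() - start
	if cpuTime < 0 {
		// Mirrors the workaround above: the delta can come out slightly
		// negative (see the linked issue), so count 1 nanosecond instead.
		cpuTime = 1
	}
	return cpuTime
}

func main() {
	sum := 0
	d := measureOnCPU(func() {
		for i := 0; i < 1000000; i++ {
			sum += i
		}
	})
	fmt.Println("on-CPU time:", d, "sum:", sum)
}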
diff --git a/pkg/util/admission/elastic_cpu_work_handle.go b/pkg/util/admission/elastic_cpu_work_handle.go
index cbb15858dbfe..2b377690e7b5 100644
--- a/pkg/util/admission/elastic_cpu_work_handle.go
+++ b/pkg/util/admission/elastic_cpu_work_handle.go
@@ -14,6 +14,7 @@ import (
     "context"
     "time"

+    "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/util/grunning"
 )

@@ -21,6 +22,7 @@ import (
 // specifically how much on-CPU time a request is allowed to make use of (used
 // for cooperative scheduling with elastic CPU granters).
 type ElasticCPUWorkHandle struct {
+    tenantID roachpb.TenantID
     // cpuStart captures the running time of the calling goroutine when this
     // handle is constructed.
     cpuStart time.Duration
@@ -51,8 +53,10 @@ type ElasticCPUWorkHandle struct {
     testingOverrideOverLimit func() (bool, time.Duration)
 }

-func newElasticCPUWorkHandle(allotted time.Duration) *ElasticCPUWorkHandle {
-    h := &ElasticCPUWorkHandle{allotted: allotted}
+func newElasticCPUWorkHandle(
+    tenantID roachpb.TenantID, allotted time.Duration,
+) *ElasticCPUWorkHandle {
+    h := &ElasticCPUWorkHandle{tenantID: tenantID, allotted: allotted}
     h.cpuStart = grunning.Time()
     return h
 }
@@ -180,7 +184,7 @@ func ElasticCPUWorkHandleFromContext(ctx context.Context) *ElasticCPUWorkHandle
 // TestingNewElasticCPUHandle exports the ElasticCPUWorkHandle constructor for
 // testing purposes.
 func TestingNewElasticCPUHandle() *ElasticCPUWorkHandle {
-    return newElasticCPUWorkHandle(420 * time.Hour) // use a very high allotment
+    return newElasticCPUWorkHandle(roachpb.SystemTenantID, 420*time.Hour) // use a very high allotment
 }

 // TestingNewElasticCPUHandleWithCallback constructs an ElasticCPUWorkHandle
diff --git a/pkg/util/admission/elastic_cpu_work_handle_test.go b/pkg/util/admission/elastic_cpu_work_handle_test.go
index 29043703d378..71fd6d0756f9 100644
--- a/pkg/util/admission/elastic_cpu_work_handle_test.go
+++ b/pkg/util/admission/elastic_cpu_work_handle_test.go
@@ -14,6 +14,7 @@ import (
     "testing"
     "time"

+    "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/stretchr/testify/require"
 )
@@ -34,7 +35,7 @@ func TestElasticCPUWorkHandle(t *testing.T) {

         setRunning(zero)

-        handle := newElasticCPUWorkHandle(allotment)
+        handle := newElasticCPUWorkHandle(roachpb.SystemTenantID, allotment)
         handle.testingOverrideRunningTime = func() time.Duration {
             overrideMu.Lock()
             defer overrideMu.Unlock()
@@ -176,7 +177,7 @@ func TestElasticCPUWorkHandlePreWork(t *testing.T) {

     setRunning(zero)

-    handle := newElasticCPUWorkHandle(allotment)
+    handle := newElasticCPUWorkHandle(roachpb.SystemTenantID, allotment)
     handle.testingOverrideRunningTime = func() time.Duration {
         overrideMu.Lock()
         defer overrideMu.Unlock()
diff --git a/pkg/util/admission/elastic_cpu_work_queue.go b/pkg/util/admission/elastic_cpu_work_queue.go
index bd46f08befad..53bce09edb31 100644
--- a/pkg/util/admission/elastic_cpu_work_queue.go
+++ b/pkg/util/admission/elastic_cpu_work_queue.go
@@ -14,6 +14,7 @@ import (
     "context"
     "time"

+    "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/settings"
     "github.com/cockroachdb/cockroach/pkg/settings/cluster"
 )
@@ -52,6 +53,7 @@ type elasticCPUInternalWorkQueue interface {
     requester
     Admit(ctx context.Context, info WorkInfo) (enabled bool, err error)
     SetTenantWeights(tenantWeights map[uint64]uint32)
+    adjustTenantUsed(tenantID roachpb.TenantID, additionalUsed int64)
 }

 func makeElasticCPUWorkQueue(
@@ -90,7 +92,7 @@ func (e *ElasticCPUWorkQueue) Admit(
         return nil, nil
     }
     e.metrics.AcquiredNanos.Inc(duration.Nanoseconds())
-    return newElasticCPUWorkHandle(duration), nil
+    return newElasticCPUWorkHandle(info.TenantID, duration), nil
 }

 // AdmittedWorkDone indicates to the queue that the admitted work has
@@ -102,6 +104,7 @@ func (e *ElasticCPUWorkQueue) AdmittedWorkDone(h *ElasticCPUWorkHandle) {
     e.metrics.PreWorkNanos.Inc(h.preWork.Nanoseconds())

     _, difference := h.OverLimit()
+    e.workQueue.adjustTenantUsed(h.tenantID, difference.Nanoseconds())
     if difference > 0 {
         // We've used up our allotted slice, which we've already deducted tokens
         // for. But we've gone over by difference, which we now need to deduct
diff --git a/pkg/util/admission/elastic_cpu_work_queue_test.go b/pkg/util/admission/elastic_cpu_work_queue_test.go
index 291791b43c13..99eeb27be352 100644
--- a/pkg/util/admission/elastic_cpu_work_queue_test.go
+++ b/pkg/util/admission/elastic_cpu_work_queue_test.go
@@ -18,6 +18,7 @@ import (
     "testing"
     "time"

+    "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/settings/cluster"
     "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
     "github.com/cockroachdb/datadriven"
@@ -64,7 +65,7 @@ func TestElasticCPUWorkQueue(t *testing.T) {
                 d.ScanArgs(t, "disabled", &elasticCPUInternalWorkQueue.disabled)
             }

-            handle, err := elasticWorkQ.Admit(ctx, duration, WorkInfo{})
+            handle, err := elasticWorkQ.Admit(ctx, duration, WorkInfo{TenantID: roachpb.SystemTenantID})
             require.NoError(t, err)

             var buf strings.Builder
@@ -97,7 +98,7 @@ func TestElasticCPUWorkQueue(t *testing.T) {
             allotted, err := time.ParseDuration(allottedStr)
             require.NoError(t, err)

-            handle := &ElasticCPUWorkHandle{}
+            handle := &ElasticCPUWorkHandle{tenantID: roachpb.SystemTenantID}
             handle.testingOverrideRunningTime = func() time.Duration {
                 return running
             }
@@ -170,6 +171,15 @@ func (t *testElasticCPUInternalWorkQueue) SetTenantWeights(tenantWeights map[uin
     panic("unimplemented")
 }

+func (t *testElasticCPUInternalWorkQueue) adjustTenantUsed(
+    tenantID roachpb.TenantID, additionalUsed int64,
+) {
+    if !t.disabled {
+        fmt.Fprintf(&t.buf, "adjust-tenant-used: tenant=%s additional-used=%s",
+            tenantID.String(), time.Duration(additionalUsed).String())
+    }
+}
+
 func (t *testElasticCPUInternalWorkQueue) hasWaitingRequests() bool {
     panic("unimplemented")
 }
diff --git a/pkg/util/admission/testdata/elastic_cpu_work_queue b/pkg/util/admission/testdata/elastic_cpu_work_queue
index a08f58468d77..eb8bc6e84e3c 100644
--- a/pkg/util/admission/testdata/elastic_cpu_work_queue
+++ b/pkg/util/admission/testdata/elastic_cpu_work_queue
@@ -63,7 +63,7 @@ handle: 50ms
 admitted-work-done running=10ms allotted=50ms
 ----
 granter: return-grant=40ms
-work-queue:
+work-queue: adjust-tenant-used: tenant=system additional-used=-40ms
 metrics: acquired=50ms returned=40ms max-available=8s

 # Repeat the same but this time simulate what happens if we've taken less than
@@ -83,7 +83,7 @@ handle: 50ms
 admitted-work-done running=70ms allotted=50ms
 ----
 granter: took-without-permission=20ms
-work-queue:
+work-queue: adjust-tenant-used: tenant=system additional-used=20ms
 metrics: acquired=70ms returned=0s max-available=8s

 # vim:ft=sh
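For the elastic (token-based) path above, the handle's measured running time is reconciled against its allotment and the signed difference is fed into the per-tenant accounting, which is what the adjust-tenant-used lines in the testdata show. A standalone sketch of that sign convention, assuming only the behavior visible in the testdata; reconcile and adjustTenantUsed here are illustrative stand-ins, not the patch's APIs:

package main

import (
	"fmt"
	"time"
)

// reconcile sketches ElasticCPUWorkQueue.AdmittedWorkDone: a positive
// difference charges the tenant for CPU beyond its allotment, a negative one
// credits back the unused portion.
func reconcile(running, allotted time.Duration, adjustTenantUsed func(deltaNanos int64)) {
	difference := running - allotted
	adjustTenantUsed(difference.Nanoseconds())
}

func main() {
	adjust := func(d int64) { fmt.Println("adjust-tenant-used:", time.Duration(d)) }
	reconcile(10*time.Millisecond, 50*time.Millisecond, adjust) // -40ms, as in the first testdata case
	reconcile(70*time.Millisecond, 50*time.Millisecond, adjust) // 20ms, as in the second
}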
diff --git a/pkg/util/admission/testdata/work_queue b/pkg/util/admission/testdata/work_queue
index a625fea3755c..fac51ea21884 100644
--- a/pkg/util/admission/testdata/work_queue
+++ b/pkg/util/admission/testdata/work_queue
@@ -47,7 +47,7 @@ admit id=4 tenant=71 priority=-128 create-time-millis=4 bypass=false
 admit id=5 tenant=71 priority=0 create-time-millis=5 bypass=false
 ----

-# Tenant 71 is the top of the heap since not using any slots.
+# Tenant 71 is the top of the heap since it has not used any cpu time.
 print
 ----
 closed epoch: 0 tenantHeap len: 2 top tenant: 71
@@ -60,8 +60,8 @@ continueGrantChain 5
 id 5: admit succeeded
 granted: returned 1

-# Both tenants are using 1 slot. The tie is broken arbitrarily in favor of
-# tenant 71.
+# Both tenants have used 1 nanosecond of cpu time. The tie is broken
+# arbitrarily in favor of tenant 71.
 print
 ----
 closed epoch: 0 tenantHeap len: 2 top tenant: 71
@@ -79,12 +79,12 @@ closed epoch: 0 tenantHeap len: 2 top tenant: 71
   tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100]
   tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

-# The work admitted for tenant 53 is done.
-work-done id=1
+# The work admitted for tenant 53 is done and consumed no cpu time.
+work-done id=1 cpu-time=0
 ----
 returnGrant 1

-# Tenant 53 now using fewer slots so it becomes the top of the heap.
+# Tenant 53 has used no cpu, so it becomes the top of the heap.
 print
 ----
 closed epoch: 0 tenantHeap len: 2 top tenant: 53
@@ -105,25 +105,103 @@ closed epoch: 0 tenantHeap len: 2 top tenant: 53
   tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100]
   tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

+# The system tenant work is done and consumed 10 cpu nanos.
+work-done id=6 cpu-time=10
+----
+returnGrant 1
+
+print
+----
+closed epoch: 0 tenantHeap len: 2 top tenant: 53
+  tenant-id: 1 used: 10, w: 1, fifo: -128
+  tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100]
+  tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]
+
+# Another request from tenant 53, which is behind the existing request in the heap.
+admit id=7 tenant=53 priority=0 create-time-millis=5 bypass=false
+----
+
+print
+----
+closed epoch: 0 tenantHeap len: 2 top tenant: 53
+  tenant-id: 1 used: 10, w: 1, fifo: -128
+  tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] [1: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
+  tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]
+
 granted chain-id=7
 ----
 continueGrantChain 7
 id 2: admit succeeded
 granted: returned 1

+# Both tenants have used 1 nanosecond of cpu time. The tie is broken
+# arbitrarily in favor of tenant 53.
+print
+----
+closed epoch: 0 tenantHeap len: 2 top tenant: 53
+  tenant-id: 1 used: 10, w: 1, fifo: -128
+  tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
+  tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]
+
+# The work admitted for tenant 53 is done and has consumed 20 cpu nanos, so
+# tenant 71 moves to the top of the heap.
+work-done id=2 cpu-time=20
+----
+returnGrant 1
+
+print
+----
+closed epoch: 0 tenantHeap len: 2 top tenant: 71
+  tenant-id: 1 used: 10, w: 1, fifo: -128
+  tenant-id: 53 used: 20, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
+  tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]
+
+gc-tenants-and-reset-used
+----
+closed epoch: 0 tenantHeap len: 2 top tenant: 71
+  tenant-id: 1 used: 0, w: 1, fifo: -128
+  tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
+  tenant-id: 71 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]
+
 granted chain-id=9
 ----
 continueGrantChain 9
 id 4: admit succeeded
 granted: returned 1

+# Tenant 71 has used 1 cpu nano.
+print
+----
+closed epoch: 0 tenantHeap len: 1 top tenant: 53
+  tenant-id: 1 used: 0, w: 1, fifo: -128
+  tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
+  tenant-id: 71 used: 1, w: 1, fifo: -128
+
+# Try to return more cpu than used, to check that there is no overflow.
+work-done id=4 cpu-time=-5
+----
+returnGrant 1
+
+print
+----
+closed epoch: 0 tenantHeap len: 1 top tenant: 53
+  tenant-id: 1 used: 0, w: 1, fifo: -128
+  tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
+  tenant-id: 71 used: 0, w: 1, fifo: -128
+
+granted chain-id=10
+----
+continueGrantChain 10
+id 7: admit succeeded
+granted: returned 1
+
 # No more waiting requests.
 print
 ----
 closed epoch: 0 tenantHeap len: 0
-  tenant-id: 1 used: 1, w: 1, fifo: -128
+  tenant-id: 1 used: 0, w: 1, fifo: -128
   tenant-id: 53 used: 1, w: 1, fifo: -128
-  tenant-id: 71 used: 2, w: 1, fifo: -128
+  tenant-id: 71 used: 0, w: 1, fifo: -128

 # Granted returns false.
 granted chain-id=10
@@ -133,9 +211,9 @@ granted: returned 0
 print
 ----
 closed epoch: 0 tenantHeap len: 0
-  tenant-id: 1 used: 1, w: 1, fifo: -128
+  tenant-id: 1 used: 0, w: 1, fifo: -128
   tenant-id: 53 used: 1, w: 1, fifo: -128
-  tenant-id: 71 used: 2, w: 1, fifo: -128
+  tenant-id: 71 used: 0, w: 1, fifo: -128

 init
 ----
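The testdata sequence above is what drives the heap ordering: the tenant whose accumulated cpu time (scaled by its weight, the w column) is smallest sits at the top and is granted next. A simplified, conceptual sketch of that comparison under those assumptions; the type and function below are illustrative and not the queue's actual heap code:

package main

import "fmt"

// tenantUsage is an illustrative reduction of tenantInfo: used is cpu
// nanoseconds for slot-based work, weight scales the tenant's fair share.
type tenantUsage struct {
	id     uint64
	used   uint64
	weight uint32
}

// aheadOf sketches the fairness ordering: compare used cross-scaled by the
// other tenant's weight, so a higher-weight tenant may consume proportionally
// more cpu before losing the top of the heap.
func aheadOf(a, b tenantUsage) bool {
	return a.used*uint64(b.weight) < b.used*uint64(a.weight)
}

func main() {
	t53 := tenantUsage{id: 53, used: 20, weight: 1}
	t71 := tenantUsage{id: 71, used: 1, weight: 1}
	fmt.Println("tenant 71 ahead of tenant 53:", aheadOf(t71, t53)) // true, matching the print above
}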
diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go
index 2dc6f6dfda93..98da8a399e18 100644
--- a/pkg/util/admission/work_queue.go
+++ b/pkg/util/admission/work_queue.go
@@ -333,6 +333,9 @@ type workQueueOptions struct {
     timeSource timeutil.TimeSource
     // The epoch closing goroutine can be disabled for tests.
     disableEpochClosingGoroutine bool
+    // The background resetting of used and GC'ing of tenants can be disabled
+    // for tests.
+    disableGCTenantsAndResetUsed bool
 }

 func makeWorkQueueOptions(workKind WorkKind) workQueueOptions {
@@ -403,18 +406,20 @@ func initWorkQueue(
         q.mu.tenants = make(map[uint64]*tenantInfo)
         q.sampleEpochLIFOSettingsLocked()
     }()
-    go func() {
-        ticker := time.NewTicker(time.Second)
-        for {
-            select {
-            case <-ticker.C:
-                q.gcTenantsAndResetTokens()
-            case <-stopCh:
-                // Channel closed.
-                return
+    if !opts.disableGCTenantsAndResetUsed {
+        go func() {
+            ticker := time.NewTicker(time.Second)
+            for {
+                select {
+                case <-ticker.C:
+                    q.gcTenantsAndResetUsed()
+                case <-stopCh:
+                    // Channel closed.
+                    return
+                }
             }
-        }
-    }()
+        }()
+    }
     q.tryCloseEpoch(q.timeNow())
     if !opts.disableEpochClosingGoroutine {
         q.startClosingEpochs()
@@ -677,29 +682,19 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
         // the state of the requesters to see if there is any queued work that
         // can be granted admission.
         q.mu.Lock()
-        prevTenant := tenant
-        // The tenant could have been removed when using tokens. See the comment
-        // where the tenantInfo struct is declared.
+        // The tenant could have been removed. See the comment where the
+        // tenantInfo struct is declared.
         tenant, ok = q.mu.tenants[tenantID]
-        if !q.usesTokens {
-            if !ok || prevTenant != tenant {
-                panic("prev tenantInfo no longer in map")
-            }
-            if tenant.used < uint64(info.RequestedCount) {
-                panic(errors.AssertionFailedf("tenant.used %d < info.RequestedCount %d",
-                    tenant.used, info.RequestedCount))
-            }
+        if !ok {
+            tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID))
+            q.mu.tenants[tenantID] = tenant
+        }
+        // Don't want to overflow tenant.used if it has decreased because of being
+        // reset to 0 by the GC goroutine.
+        if tenant.used >= uint64(info.RequestedCount) {
             tenant.used -= uint64(info.RequestedCount)
         } else {
-            if !ok {
-                tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID))
-                q.mu.tenants[tenantID] = tenant
-            }
-            // Don't want to overflow tenant.used if it is already 0 because of
-            // being reset to 0 by the GC goroutine.
-            if tenant.used >= uint64(info.RequestedCount) {
-                tenant.used -= uint64(info.RequestedCount)
-            }
+            tenant.used = 0
         }
     }
@@ -755,7 +750,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
         )
     }

-    return // return without waiting (admission is asynchronous)
+    return false, nil // return without waiting (admission is asynchronous)
 }

 // Start waiting for admission.
@@ -772,16 +767,11 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
     // are too short, we could underestimate the actual wait time.
     tenant.priorityStates.updateDelayLocked(work.priority, waitDur, true /* canceled */)
     if work.heapIndex == -1 {
-        // No longer in heap. Raced with token/slot grant.
-        if !q.usesTokens {
-            if tenant.used < uint64(info.RequestedCount) {
-                panic(errors.AssertionFailedf("tenant.used %d < info.RequestedCount %d",
-                    tenant.used, info.RequestedCount))
-            }
-            tenant.used -= uint64(info.RequestedCount)
-        }
-        // Else, we don't decrement tenant.used since we don't want to race with
-        // the gc goroutine that will set used=0.
+        // No longer in heap. Raced with token/slot grant. Don't bother
+        // decrementing tenant.used since we don't want to race with the gc
+        // goroutine that sets used=0 and could have GC'd tenant and returned it
+        // to the sync.Pool. We can fix this if needed by calling
+        // adjustTenantUsedLocked.
         q.mu.Unlock()
         q.granter.returnGrant(info.RequestedCount)
         // The channel is sent to after releasing mu, so we don't need to hold
@@ -828,22 +818,18 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
 // AdmittedWorkDone is used to inform the WorkQueue that some admitted work is
 // finished. It must be called iff the WorkKind of this WorkQueue uses slots
 // (not tokens), i.e., KVWork, SQLStatementLeafStartWork,
-// SQLStatementRootStartWork.
-func (q *WorkQueue) AdmittedWorkDone(tenantID roachpb.TenantID) {
+// SQLStatementRootStartWork. Note that there is currently no support for
+// SQLStatementLeafStartWork and SQLStatementRootStartWork in the code.
+func (q *WorkQueue) AdmittedWorkDone(tenantID roachpb.TenantID, cpuTime time.Duration) {
     if q.usesTokens {
         panic(errors.AssertionFailedf("tokens should not be returned"))
     }
-    // Single slot is allocated for the work.
-    q.mu.Lock()
-    tenant, ok := q.mu.tenants[tenantID.ToUint64()]
-    if !ok {
-        panic(errors.AssertionFailedf("tenant not found"))
-    }
-    tenant.used--
-    if isInTenantHeap(tenant) {
-        q.mu.tenantHeap.fix(tenant)
+    // Single slot is allocated for the work in the granter, and tenant.used was
+    // incremented by 1.
+    additionalUsed := cpuTime - 1
+    if additionalUsed != 0 {
+        q.adjustTenantUsed(tenantID, additionalUsed.Nanoseconds())
     }
-    q.mu.Unlock()
     q.granter.returnGrant(1)
 }

@@ -922,7 +908,7 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 {
     return requestedCount
 }

-func (q *WorkQueue) gcTenantsAndResetTokens() {
+func (q *WorkQueue) gcTenantsAndResetUsed() {
     q.mu.Lock()
     defer q.mu.Unlock()
     // With large numbers of active tenants, this iteration could hold the lock
@@ -932,7 +918,7 @@
         if info.used == 0 && !isInTenantHeap(info) {
             delete(q.mu.tenants, id)
             releaseTenantInfo(info)
-        } else if q.usesTokens {
+        } else {
             info.used = 0
             // All the heap members will reset used=0, so no need to change heap
             // ordering.
         }
     }
 }

@@ -940,26 +926,30 @@
-// adjustTenantTokens is used internally by StoreWorkQueue. The
-// additionalTokensNeeded count can be negative, in which case it is returning
-// tokens. This is only for WorkQueue's own accounting -- it should not call
-// into granter.
-func (q *WorkQueue) adjustTenantTokens(tenantID roachpb.TenantID, additionalTokensNeeded int64) {
+// adjustTenantUsed is used internally by StoreWorkQueue, and by the KV queue
+// in AdmittedWorkDone. The additionalUsed count can be negative, in which
+// case it is returning unused resources. This is only for WorkQueue's own
+// accounting -- it should not call into granter.
+func (q *WorkQueue) adjustTenantUsed(tenantID roachpb.TenantID, additionalUsed int64) {
     tid := tenantID.ToUint64()
     q.mu.Lock()
     defer q.mu.Unlock()
     tenant, ok := q.mu.tenants[tid]
-    if ok {
-        if additionalTokensNeeded < 0 {
-            toReturn := uint64(-additionalTokensNeeded)
-            if tenant.used < toReturn {
-                tenant.used = 0
-            } else {
-                tenant.used -= toReturn
-            }
+    if !ok {
+        return
+    }
+    if additionalUsed < 0 {
+        toReturn := uint64(-additionalUsed)
+        if tenant.used < toReturn {
+            tenant.used = 0
         } else {
-            tenant.used += uint64(additionalTokensNeeded)
+            tenant.used -= toReturn
         }
+    } else {
+        tenant.used += uint64(additionalUsed)
+    }
+    if isInTenantHeap(tenant) {
+        q.mu.tenantHeap.fix(tenant)
     }
 }

@@ -1264,31 +1254,37 @@ type tenantInfo struct {
     id uint64
     // The weight assigned to the tenant. Must be > 0.
     weight uint32
-    // used can be the currently used slots, or the tokens granted within the last
-    // interval.
+    // used is computed over an interval and periodically reset. Ordering
+    // between tenants, for fair sharing, utilizes this value.
+    //
+    // - For slots, used represents cpu time duration consumed by the tenant. It
+    //   is incremented by 1 (for non-elastic work) or some prediction of cpu
+    //   time (for elastic work) when the work is admitted. A correction is
+    //   applied when the work is done based on the actual cpu time consumed.
+    // - For tokens, used represents the tokens consumed. A prediction of tokens
+    //   that will be consumed is deducted at admission time, and a correction
+    //   is applied later.
     //
     // tenantInfo will not be GC'd until both used==0 and
     // len(waitingWorkHeap)==0.
     //
-    // Note that used can be reset to 0 periodically, iff the WorkQueue is using
-    // tokens (not slots). This creates a risk since callers of Admit hold
-    // references to tenantInfo. We do not want a race condition where the
-    // tenantInfo held in Admit is returned to the sync.Pool. Note that this
-    // race is almost impossible to reproduce in practice since GC loop runs at
-    // 1s intervals and needs two iterations to GC a tenantInfo -- first to
-    // reset used=0 and then the next time to GC it. We fix this by being
-    // careful in the code of Admit by not reusing a reference to tenantInfo,
-    // and instead grab a new reference from the map.
+    // The used value is reset to 0 periodically. This creates a risk since
+    // callers of Admit hold references to tenantInfo. We do not want a race
+    // condition where the tenantInfo held in Admit is returned to the
+    // sync.Pool. Note that this race is almost impossible to reproduce in
+    // practice since GC loop runs at 1s intervals and needs two iterations to
+    // GC a tenantInfo -- first to reset used=0 and then the next time to GC it.
+    // We fix this by being careful in the code of Admit by not reusing a
+    // reference to tenantInfo, and instead grab a new reference from the map.
     //
     // The above fix for the GC race condition is insufficient to prevent
     // overflow of the used field if the reset to used=0 happens between used++
     // and used-- within Admit. Properly fixing that would need to track the
     // count of used==0 resets and gate the used-- on the count not having
     // changed. This was considered unnecessarily complicated and instead we
-    // simply (a) do not do used-- for the tokens case, if used is already zero,
-    // or (b) do not do used-- for the tokens case if the request was canceled.
-    // This does imply some inaccuracy in token counting -- it can be fixed if
-    // needed.
+    // simply (a) do not do used--, if used is already zero, or (b) do not do
+    // used-- if the request was canceled. This does imply some inaccuracy in
+    // accounting -- it can be fixed if needed.
     used uint64
     waitingWorkHeap waitingWorkHeap
     openEpochsHeap openEpochsHeap
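To make the slot-path accounting described in the comment above concrete: admission charges a placeholder of 1, AdmittedWorkDone corrects it by cpuTime-1, and a negative correction saturates at zero because the counter may have been reset concurrently. A small illustrative sketch of that arithmetic; usedTracker is a hypothetical reduction, not the queue's type:

package main

import "fmt"

// usedTracker is a hypothetical reduction of the per-tenant used counter.
type usedTracker struct{ used uint64 }

// admit mirrors the +1 charged when slot-based (non-elastic) work is admitted.
func (t *usedTracker) admit() { t.used++ }

// workDone mirrors AdmittedWorkDone: the correction is cpuTimeNanos-1, and a
// negative adjustment saturates at zero rather than underflowing the counter.
func (t *usedTracker) workDone(cpuTimeNanos int64) {
	additional := cpuTimeNanos - 1
	if additional >= 0 {
		t.used += uint64(additional)
		return
	}
	if toReturn := uint64(-additional); t.used < toReturn {
		t.used = 0
	} else {
		t.used -= toReturn
	}
}

func main() {
	var a usedTracker
	a.admit()      // used=1, the placeholder charged at admission
	a.workDone(20) // used=20, as for tenant 53 in the testdata
	fmt.Println(a.used)

	var b usedTracker
	b.admit()      // used=1
	b.workDone(-5) // correction of -6 saturates at 0, as for tenant 71
	fmt.Println(b.used)
}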
@@ -1997,7 +1993,7 @@ func (q *StoreWorkQueue) admittedReplicatedWork(
     if !coordMuLocked {
         q.coordMu.Unlock()
     }
-    q.q[wc].adjustTenantTokens(tenantID, additionalTokensNeeded)
+    q.q[wc].adjustTenantUsed(tenantID, additionalTokensNeeded)

     // Inform callers of the entry we just admitted.
     //
@@ -2042,7 +2038,7 @@ func (q *StoreWorkQueue) AdmittedWorkDone(h StoreWorkHandle, doneInfo StoreWorkD
     }
     q.updateStoreStatsAfterWorkDone(1, doneInfo, false)
     additionalTokens := q.granters[h.workClass].storeWriteDone(h.writeTokens, doneInfo)
-    q.q[h.workClass].adjustTenantTokens(h.tenantID, additionalTokens)
+    q.q[h.workClass].adjustTenantUsed(h.tenantID, additionalTokens)
     return nil
 }

diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go
index bdb287c81a6f..1c30b9c5a3f5 100644
--- a/pkg/util/admission/work_queue_test.go
+++ b/pkg/util/admission/work_queue_test.go
@@ -208,6 +208,7 @@ func TestWorkQueueBasic(t *testing.T) {
             timeSource = timeutil.NewManualTime(initialTime)
             opts.timeSource = timeSource
             opts.disableEpochClosingGoroutine = true
+            opts.disableGCTenantsAndResetUsed = true
             st = cluster.MakeTestingClusterSettings()
             q = makeWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()),
                 KVWork, tg, st, metrics, opts).(*WorkQueue)
@@ -289,7 +290,11 @@ func TestWorkQueueBasic(t *testing.T) {
             if !work.admitted {
                 return fmt.Sprintf("id not admitted: %d\n", id)
             }
-            q.AdmittedWorkDone(work.tenantID)
+            cpuTime := int64(1)
+            if d.HasArg("cpu-time") {
+                d.ScanArgs(t, "cpu-time", &cpuTime)
+            }
+            q.AdmittedWorkDone(work.tenantID, time.Duration(cpuTime))
             wrkMap.delete(id)
             return buf.stringAndReset()

@@ -324,6 +329,10 @@ func TestWorkQueueBasic(t *testing.T) {
             q.tryCloseEpoch(timeSource.Now())
             return q.String()

+        case "gc-tenants-and-reset-used":
+            q.gcTenantsAndResetUsed()
+            return q.String()
+
         default:
             return fmt.Sprintf("unknown command: %s", d.Cmd)
         }
@@ -406,7 +415,7 @@ func TestWorkQueueTokenResetRace(t *testing.T) {
                 // This hot loop with GC calls is able to trigger the previously buggy
                 // code by squeezing in multiple times between the token grant and
                 // cancellation.
-                q.gcTenantsAndResetTokens()
+                q.gcTenantsAndResetUsed()
             }
         }
     }()