Merge #108364
108364: admission,kvadmission: use tenant cpu consumption for inter-tenant fairness r=irfansharif a=sumeerbhola

Previously, inter-tenant fairness was based on the instantaneous number of slots consumed, since that code predated the grunning instrumentation for cpu consumption. The reset logic for tenantInfo.used is now the same for WorkQueues that use slots and for those that use tokens. Additionally, WorkQueue.adjustTenantTokens had a bug: after adjusting a tenant's tokens it did not re-fix the tenant heap. This is now fixed and tested.
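
To make the heap bug concrete: in Go's container/heap, mutating the key a heap is ordered on requires a follow-up heap.Fix call, otherwise the ordering invariant is silently broken. Below is a minimal runnable sketch of that pattern; tenantInfo, tenantHeap, and adjustTenantUsed are illustrative stand-ins for exposition, not the actual WorkQueue types.

package main

import (
	"container/heap"
	"fmt"
)

// tenantInfo mirrors the idea of per-tenant accounting: the tenant with the
// least "used" (CPU nanoseconds) sits at the top of the heap and is admitted
// first.
type tenantInfo struct {
	id        uint64
	used      int64
	heapIndex int
}

type tenantHeap []*tenantInfo

func (h tenantHeap) Len() int           { return len(h) }
func (h tenantHeap) Less(i, j int) bool { return h[i].used < h[j].used }
func (h tenantHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heapIndex = i
	h[j].heapIndex = j
}

func (h *tenantHeap) Push(x any) {
	t := x.(*tenantInfo)
	t.heapIndex = len(*h)
	*h = append(*h, t)
}

func (h *tenantHeap) Pop() any {
	old := *h
	t := old[len(old)-1]
	*h = old[:len(old)-1]
	return t
}

// adjustTenantUsed changes a tenant's accounted usage. The bug amounted to
// mutating used without the heap.Fix call; heap.Fix restores the heap
// property in O(log n).
func adjustTenantUsed(h *tenantHeap, t *tenantInfo, delta int64) {
	t.used += delta
	if t.used < 0 {
		t.used = 0 // guard against underflow when more is returned than used
	}
	heap.Fix(h, t.heapIndex)
}

func main() {
	h := &tenantHeap{}
	a := &tenantInfo{id: 53, used: 20}
	b := &tenantInfo{id: 71, used: 1}
	heap.Push(h, a)
	heap.Push(h, b)
	adjustTenantUsed(h, b, 100) // tenant 71 consumed more CPU than estimated
	fmt.Println((*h)[0].id)     // prints 53: the heap was re-fixed correctly
}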

Fixes #91533

Epic: none

Release note: None

Co-authored-by: sumeerbhola <[email protected]>
craig[bot] and sumeerbhola committed Aug 25, 2023
2 parents 6cf3b69 + 8e307e0 commit 1b8b7c8
Showing 11 changed files with 231 additions and 110 deletions.
4 changes: 4 additions & 0 deletions pkg/cmd/roachtest/tests/multitenant_fairness.go
@@ -40,6 +40,10 @@ import (
// [1]: Co-locating the SQL pod and the workload generator is a bit funky, but
// it works fine enough as written and saves us from using another 4 nodes
// per test.
//
// TODO(sumeer): Now that we are counting actual CPU for inter-tenant
// fairness, alter the read-heavy workloads to perform different sized work,
// and evaluate fairness.
func registerMultiTenantFairness(r registry.Registry) {
specs := []multiTenantFairnessSpec{
{
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvadmission/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/grunning",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/timeutil",
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/grunning"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -222,6 +223,7 @@ type Handle struct {
raftAdmissionMeta *kvflowcontrolpb.RaftAdmissionMeta

callAdmittedWorkDoneOnKVAdmissionQ bool
cpuStart time.Duration
}

// AnnotateCtx annotates the given context with request-scoped admission
@@ -403,6 +405,11 @@ func (n *controllerImpl) AdmitKVWork(
if err != nil {
return Handle{}, err
}
if callAdmittedWorkDoneOnKVAdmissionQ {
// We include the time to do other activities like intent resolution,
// since it is acceptable to charge them to the tenant.
ah.cpuStart = grunning.Time()
}
ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ
}
}
@@ -413,7 +420,15 @@
func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) {
n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.elasticCPUWorkHandle)
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID)
cpuTime := grunning.Time() - ah.cpuStart
if cpuTime < 0 {
// See https://github.com/cockroachdb/cockroach/issues/95529. Count 1
// nanosecond, arbitrarily.
//
// TODO(sumeer): remove this hack when that bug is fixed.
cpuTime = 1
}
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID, cpuTime)
}
if ah.storeAdmissionQ != nil {
var doneInfo admission.StoreWorkDoneInfo
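
As an aside, the accounting pattern above reduces to the small runnable sketch below. Here nowRunning is a hypothetical stand-in for grunning.Time, which returns the calling goroutine's cumulative on-CPU time; the clamp mirrors the 1-nanosecond workaround in the comment.

package main

import (
	"fmt"
	"time"
)

// fakeClock simulates a goroutine's cumulative on-CPU time so the sketch is
// deterministic; real code would call grunning.Time() instead.
var fakeClock time.Duration

func nowRunning() time.Duration { return fakeClock }

// chargeWork measures the on-CPU time consumed by doWork, clamping the rare
// negative reading (see cockroachdb/cockroach#95529) to 1ns.
func chargeWork(doWork func()) time.Duration {
	cpuStart := nowRunning()
	doWork()
	cpuTime := nowRunning() - cpuStart
	if cpuTime < 0 {
		cpuTime = 1
	}
	return cpuTime
}

func main() {
	cpuTime := chargeWork(func() { fakeClock += 42 * time.Microsecond })
	fmt.Println(cpuTime) // 42µs, to be charged to the tenant's used total
}
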
10 changes: 7 additions & 3 deletions pkg/util/admission/elastic_cpu_work_handle.go
@@ -14,13 +14,15 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/grunning"
)

// ElasticCPUWorkHandle groups relevant data for admitted elastic CPU work,
// specifically how much on-CPU time a request is allowed to make use of (used
// for cooperative scheduling with elastic CPU granters).
type ElasticCPUWorkHandle struct {
tenantID roachpb.TenantID
// cpuStart captures the running time of the calling goroutine when this
// handle is constructed.
cpuStart time.Duration
@@ -51,8 +53,10 @@ type ElasticCPUWorkHandle struct {
testingOverrideOverLimit func() (bool, time.Duration)
}

func newElasticCPUWorkHandle(allotted time.Duration) *ElasticCPUWorkHandle {
h := &ElasticCPUWorkHandle{allotted: allotted}
func newElasticCPUWorkHandle(
tenantID roachpb.TenantID, allotted time.Duration,
) *ElasticCPUWorkHandle {
h := &ElasticCPUWorkHandle{tenantID: tenantID, allotted: allotted}
h.cpuStart = grunning.Time()
return h
}
@@ -180,7 +184,7 @@ func ElasticCPUWorkHandleFromContext(ctx context.Context) *ElasticCPUWorkHandle
// TestingNewElasticCPUHandle exports the ElasticCPUWorkHandle constructor for
// testing purposes.
func TestingNewElasticCPUHandle() *ElasticCPUWorkHandle {
return newElasticCPUWorkHandle(420 * time.Hour) // use a very high allotment
return newElasticCPUWorkHandle(roachpb.SystemTenantID, 420*time.Hour) // use a very high allotment
}

// TestingNewElasticCPUHandleWithCallback constructs an ElasticCPUWorkHandle
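
For context, the handle enables cooperative scheduling: work periodically asks whether it has exceeded its on-CPU allotment and yields if so. A minimal sketch follows, assuming a grunning-like running-time counter; the workHandle type and overLimit method are illustrative, not the exported CockroachDB API.

package main

import (
	"fmt"
	"time"
)

// running simulates grunning.Time(), the goroutine's cumulative on-CPU time.
var running time.Duration

type workHandle struct {
	cpuStart time.Duration // running time when the handle was constructed
	allotted time.Duration // on-CPU budget granted at admission
}

// overLimit reports whether the work exceeded its allotment, and by how much
// (a negative difference means budget remains).
func (h *workHandle) overLimit() (bool, time.Duration) {
	difference := (running - h.cpuStart) - h.allotted
	return difference > 0, difference
}

func main() {
	h := &workHandle{cpuStart: running, allotted: 100 * time.Millisecond}
	for i := 0; i < 5; i++ {
		running += 30 * time.Millisecond // simulate 30ms of on-CPU work
		if over, diff := h.overLimit(); over {
			fmt.Printf("yielding after going over by %s\n", diff) // 20ms
			break // long-running work would checkpoint and re-admit here
		}
	}
}
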
5 changes: 3 additions & 2 deletions pkg/util/admission/elastic_cpu_work_handle_test.go
@@ -14,6 +14,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/stretchr/testify/require"
)
@@ -34,7 +35,7 @@ func TestElasticCPUWorkHandle(t *testing.T) {

setRunning(zero)

handle := newElasticCPUWorkHandle(allotment)
handle := newElasticCPUWorkHandle(roachpb.SystemTenantID, allotment)
handle.testingOverrideRunningTime = func() time.Duration {
overrideMu.Lock()
defer overrideMu.Unlock()
@@ -176,7 +177,7 @@ func TestElasticCPUWorkHandlePreWork(t *testing.T) {

setRunning(zero)

handle := newElasticCPUWorkHandle(allotment)
handle := newElasticCPUWorkHandle(roachpb.SystemTenantID, allotment)
handle.testingOverrideRunningTime = func() time.Duration {
overrideMu.Lock()
defer overrideMu.Unlock()
5 changes: 4 additions & 1 deletion pkg/util/admission/elastic_cpu_work_queue.go
@@ -14,6 +14,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)
@@ -52,6 +53,7 @@ type elasticCPUInternalWorkQueue interface {
requester
Admit(ctx context.Context, info WorkInfo) (enabled bool, err error)
SetTenantWeights(tenantWeights map[uint64]uint32)
adjustTenantUsed(tenantID roachpb.TenantID, additionalUsed int64)
}

func makeElasticCPUWorkQueue(
@@ -90,7 +92,7 @@ func (e *ElasticCPUWorkQueue) Admit(
return nil, nil
}
e.metrics.AcquiredNanos.Inc(duration.Nanoseconds())
return newElasticCPUWorkHandle(duration), nil
return newElasticCPUWorkHandle(info.TenantID, duration), nil
}

// AdmittedWorkDone indicates to the queue that the admitted work has
@@ -102,6 +104,7 @@ func (e *ElasticCPUWorkQueue) AdmittedWorkDone(h *ElasticCPUWorkHandle) {

e.metrics.PreWorkNanos.Inc(h.preWork.Nanoseconds())
_, difference := h.OverLimit()
e.workQueue.adjustTenantUsed(h.tenantID, difference.Nanoseconds())
if difference > 0 {
// We've used up our allotted slice, which we've already deducted tokens
// for. But we've gone over by difference, which we now need to deduct
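
The settlement performed by AdmittedWorkDone can be summarized in the runnable sketch below; granter and settle are hypothetical names, but the arithmetic matches the testdata that follows: finishing early returns the unused grant and credits the tenant, while going over deducts the extra.

package main

import (
	"fmt"
	"time"
)

// granter is a stand-in token pool measured in CPU nanoseconds.
type granter struct{ availableNanos int64 }

func (g *granter) tookWithoutPermission(d time.Duration) { g.availableNanos -= d.Nanoseconds() }
func (g *granter) returnGrant(d time.Duration)           { g.availableNanos += d.Nanoseconds() }

// settle reconciles the allotment charged at admission with the actual
// on-CPU time, adjusting both the token pool and the tenant's accounted use.
func settle(g *granter, tenantUsed *int64, actual, allotted time.Duration) {
	difference := actual - allotted
	*tenantUsed += difference.Nanoseconds() // may be negative: a refund
	if difference > 0 {
		g.tookWithoutPermission(difference) // went over: deduct the excess
	} else {
		g.returnGrant(-difference) // finished early: return unused tokens
	}
}

func main() {
	g := &granter{availableNanos: (8 * time.Second).Nanoseconds()}
	used := (50 * time.Millisecond).Nanoseconds() // charged at admission
	settle(g, &used, 10*time.Millisecond, 50*time.Millisecond)
	fmt.Println(time.Duration(used), time.Duration(g.availableNanos)) // 10ms 8.04s
}
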
14 changes: 12 additions & 2 deletions pkg/util/admission/elastic_cpu_work_queue_test.go
@@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/datadriven"
@@ -64,7 +65,7 @@ func TestElasticCPUWorkQueue(t *testing.T) {
d.ScanArgs(t, "disabled", &elasticCPUInternalWorkQueue.disabled)
}

handle, err := elasticWorkQ.Admit(ctx, duration, WorkInfo{})
handle, err := elasticWorkQ.Admit(ctx, duration, WorkInfo{TenantID: roachpb.SystemTenantID})
require.NoError(t, err)

var buf strings.Builder
@@ -97,7 +98,7 @@
allotted, err := time.ParseDuration(allottedStr)
require.NoError(t, err)

handle := &ElasticCPUWorkHandle{}
handle := &ElasticCPUWorkHandle{tenantID: roachpb.SystemTenantID}
handle.testingOverrideRunningTime = func() time.Duration {
return running
}
@@ -170,6 +171,15 @@ func (t *testElasticCPUInternalWorkQueue) SetTenantWeights(tenantWeights map[uin
panic("unimplemented")
}

func (t *testElasticCPUInternalWorkQueue) adjustTenantUsed(
tenantID roachpb.TenantID, additionalUsed int64,
) {
if !t.disabled {
fmt.Fprintf(&t.buf, "adjust-tenant-used: tenant=%s additional-used=%s",
tenantID.String(), time.Duration(additionalUsed).String())
}
}

func (t *testElasticCPUInternalWorkQueue) hasWaitingRequests() bool {
panic("unimplemented")
}
4 changes: 2 additions & 2 deletions pkg/util/admission/testdata/elastic_cpu_work_queue
@@ -63,7 +63,7 @@ handle: 50ms
admitted-work-done running=10ms allotted=50ms
----
granter: return-grant=40ms
work-queue:
work-queue: adjust-tenant-used: tenant=system additional-used=-40ms
metrics: acquired=50ms returned=40ms max-available=8s

# Repeat the same but this time simulate what happens if we've taken less than
@@ -83,7 +83,7 @@ handle: 50ms
admitted-work-done running=70ms allotted=50ms
----
granter: took-without-permission=20ms
work-queue:
work-queue: adjust-tenant-used: tenant=system additional-used=20ms
metrics: acquired=70ms returned=0s max-available=8s

# vim:ft=sh
98 changes: 88 additions & 10 deletions pkg/util/admission/testdata/work_queue
@@ -47,7 +47,7 @@ admit id=4 tenant=71 priority=-128 create-time-millis=4 bypass=false
admit id=5 tenant=71 priority=0 create-time-millis=5 bypass=false
----

# Tenant 71 is the top of the heap since not using any slots.
# Tenant 71 is the top of the heap since it has not used any cpu time.
# Tenant 71 is the top of the heap since has not used any cpu time.
print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 71
@@ -60,8 +60,8 @@ continueGrantChain 5
id 5: admit succeeded
granted: returned 1

# Both tenants are using 1 slot. The tie is broken arbitrarily in favor of
# tenant 71.
# Both tenants have used 1 nanosecond of cpu time. The tie is broken
# arbitrarily in favor of tenant 71.
print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 71
@@ -79,12 +79,12 @@ closed epoch: 0 tenantHeap len: 2 top tenant: 71
tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

# The work admitted for tenant 53 is done.
work-done id=1
# The work admitted for tenant 53 is done and consumed no cpu time.
work-done id=1 cpu-time=0
----
returnGrant 1

# Tenant 53 now using fewer slots so it becomes the top of the heap.
# Tenant 53 has used no cpu, so it becomes the top of the heap.
print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 53
@@ -105,25 +105,103 @@ closed epoch: 0 tenantHeap len: 2 top tenant: 53
tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

# The system tenant work is done and consumed 10 cpu nanos.
work-done id=6 cpu-time=10
----
returnGrant 1

print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 53
tenant-id: 1 used: 10, w: 1, fifo: -128
tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

# Another request from tenant 53, which is behind the existing request in the heap.
admit id=7 tenant=53 priority=0 create-time-millis=5 bypass=false
----

print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 53
tenant-id: 1 used: 10, w: 1, fifo: -128
tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] [1: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

granted chain-id=7
----
continueGrantChain 7
id 2: admit succeeded
granted: returned 1

# Both tenants have used 1 nanosecond of cpu time. The tie is broken
# arbitrarily in favor of tenant 53.
print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 53
tenant-id: 1 used: 10, w: 1, fifo: -128
tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

# The work admitted for tenant 53 is done and has consumed 20 cpu nanos, so
# tenant 71 moves to the top of the heap.
work-done id=2 cpu-time=20
----
returnGrant 1

print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 71
tenant-id: 1 used: 10, w: 1, fifo: -128
tenant-id: 53 used: 20, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

gc-tenants-and-reset-used
----
closed epoch: 0 tenantHeap len: 2 top tenant: 71
tenant-id: 1 used: 0, w: 1, fifo: -128
tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
tenant-id: 71 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

granted chain-id=9
----
continueGrantChain 9
id 4: admit succeeded
granted: returned 1

# Tenant 71 has used 1 cpu nano.
print
----
closed epoch: 0 tenantHeap len: 1 top tenant: 53
tenant-id: 1 used: 0, w: 1, fifo: -128
tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128

# Try to return more cpu time than was used, to check that used does not
# underflow.
work-done id=4 cpu-time=-5
----
returnGrant 1

print
----
closed epoch: 0 tenantHeap len: 1 top tenant: 53
tenant-id: 1 used: 0, w: 1, fifo: -128
tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
tenant-id: 71 used: 0, w: 1, fifo: -128

granted chain-id=10
----
continueGrantChain 10
id 7: admit succeeded
granted: returned 1

# No more waiting requests.
print
----
closed epoch: 0 tenantHeap len: 0
tenant-id: 1 used: 1, w: 1, fifo: -128
tenant-id: 1 used: 0, w: 1, fifo: -128
tenant-id: 53 used: 1, w: 1, fifo: -128
tenant-id: 71 used: 2, w: 1, fifo: -128
tenant-id: 71 used: 0, w: 1, fifo: -128

# Granted returns false.
granted chain-id=10
@@ -133,9 +211,9 @@
granted: returned 0
print
----
closed epoch: 0 tenantHeap len: 0
tenant-id: 1 used: 1, w: 1, fifo: -128
tenant-id: 1 used: 0, w: 1, fifo: -128
tenant-id: 53 used: 1, w: 1, fifo: -128
tenant-id: 71 used: 2, w: 1, fifo: -128
tenant-id: 71 used: 0, w: 1, fifo: -128

init
----
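
Finally, the gc-tenants-and-reset-used step exercised above corresponds to periodically zeroing per-tenant usage and dropping inactive tenants, which is what makes the reset logic uniform across slot- and token-based WorkQueues. A minimal sketch with illustrative types:

package main

import "fmt"

type tenant struct {
	used    int64 // CPU nanoseconds accounted this interval
	waiting int   // queued requests
}

// gcTenantsAndResetUsed drops tenants with no activity and zeroes the rest,
// so fairness is evaluated afresh over the next interval.
func gcTenantsAndResetUsed(tenants map[uint64]*tenant) {
	for id, t := range tenants {
		if t.waiting == 0 && t.used == 0 {
			delete(tenants, id)
			continue
		}
		t.used = 0
	}
}

func main() {
	tenants := map[uint64]*tenant{
		1:  {used: 10},
		53: {used: 20, waiting: 1},
		99: {}, // inactive: garbage collected
	}
	gcTenantsAndResetUsed(tenants)
	fmt.Println(len(tenants), tenants[1].used, tenants[53].used) // 2 0 0
}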