Skip to content

Commit

Permalink
admission,kvadmission: use tenant cpu consumption for inter-tenant fairness
Browse files Browse the repository at this point in the history

Previously, we were using the instantaneous slots consumed, since that
code predated the grunning instrumentation for cpu consumption. The
reset logic for tenantInfo.used is now the same for WorkQueues that use
slots and tokens. Additionally, there was a bug in
WorkQueue.adjustTenantTokens in that it forgot to fix the heap -- this
is fixed and tested.

Fixes cockroachdb#91533

Epic: none

Release note: None
  • Loading branch information
sumeerbhola committed Aug 8, 2023
1 parent 1382b26 commit 970caa0
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 109 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvadmission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/grunning",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/timeutil",
Expand Down
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/grunning"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -214,6 +215,7 @@ type Handle struct {
raftAdmissionMeta *kvflowcontrolpb.RaftAdmissionMeta

callAdmittedWorkDoneOnKVAdmissionQ bool
cpuStart time.Duration
}

// AnnotateCtx annotates the given context with request-scoped admission
Expand Down Expand Up @@ -373,6 +375,11 @@ func (n *controllerImpl) AdmitKVWork(
if err != nil {
return Handle{}, err
}
if callAdmittedWorkDoneOnKVAdmissionQ {
// We include the time to do other activities like intent resolution,
// since it is acceptable to charge them to the tenant.
ah.cpuStart = grunning.Time()
}
ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ
}
}
Expand All @@ -383,7 +390,8 @@ func (n *controllerImpl) AdmitKVWork(
func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) {
n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.elasticCPUWorkHandle)
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID)
cpuTime := grunning.Time() - ah.cpuStart
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID, cpuTime)
}
if ah.storeAdmissionQ != nil {
var doneInfo admission.StoreWorkDoneInfo
Expand Down
10 changes: 7 additions & 3 deletions pkg/util/admission/elastic_cpu_work_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/grunning"
)

// ElasticCPUWorkHandle groups relevant data for admitted elastic CPU work,
// specifically how much on-CPU time a request is allowed to make use of (used
// for cooperative scheduling with elastic CPU granters).
type ElasticCPUWorkHandle struct {
tenantID roachpb.TenantID
// cpuStart captures the running time of the calling goroutine when this
// handle is constructed.
cpuStart time.Duration
Expand Down Expand Up @@ -51,8 +53,10 @@ type ElasticCPUWorkHandle struct {
testingOverrideOverLimit func() (bool, time.Duration)
}

func newElasticCPUWorkHandle(allotted time.Duration) *ElasticCPUWorkHandle {
h := &ElasticCPUWorkHandle{allotted: allotted}
func newElasticCPUWorkHandle(
tenantID roachpb.TenantID, allotted time.Duration,
) *ElasticCPUWorkHandle {
h := &ElasticCPUWorkHandle{tenantID: tenantID, allotted: allotted}
h.cpuStart = grunning.Time()
return h
}
Expand Down Expand Up @@ -180,7 +184,7 @@ func ElasticCPUWorkHandleFromContext(ctx context.Context) *ElasticCPUWorkHandle
// TestingNewElasticCPUHandle exports the ElasticCPUWorkHandle constructor for
// testing purposes.
func TestingNewElasticCPUHandle() *ElasticCPUWorkHandle {
return newElasticCPUWorkHandle(420 * time.Hour) // use a very high allotment
return newElasticCPUWorkHandle(roachpb.SystemTenantID, 420*time.Hour) // use a very high allotment
}

// TestingNewElasticCPUHandleWithCallback constructs an ElasticCPUWorkHandle
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/admission/elastic_cpu_work_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/stretchr/testify/require"
)
Expand All @@ -34,7 +35,7 @@ func TestElasticCPUWorkHandle(t *testing.T) {

setRunning(zero)

handle := newElasticCPUWorkHandle(allotment)
handle := newElasticCPUWorkHandle(roachpb.SystemTenantID, allotment)
handle.testingOverrideRunningTime = func() time.Duration {
overrideMu.Lock()
defer overrideMu.Unlock()
Expand Down Expand Up @@ -176,7 +177,7 @@ func TestElasticCPUWorkHandlePreWork(t *testing.T) {

setRunning(zero)

handle := newElasticCPUWorkHandle(allotment)
handle := newElasticCPUWorkHandle(roachpb.SystemTenantID, allotment)
handle.testingOverrideRunningTime = func() time.Duration {
overrideMu.Lock()
defer overrideMu.Unlock()
Expand Down
5 changes: 4 additions & 1 deletion pkg/util/admission/elastic_cpu_work_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)
Expand Down Expand Up @@ -52,6 +53,7 @@ type elasticCPUInternalWorkQueue interface {
requester
Admit(ctx context.Context, info WorkInfo) (enabled bool, err error)
SetTenantWeights(tenantWeights map[uint64]uint32)
adjustTenantUsed(tenantID roachpb.TenantID, additionalUsed int64)
}

func makeElasticCPUWorkQueue(
Expand Down Expand Up @@ -90,7 +92,7 @@ func (e *ElasticCPUWorkQueue) Admit(
return nil, nil
}
e.metrics.AcquiredNanos.Inc(duration.Nanoseconds())
return newElasticCPUWorkHandle(duration), nil
return newElasticCPUWorkHandle(info.TenantID, duration), nil
}

// AdmittedWorkDone indicates to the queue that the admitted work has
Expand All @@ -102,6 +104,7 @@ func (e *ElasticCPUWorkQueue) AdmittedWorkDone(h *ElasticCPUWorkHandle) {

e.metrics.PreWorkNanos.Inc(h.preWork.Nanoseconds())
_, difference := h.OverLimit()
e.workQueue.adjustTenantUsed(h.tenantID, difference.Nanoseconds())
if difference > 0 {
// We've used up our allotted slice, which we've already deducted tokens
// for. But we've gone over by difference, which we now need to deduct
Expand Down
14 changes: 12 additions & 2 deletions pkg/util/admission/elastic_cpu_work_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/datadriven"
Expand Down Expand Up @@ -64,7 +65,7 @@ func TestElasticCPUWorkQueue(t *testing.T) {
d.ScanArgs(t, "disabled", &elasticCPUInternalWorkQueue.disabled)
}

handle, err := elasticWorkQ.Admit(ctx, duration, WorkInfo{})
handle, err := elasticWorkQ.Admit(ctx, duration, WorkInfo{TenantID: roachpb.SystemTenantID})
require.NoError(t, err)

var buf strings.Builder
Expand Down Expand Up @@ -97,7 +98,7 @@ func TestElasticCPUWorkQueue(t *testing.T) {
allotted, err := time.ParseDuration(allottedStr)
require.NoError(t, err)

handle := &ElasticCPUWorkHandle{}
handle := &ElasticCPUWorkHandle{tenantID: roachpb.SystemTenantID}
handle.testingOverrideRunningTime = func() time.Duration {
return running
}
Expand Down Expand Up @@ -170,6 +171,15 @@ func (t *testElasticCPUInternalWorkQueue) SetTenantWeights(tenantWeights map[uin
panic("unimplemented")
}

// adjustTenantUsed is the test stand-in for the adjustTenantUsed method of
// elasticCPUInternalWorkQueue. Unless the test queue is marked disabled, it
// records the call (tenant and additional usage) in the queue's output buffer
// so that the datadriven test can print it.
func (t *testElasticCPUInternalWorkQueue) adjustTenantUsed(
	tenantID roachpb.TenantID, additionalUsed int64,
) {
	if t.disabled {
		// A disabled queue records nothing.
		return
	}
	tenant := tenantID.String()
	fmt.Fprintf(&t.buf, "adjustTenantUsed: tenantID=%s additionalUsed=%d", tenant, additionalUsed)
}

func (t *testElasticCPUInternalWorkQueue) hasWaitingRequests() bool {
panic("unimplemented")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/admission/testdata/elastic_cpu_work_queue
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ handle: 50ms
admitted-work-done running=10ms allotted=50ms
----
granter: return-grant=40ms
work-queue:
work-queue: adjustTenantUsed: tenantID=system additionalUsed=-40000000
metrics: acquired=50ms returned=40ms max-available=8s

# Repeat the same but this time simulate what happens if we've taken less than
Expand All @@ -83,7 +83,7 @@ handle: 50ms
admitted-work-done running=70ms allotted=50ms
----
granter: took-without-permission=20ms
work-queue:
work-queue: adjustTenantUsed: tenantID=system additionalUsed=20000000
metrics: acquired=70ms returned=0s max-available=8s

# vim:ft=sh
96 changes: 87 additions & 9 deletions pkg/util/admission/testdata/work_queue
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ admit id=4 tenant=71 priority=-128 create-time-millis=4 bypass=false
admit id=5 tenant=71 priority=0 create-time-millis=5 bypass=false
----

# Tenant 71 is the top of the heap since not using any slots.
# Tenant 71 is the top of the heap since it has not used any cpu time.
print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 71
Expand All @@ -60,7 +60,7 @@ continueGrantChain 5
id 5: admit succeeded
granted: returned 1

# Both tenants are using 1 slot. The tie is broken arbitrarily in favor of
# Both tenants have used 1 cpu time. The tie is broken arbitrarily in favor of
# tenant 71.
print
----
Expand All @@ -79,12 +79,12 @@ closed epoch: 0 tenantHeap len: 2 top tenant: 71
tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

# The work admitted for tenant 53 is done.
work-done id=1
# The work admitted for tenant 53 is done and consumed no cpu-time.
work-done id=1 cpu-time=0
----
returnGrant 1

# Tenant 53 now using fewer slots so it becomes the top of the heap.
# Tenant 53 has used no cpu, so it becomes the top of the heap.
print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 53
Expand All @@ -105,25 +105,103 @@ closed epoch: 0 tenantHeap len: 2 top tenant: 53
tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

# The system tenant work is done and consumed 10 cpu.
work-done id=6 cpu-time=10
----
returnGrant 1

print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 53
tenant-id: 1 used: 10, w: 1, fifo: -128
tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

# Another request from tenant 53, which is behind the existing request in the heap.
admit id=7 tenant=53 priority=0 create-time-millis=5 bypass=false
----

print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 53
tenant-id: 1 used: 10, w: 1, fifo: -128
tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] [1: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

granted chain-id=7
----
continueGrantChain 7
id 2: admit succeeded
granted: returned 1

# Both tenants have used 1 cpu time. The tie is broken arbitrarily in favor of
# tenant 53.
print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 53
tenant-id: 1 used: 10, w: 1, fifo: -128
tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

# The work admitted for tenant 53 is done and has consumed 20 cpu, so tenant
# 71 moves to the top of the heap.
work-done id=2 cpu-time=20
----
returnGrant 1

print
----
closed epoch: 0 tenantHeap len: 2 top tenant: 71
tenant-id: 1 used: 10, w: 1, fifo: -128
tenant-id: 53 used: 20, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

gc-tenants-and-reset-used
----
closed epoch: 0 tenantHeap len: 2 top tenant: 71
tenant-id: 1 used: 0, w: 1, fifo: -128
tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
tenant-id: 71 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100]

granted chain-id=9
----
continueGrantChain 9
id 4: admit succeeded
granted: returned 1

# Tenant 71 has used 1 cpu.
print
----
closed epoch: 0 tenantHeap len: 1 top tenant: 53
tenant-id: 1 used: 0, w: 1, fifo: -128
tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
tenant-id: 71 used: 1, w: 1, fifo: -128

# Try to return more cpu than used, to check that there is no overflow.
work-done id=4 cpu-time=-5
----
returnGrant 1

print
----
closed epoch: 0 tenantHeap len: 1 top tenant: 53
tenant-id: 1 used: 0, w: 1, fifo: -128
tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100]
tenant-id: 71 used: 0, w: 1, fifo: -128

granted chain-id=10
----
continueGrantChain 10
id 7: admit succeeded
granted: returned 1

# No more waiting requests.
print
----
closed epoch: 0 tenantHeap len: 0
tenant-id: 1 used: 1, w: 1, fifo: -128
tenant-id: 1 used: 0, w: 1, fifo: -128
tenant-id: 53 used: 1, w: 1, fifo: -128
tenant-id: 71 used: 2, w: 1, fifo: -128
tenant-id: 71 used: 0, w: 1, fifo: -128

# Granted returns false.
granted chain-id=10
Expand All @@ -133,9 +211,9 @@ granted: returned 0
print
----
closed epoch: 0 tenantHeap len: 0
tenant-id: 1 used: 1, w: 1, fifo: -128
tenant-id: 1 used: 0, w: 1, fifo: -128
tenant-id: 53 used: 1, w: 1, fifo: -128
tenant-id: 71 used: 2, w: 1, fifo: -128
tenant-id: 71 used: 0, w: 1, fifo: -128

init
----
Expand Down
Loading

0 comments on commit 970caa0

Please sign in to comment.