diff --git a/pkg/testutils/lint/passes/redactcheck/redactcheck.go b/pkg/testutils/lint/passes/redactcheck/redactcheck.go index 4029ebbc9991..cf0e8a76c59e 100644 --- a/pkg/testutils/lint/passes/redactcheck/redactcheck.go +++ b/pkg/testutils/lint/passes/redactcheck/redactcheck.go @@ -184,6 +184,10 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) { "TxnEpoch": {}, "TxnSeq": {}, }, + "github.com/cockroachdb/cockroach/pkg/util/admission": { + "WorkKind": {}, + "QueueKind": {}, + }, "github.com/cockroachdb/cockroach/pkg/util/hlc": { "ClockTimestamp": {}, "LegacyTimestamp": {}, diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index e487954d4a70..0c415fe0b6b7 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -543,6 +543,16 @@ func workKindString(workKind WorkKind) string { } } +// QueueKind is used to track the specific WorkQueue an item of KVWork is in. +// The options are one of: "kv-regular-cpu-queue", "kv-elastic-cpu-queue", +// "kv-regular-store-queue", "kv-elastic-store-queue". +// +// It is left empty for SQL types of WorkKind. +type QueueKind string + +// SafeValue implements the redact.SafeValue interface. +func (QueueKind) SafeValue() {} + // storeAdmissionStats are stats maintained by a storeRequester. The non-test // implementation of storeRequester is StoreWorkQueue. StoreWorkQueue updates // all of these when StoreWorkQueue.AdmittedWorkDone is called, so that these diff --git a/pkg/util/admission/admissionpb/admission_stats.proto b/pkg/util/admission/admissionpb/admission_stats.proto index 26355e74c986..eee3d82fd63b 100644 --- a/pkg/util/admission/admissionpb/admission_stats.proto +++ b/pkg/util/admission/admissionpb/admission_stats.proto @@ -19,8 +19,9 @@ import "gogoproto/gogo.proto"; message AdmissionWorkQueueStats { // Duration spent waiting. int64 wait_duration_nanos = 1 [(gogoproto.casttype) = "time.Duration"]; - // String representation of admission work kind. - string work_kind = 2; + // String representation of admission queue kind. + string queue_kind = 2; // Set to true if deadline was exceeded. bool deadline_exceeded = 3; + } diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index 7d00c01c066e..235eefc89410 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -437,7 +437,7 @@ func makeElasticGrantCoordinator( schedulerLatencyListener := newSchedulerLatencyListener(ambientCtx, st, schedulerLatencyListenerMetrics, elasticCPUGranter) elasticCPUInternalWorkQueue := &WorkQueue{} - initWorkQueue(elasticCPUInternalWorkQueue, ambientCtx, KVWork, elasticCPUGranter, st, + initWorkQueue(elasticCPUInternalWorkQueue, ambientCtx, KVWork, "kv-elastic-cpu-queue", elasticCPUGranter, st, elasticWorkQueueMetrics, workQueueOptions{usesTokens: true}, nil /* knobs */) // will be closed by the embedding *ElasticCPUWorkQueue elasticCPUWorkQueue := makeElasticCPUWorkQueue(st, elasticCPUInternalWorkQueue, elasticCPUGranter, elasticCPUGranterMetrics) diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 2abc9b9e2df0..4f0939a47d4c 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -276,6 +276,7 @@ func (r LogPosition) Less(o LogPosition) bool { type WorkQueue struct { ambientCtx context.Context workKind WorkKind + queueKind QueueKind granter granter usesTokens bool tiedToRange bool @@ -364,7 +365,11 @@ func makeWorkQueue( opts workQueueOptions, ) requester { q := &WorkQueue{} - initWorkQueue(q, ambientCtx, workKind, granter, settings, metrics, opts, nil) + var queueKind QueueKind + if workKind == KVWork { + queueKind = "kv-regular-cpu-queue" + } + initWorkQueue(q, ambientCtx, workKind, queueKind, granter, settings, metrics, opts, nil) return q } @@ -372,6 +377,7 @@ func initWorkQueue( q *WorkQueue, ambientCtx log.AmbientContext, workKind WorkKind, + queueKind QueueKind, granter granter, settings *cluster.Settings, metrics *WorkQueueMetrics, @@ -388,8 +394,13 @@ func initWorkQueue( timeSource = timeutil.DefaultTimeSource{} } + if queueKind == "" { + queueKind = QueueKind(workKindString(workKind)) + } + q.ambientCtx = ambientCtx.AnnotateCtx(context.Background()) q.workKind = workKind + q.queueKind = queueKind q.granter = granter q.usesTokens = opts.usesTokens q.tiedToRange = opts.tiedToRange @@ -803,10 +814,11 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err q.metrics.incErrored(info.Priority) q.metrics.recordFinishWait(info.Priority, waitDur) deadline, _ := ctx.Deadline() - recordAdmissionWorkQueueStats(span, waitDur, q.workKind, true) + recordAdmissionWorkQueueStats(span, waitDur, q.queueKind, true) + log.Eventf(ctx, "deadline expired, waited in %s queue for %v", q.queueKind, waitDur) return true, - errors.Newf("work %s deadline expired while waiting: deadline: %v, start: %v, dur: %v", - workKindString(q.workKind), deadline, startTime, waitDur) + errors.Newf("deadline expired while waiting in queue: %s, deadline: %v, start: %v, dur: %v", + q.queueKind, deadline, startTime, waitDur) case chainID, ok := <-work.ch: if !ok { panic(errors.AssertionFailedf("channel should not be closed")) @@ -817,21 +829,21 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if work.heapIndex != -1 { panic(errors.AssertionFailedf("grantee should be removed from heap")) } - recordAdmissionWorkQueueStats(span, waitDur, q.workKind, false) + recordAdmissionWorkQueueStats(span, waitDur, q.queueKind, false) q.granter.continueGrantChain(chainID) return true, nil } } func recordAdmissionWorkQueueStats( - span *tracing.Span, waitDur time.Duration, workKind WorkKind, deadlineExceeded bool, + span *tracing.Span, waitDur time.Duration, queueKind QueueKind, deadlineExceeded bool, ) { if span == nil { return } span.RecordStructured(&admissionpb.AdmissionWorkQueueStats{ WaitDurationNanos: waitDur, - WorkKind: workKindString(workKind), + QueueKind: string(queueKind), DeadlineExceeded: deadlineExceeded, }) } @@ -2202,7 +2214,13 @@ func makeStoreWorkQueue( opts.usesAsyncAdmit = true for i := range q.q { - initWorkQueue(&q.q[i], ambientCtx, KVWork, granters[i], settings, metrics, opts, knobs) + var queueKind QueueKind + if i == int(admissionpb.RegularWorkClass) { + queueKind = "kv-regular-store-queue" + } else if i == int(admissionpb.ElasticWorkClass) { + queueKind = "kv-elastic-store-queue" + } + initWorkQueue(&q.q[i], ambientCtx, KVWork, queueKind, granters[i], settings, metrics, opts, knobs) q.q[i].onAdmittedReplicatedWork = q } // Arbitrary initial value. This will be replaced before any meaningful