Skip to content

Commit

Permalink
admission: inject queue kind into work queue tracing
Browse files Browse the repository at this point in the history
This patch plumbs down a queue type identifier (`QueueKind`) into the
`WorkQueue` to help track down the origin of items in the queue.

Informs: cockroachdb#113990

Release note: None
  • Loading branch information
aadityasondhi committed Dec 10, 2023
1 parent 004cb44 commit 0cad7d5
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 11 deletions.
4 changes: 4 additions & 0 deletions pkg/testutils/lint/passes/redactcheck/redactcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) {
"TxnEpoch": {},
"TxnSeq": {},
},
"github.com/cockroachdb/cockroach/pkg/util/admission": {
"WorkKind": {},
"QueueKind": {},
},
"github.com/cockroachdb/cockroach/pkg/util/hlc": {
"ClockTimestamp": {},
"LegacyTimestamp": {},
Expand Down
10 changes: 10 additions & 0 deletions pkg/util/admission/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,16 @@ func workKindString(workKind WorkKind) string {
}
}

// QueueKind is used to track the specific WorkQueue an item of KVWork is in.
// The options are one of: "kv-regular-cpu-queue", "kv-elastic-cpu-queue",
// "kv-regular-store-queue", "kv-elastic-store-queue".
//
// It is left empty for SQL types of WorkKind.
type QueueKind string

// SafeValue implements the redact.SafeValue interface.
func (QueueKind) SafeValue() {}

// storeAdmissionStats are stats maintained by a storeRequester. The non-test
// implementation of storeRequester is StoreWorkQueue. StoreWorkQueue updates
// all of these when StoreWorkQueue.AdmittedWorkDone is called, so that these
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/admission/admissionpb/admission_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import "gogoproto/gogo.proto";
message AdmissionWorkQueueStats {
// Duration spent waiting.
int64 wait_duration_nanos = 1 [(gogoproto.casttype) = "time.Duration"];
// String representation of admission work kind.
string work_kind = 2;
// String representation of admission queue kind.
string queue_kind = 2;
// Set to true if deadline was exceeded.
bool deadline_exceeded = 3;

}
2 changes: 1 addition & 1 deletion pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func makeElasticGrantCoordinator(
schedulerLatencyListener := newSchedulerLatencyListener(ambientCtx, st, schedulerLatencyListenerMetrics, elasticCPUGranter)

elasticCPUInternalWorkQueue := &WorkQueue{}
initWorkQueue(elasticCPUInternalWorkQueue, ambientCtx, KVWork, elasticCPUGranter, st,
initWorkQueue(elasticCPUInternalWorkQueue, ambientCtx, KVWork, "kv-elastic-cpu-queue", elasticCPUGranter, st,
elasticWorkQueueMetrics,
workQueueOptions{usesTokens: true}, nil /* knobs */) // will be closed by the embedding *ElasticCPUWorkQueue
elasticCPUWorkQueue := makeElasticCPUWorkQueue(st, elasticCPUInternalWorkQueue, elasticCPUGranter, elasticCPUGranterMetrics)
Expand Down
34 changes: 26 additions & 8 deletions pkg/util/admission/work_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func (r LogPosition) Less(o LogPosition) bool {
type WorkQueue struct {
ambientCtx context.Context
workKind WorkKind
queueKind QueueKind
granter granter
usesTokens bool
tiedToRange bool
Expand Down Expand Up @@ -364,14 +365,19 @@ func makeWorkQueue(
opts workQueueOptions,
) requester {
q := &WorkQueue{}
initWorkQueue(q, ambientCtx, workKind, granter, settings, metrics, opts, nil)
var queueKind QueueKind
if workKind == KVWork {
queueKind = "kv-regular-cpu-queue"
}
initWorkQueue(q, ambientCtx, workKind, queueKind, granter, settings, metrics, opts, nil)
return q
}

func initWorkQueue(
q *WorkQueue,
ambientCtx log.AmbientContext,
workKind WorkKind,
queueKind QueueKind,
granter granter,
settings *cluster.Settings,
metrics *WorkQueueMetrics,
Expand All @@ -388,8 +394,13 @@ func initWorkQueue(
timeSource = timeutil.DefaultTimeSource{}
}

if queueKind == "" {
queueKind = QueueKind(workKindString(workKind))
}

q.ambientCtx = ambientCtx.AnnotateCtx(context.Background())
q.workKind = workKind
q.queueKind = queueKind
q.granter = granter
q.usesTokens = opts.usesTokens
q.tiedToRange = opts.tiedToRange
Expand Down Expand Up @@ -803,10 +814,11 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
q.metrics.incErrored(info.Priority)
q.metrics.recordFinishWait(info.Priority, waitDur)
deadline, _ := ctx.Deadline()
recordAdmissionWorkQueueStats(span, waitDur, q.workKind, true)
recordAdmissionWorkQueueStats(span, waitDur, q.queueKind, true)
log.Eventf(ctx, "deadline expired, waited in %s queue for %v", q.queueKind, waitDur)
return true,
errors.Newf("work %s deadline expired while waiting: deadline: %v, start: %v, dur: %v",
workKindString(q.workKind), deadline, startTime, waitDur)
errors.Newf("deadline expired while waiting in queue: %s, deadline: %v, start: %v, dur: %v",
q.queueKind, deadline, startTime, waitDur)
case chainID, ok := <-work.ch:
if !ok {
panic(errors.AssertionFailedf("channel should not be closed"))
Expand All @@ -817,21 +829,21 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
if work.heapIndex != -1 {
panic(errors.AssertionFailedf("grantee should be removed from heap"))
}
recordAdmissionWorkQueueStats(span, waitDur, q.workKind, false)
recordAdmissionWorkQueueStats(span, waitDur, q.queueKind, false)
q.granter.continueGrantChain(chainID)
return true, nil
}
}

func recordAdmissionWorkQueueStats(
span *tracing.Span, waitDur time.Duration, workKind WorkKind, deadlineExceeded bool,
span *tracing.Span, waitDur time.Duration, queueKind QueueKind, deadlineExceeded bool,
) {
if span == nil {
return
}
span.RecordStructured(&admissionpb.AdmissionWorkQueueStats{
WaitDurationNanos: waitDur,
WorkKind: workKindString(workKind),
QueueKind: string(queueKind),
DeadlineExceeded: deadlineExceeded,
})
}
Expand Down Expand Up @@ -2202,7 +2214,13 @@ func makeStoreWorkQueue(

opts.usesAsyncAdmit = true
for i := range q.q {
initWorkQueue(&q.q[i], ambientCtx, KVWork, granters[i], settings, metrics, opts, knobs)
var queueKind QueueKind
if i == int(admissionpb.RegularWorkClass) {
queueKind = "kv-regular-store-queue"
} else if i == int(admissionpb.ElasticWorkClass) {
queueKind = "kv-elastic-store-queue"
}
initWorkQueue(&q.q[i], ambientCtx, KVWork, queueKind, granters[i], settings, metrics, opts, knobs)
q.q[i].onAdmittedReplicatedWork = q
}
// Arbitrary initial value. This will be replaced before any meaningful
Expand Down

0 comments on commit 0cad7d5

Please sign in to comment.