Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

admission: include more work queue info in tracing and errors #114664

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/testutils/lint/passes/redactcheck/redactcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) {
"TxnSeq": {},
},
"github.com/cockroachdb/cockroach/pkg/util/admission": {
"WorkKind": {},
"WorkKind": {},
"QueueKind": {},
},
"github.com/cockroachdb/cockroach/pkg/util/hlc": {
"ClockTimestamp": {},
Expand Down
10 changes: 10 additions & 0 deletions pkg/util/admission/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,16 @@ func (wk WorkKind) String() string {
}
}

// QueueKind is used to track the specific WorkQueue an item of KVWork is in.
// The options are one of: "kv-regular-cpu-queue", "kv-elastic-cpu-queue",
// "kv-regular-store-queue", "kv-elastic-store-queue".
//
// Callers leave it empty for SQL types of WorkKind. initWorkQueue does not
// store the empty value: it falls back to the WorkKind's String() form, so
// traces and error messages for those queues carry the work kind's name
// instead.
type QueueKind string

// SafeValue implements the redact.SafeValue interface. QueueKind values are
// fixed strings chosen in code (or derived from WorkKind.String()), not
// user-supplied data.
func (QueueKind) SafeValue() {}

// storeAdmissionStats are stats maintained by a storeRequester. The non-test
// implementation of storeRequester is StoreWorkQueue. StoreWorkQueue updates
// all of these when StoreWorkQueue.AdmittedWorkDone is called, so that these
Expand Down
6 changes: 4 additions & 2 deletions pkg/util/admission/admissionpb/admission_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import "gogoproto/gogo.proto";
message AdmissionWorkQueueStats {
// Duration spent waiting.
int64 wait_duration_nanos = 1 [(gogoproto.casttype) = "time.Duration"];
// String representation of admission work kind.
string work_kind = 2;
// String representation of admission queue kind.
string queue_kind = 2;
// Set to true if deadline was exceeded.
bool deadline_exceeded = 3;
// String representation of work priority.
string work_priority = 4;
}
2 changes: 1 addition & 1 deletion pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func makeElasticGrantCoordinator(
schedulerLatencyListener := newSchedulerLatencyListener(ambientCtx, st, schedulerLatencyListenerMetrics, elasticCPUGranter)

elasticCPUInternalWorkQueue := &WorkQueue{}
initWorkQueue(elasticCPUInternalWorkQueue, ambientCtx, KVWork, elasticCPUGranter, st,
initWorkQueue(elasticCPUInternalWorkQueue, ambientCtx, KVWork, "kv-elastic-cpu-queue", elasticCPUGranter, st,
elasticWorkQueueMetrics,
workQueueOptions{usesTokens: true}, nil /* knobs */) // will be closed by the embedding *ElasticCPUWorkQueue
elasticCPUWorkQueue := makeElasticCPUWorkQueue(st, elasticCPUInternalWorkQueue, elasticCPUGranter, elasticCPUGranterMetrics)
Expand Down
42 changes: 31 additions & 11 deletions pkg/util/admission/work_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func (r LogPosition) Less(o LogPosition) bool {
type WorkQueue struct {
ambientCtx context.Context
workKind WorkKind
queueKind QueueKind
granter granter
usesTokens bool
tiedToRange bool
Expand Down Expand Up @@ -364,14 +365,19 @@ func makeWorkQueue(
opts workQueueOptions,
) requester {
q := &WorkQueue{}
initWorkQueue(q, ambientCtx, workKind, granter, settings, metrics, opts, nil)
var queueKind QueueKind
if workKind == KVWork {
queueKind = "kv-regular-cpu-queue"
}
initWorkQueue(q, ambientCtx, workKind, queueKind, granter, settings, metrics, opts, nil)
return q
}

func initWorkQueue(
q *WorkQueue,
ambientCtx log.AmbientContext,
workKind WorkKind,
queueKind QueueKind,
granter granter,
settings *cluster.Settings,
metrics *WorkQueueMetrics,
Expand All @@ -388,8 +394,13 @@ func initWorkQueue(
timeSource = timeutil.DefaultTimeSource{}
}

if queueKind == "" {
queueKind = QueueKind(workKind.String())
}

q.ambientCtx = ambientCtx.AnnotateCtx(context.Background())
q.workKind = workKind
q.queueKind = queueKind
q.granter = granter
q.usesTokens = opts.usesTokens
q.tiedToRange = opts.tiedToRange
Expand Down Expand Up @@ -803,12 +814,11 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
q.metrics.incErrored(info.Priority)
q.metrics.recordFinishWait(info.Priority, waitDur)
deadline, _ := ctx.Deadline()
recordAdmissionWorkQueueStats(span, waitDur, q.workKind, true)
log.Eventf(ctx, "deadline expired, waited in %s queue for %v",
q.workKind, waitDur)
recordAdmissionWorkQueueStats(span, waitDur, q.queueKind, info.Priority, true)
log.Eventf(ctx, "deadline expired, waited in %s queue with pri %s for %v", q.queueKind, admissionpb.WorkPriorityDict[info.Priority], waitDur)
return true,
errors.Newf("work %s deadline expired while waiting: deadline: %v, start: %v, dur: %v",
q.workKind, deadline, startTime, waitDur)
errors.Newf("deadline expired while waiting in queue: %s, pri: %s, deadline: %v, start: %v, dur: %v",
q.queueKind, admissionpb.WorkPriorityDict[info.Priority], deadline, startTime, waitDur)
case chainID, ok := <-work.ch:
if !ok {
panic(errors.AssertionFailedf("channel should not be closed"))
Expand All @@ -819,23 +829,27 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
if work.heapIndex != -1 {
panic(errors.AssertionFailedf("grantee should be removed from heap"))
}
recordAdmissionWorkQueueStats(span, waitDur, q.workKind, false)
log.Eventf(ctx, "admitted, waited in %s queue for %v", q.workKind, waitDur)
recordAdmissionWorkQueueStats(span, waitDur, q.queueKind, info.Priority, false)
q.granter.continueGrantChain(chainID)
return true, nil
}
}

func recordAdmissionWorkQueueStats(
span *tracing.Span, waitDur time.Duration, workKind WorkKind, deadlineExceeded bool,
span *tracing.Span,
waitDur time.Duration,
queueKind QueueKind,
workPriority admissionpb.WorkPriority,
deadlineExceeded bool,
) {
if span == nil {
return
}
span.RecordStructured(&admissionpb.AdmissionWorkQueueStats{
WaitDurationNanos: waitDur,
WorkKind: workKind.String(),
QueueKind: string(queueKind),
DeadlineExceeded: deadlineExceeded,
WorkPriority: admissionpb.WorkPriorityDict[workPriority],
})
}

Expand Down Expand Up @@ -2205,7 +2219,13 @@ func makeStoreWorkQueue(

opts.usesAsyncAdmit = true
for i := range q.q {
initWorkQueue(&q.q[i], ambientCtx, KVWork, granters[i], settings, metrics, opts, knobs)
var queueKind QueueKind
if i == int(admissionpb.RegularWorkClass) {
queueKind = "kv-regular-store-queue"
} else if i == int(admissionpb.ElasticWorkClass) {
queueKind = "kv-elastic-store-queue"
}
initWorkQueue(&q.q[i], ambientCtx, KVWork, queueKind, granters[i], settings, metrics, opts, knobs)
q.q[i].onAdmittedReplicatedWork = q
}
// Arbitrary initial value. This will be replaced before any meaningful
Expand Down