Skip to content

Commit

Permalink
eval_broker: track enqueue and dequeue times (#20329)
Browse files Browse the repository at this point in the history
Adds new metrics to the eval broker that track times of evaluations enqueueing
and dequeueing.
  • Loading branch information
pkazmierczak authored Apr 15, 2024
1 parent 1739f94 commit 0d14dd9
Show file tree
Hide file tree
Showing 4 changed files with 340 additions and 249 deletions.
3 changes: 3 additions & 0 deletions .changelog/20329.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
metrics: Added tracking of enqueue and dequeue times of evaluations to the broker
```
120 changes: 101 additions & 19 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

metrics "github.com/armon/go-metrics"

"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/broker"
"github.com/hashicorp/nomad/helper/uuid"
Expand Down Expand Up @@ -112,6 +113,11 @@ type EvalBroker struct {
// compounding after the first Nack.
subsequentNackDelay time.Duration

// enqueuedTime and dequeuedTime store, per evaluation ID, the time at which
// an evaluation was enqueued and dequeued; they are used to emit timing metrics
enqueuedTime map[string]time.Time
dequeuedTime map[string]time.Time

l sync.RWMutex
}

Expand Down Expand Up @@ -158,6 +164,8 @@ func NewEvalBroker(ctx context.Context, timeout, initialNackDelay, subsequentNac
timeWait: make(map[string]*time.Timer),
initialNackDelay: initialNackDelay,
subsequentNackDelay: subsequentNackDelay,
enqueuedTime: make(map[string]time.Time),
dequeuedTime: make(map[string]time.Time),
delayHeap: delayheap.NewDelayHeap(),
delayedEvalsUpdateCh: make(chan struct{}, 1),
}
Expand Down Expand Up @@ -191,6 +199,11 @@ func (b *EvalBroker) SetEnabled(enabled bool) {

if !enabled {
b.flush()
} else {
// if we're the leader, allocate some memory for the enqueuedTime and
// dequeuedTime maps
b.enqueuedTime = make(map[string]time.Time, 256)
b.dequeuedTime = make(map[string]time.Time, 256)
}

// Notify all subscribers to state changes of the broker enabled value.
Expand All @@ -201,7 +214,16 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
b.l.Lock()
defer b.l.Unlock()
b.processEnqueue(eval, "")
b.processEnqueue(eval, "", true)
}

// Restore is used to restore an evaluation that was previously enqueued. It
// works like Enqueue except that it does not track the enqueuedTime of the
// restored evaluation.
func (b *EvalBroker) Restore(eval *structs.Evaluation) {
b.l.Lock()
defer b.l.Unlock()
// The final argument is trackTime; false skips enqueue-time tracking.
b.processEnqueue(eval, "", false)
}

// EnqueueAll is used to enqueue many evaluations. The map allows evaluations
Expand All @@ -221,15 +243,15 @@ func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) {
b.l.Lock()
defer b.l.Unlock()
for eval, token := range evals {
b.processEnqueue(eval, token)
b.processEnqueue(eval, token, true)
}
}

// processEnqueue deduplicates evals and either enqueue immediately or enforce
// the evals wait time. If the token is passed, and the evaluation ID is
// outstanding, the evaluation is blocked until an Ack/Nack is received.
// processEnqueue must be called with the lock held.
func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string, trackTime bool) {
// If we're not enabled, don't enable more queuing.
if !b.enabled {
return
Expand All @@ -254,7 +276,7 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {

// Check if we need to enforce a wait
if eval.Wait > 0 {
b.processWaitingEnqueue(eval)
b.processWaitingEnqueue(eval, trackTime)
return
}

Expand All @@ -270,32 +292,32 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
return
}

b.enqueueLocked(eval, eval.Type)
b.enqueueLocked(eval, eval.Type, trackTime)
}

// processWaitingEnqueue waits the given duration on the evaluation before
// enqueuing.
func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) {
func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation, trackTime bool) {
timer := time.AfterFunc(eval.Wait, func() {
b.enqueueWaiting(eval)
b.enqueueWaiting(eval, trackTime)
})
b.timeWait[eval.ID] = timer
b.stats.TotalWaiting += 1
}

// enqueueWaiting is used to enqueue a waiting evaluation
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation, trackTime bool) {
b.l.Lock()
defer b.l.Unlock()

delete(b.timeWait, eval.ID)
b.stats.TotalWaiting -= 1

b.enqueueLocked(eval, eval.Type)
b.enqueueLocked(eval, eval.Type, trackTime)
}

// enqueueLocked is used to enqueue with the lock held
func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation, sched string) {
func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation, sched string, trackTime bool) {
// Do nothing if not enabled
if !b.enabled {
return
Expand All @@ -307,6 +329,15 @@ func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation, sched string) {
Namespace: eval.Namespace,
}
readyEval := b.jobEvals[namespacedID]

// store when the eval was enqueued before early return, so that we capture
// the "pending" queue time, too
//
// we only store the first 10k enqueued times to avoid memory exhaustion
if len(b.enqueuedTime) < 10_000 && trackTime {
b.enqueuedTime[eval.ID] = time.Now()
}

if readyEval == "" {
b.jobEvals[namespacedID] = eval.ID
} else if readyEval != eval.ID {
Expand Down Expand Up @@ -346,7 +377,7 @@ func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation, sched string) {
}
}

// Dequeue is used to perform a blocking dequeue. The next available evalution
// Dequeue is used to perform a blocking dequeue. The next available evaluation
// is returned as well as a unique token identifier for this dequeue. The token
// changes on leadership election to ensure a Dequeue prior to a leadership
// election cannot conflict with a Dequeue of the same evaluation after a
Expand All @@ -369,6 +400,19 @@ SCAN:
if timeoutTimer != nil {
timeoutTimer.Stop()
}
b.l.Lock()
if t, ok := b.enqueuedTime[eval.ID]; ok {
if len(b.dequeuedTime) < 10_000 {
b.dequeuedTime[eval.ID] = time.Now()
}
metrics.MeasureSinceWithLabels([]string{"nomad", "broker", "wait_time"}, t, []metrics.Label{
{Name: "job", Value: eval.JobID},
{Name: "namespace", Value: eval.Namespace},
{Name: "eval_type", Value: eval.Type},
{Name: "triggered_by", Value: eval.TriggeredBy},
})
}
b.l.Unlock()
return eval, token, nil
}

Expand Down Expand Up @@ -570,6 +614,8 @@ func (b *EvalBroker) Ack(evalID, token string) error {
}
jobID := unack.Eval.JobID

defer b.handleAckNackLocked(unack.Eval)

// Ensure we were able to stop the timer
if !unack.NackTimer.Stop() {
return fmt.Errorf("Evaluation ID Ack'd after Nack timer expiration")
Expand Down Expand Up @@ -609,7 +655,7 @@ func (b *EvalBroker) Ack(evalID, token string) error {
raw := heap.Pop(&pending)
eval := raw.(*structs.Evaluation)
b.stats.TotalPending -= 1
b.enqueueLocked(eval, eval.Type)
b.enqueueLocked(eval, eval.Type, true)
}

// Clean up if there are no more after that
Expand All @@ -622,7 +668,7 @@ func (b *EvalBroker) Ack(evalID, token string) error {

// Re-enqueue the evaluation.
if eval, ok := b.requeue[token]; ok {
b.processEnqueue(eval, "")
b.processEnqueue(eval, "", true)
}

return nil
Expand All @@ -645,6 +691,7 @@ func (b *EvalBroker) Nack(evalID, token string) error {
if unack.Token != token {
return fmt.Errorf("Token does not match for Evaluation ID")
}
defer b.handleAckNackLocked(unack.Eval)

// Stop the timer, doesn't matter if we've missed it
unack.NackTimer.Stop()
Expand All @@ -660,16 +707,16 @@ func (b *EvalBroker) Nack(evalID, token string) error {
// Check if we've hit the delivery limit, and re-enqueue
// in the failedQueue
if dequeues := b.evals[evalID]; dequeues >= b.deliveryLimit {
b.enqueueLocked(unack.Eval, failedQueue)
b.enqueueLocked(unack.Eval, failedQueue, true)
} else {
e := unack.Eval
e.Wait = b.nackReenqueueDelay(e, dequeues)
e.Wait = b.nackReenqueueDelay(dequeues)

// See if there should be a delay before re-enqueuing
if e.Wait > 0 {
b.processWaitingEnqueue(e)
b.processWaitingEnqueue(e, true)
} else {
b.enqueueLocked(e, e.Type)
b.enqueueLocked(e, e.Type, true)
}
}

Expand All @@ -678,7 +725,7 @@ func (b *EvalBroker) Nack(evalID, token string) error {

// nackReenqueueDelay is used to determine the delay that should be applied on
// the evaluation given the number of previous attempts
func (b *EvalBroker) nackReenqueueDelay(eval *structs.Evaluation, prevDequeues int) time.Duration {
func (b *EvalBroker) nackReenqueueDelay(prevDequeues int) time.Duration {
switch {
case prevDequeues <= 0:
return 0
Expand Down Expand Up @@ -724,6 +771,39 @@ func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
return nil
}

// handleAckNackLocked finishes time tracking for an evaluation once it has
// been Ack'd or Nack'd. Any enqueue/dequeue timestamps recorded for the
// evaluation are removed from the broker. If both timestamps were present,
// two timers are emitted first:
//
//   - nomad.broker.process_time: time elapsed since the evaluation was dequeued
//   - nomad.broker.response_time: time elapsed since the evaluation was enqueued
//
// A nil evaluation is ignored. As the name suggests, the caller is expected to
// already hold the broker lock.
func (b *EvalBroker) handleAckNackLocked(eval *structs.Evaluation) {
	if eval == nil {
		return
	}

	id := eval.ID
	enqueuedAt, wasEnqueued := b.enqueuedTime[id]
	dequeuedAt, wasDequeued := b.dequeuedTime[id]

	// Every path ends with neither map holding an entry for this evaluation,
	// so clear both up front; deleting a missing key is a no-op.
	delete(b.enqueuedTime, id)
	delete(b.dequeuedTime, id)

	// Without both timestamps there is nothing meaningful to report.
	if !(wasEnqueued && wasDequeued) {
		return
	}

	// Both timers carry the same labels. The literal has len == cap, so an
	// append performed by a callee reallocates instead of aliasing this slice.
	labels := []metrics.Label{
		{Name: "job", Value: eval.JobID},
		{Name: "namespace", Value: eval.Namespace},
		{Name: "eval_type", Value: eval.Type},
		{Name: "triggered_by", Value: eval.TriggeredBy},
	}
	metrics.MeasureSinceWithLabels([]string{"nomad", "broker", "process_time"}, dequeuedAt, labels)
	metrics.MeasureSinceWithLabels([]string{"nomad", "broker", "response_time"}, enqueuedAt, labels)
}

// Flush is used to clear the state of the broker. It must be called from within
// the lock.
func (b *EvalBroker) flush() {
Expand Down Expand Up @@ -767,6 +847,8 @@ func (b *EvalBroker) flush() {
b.unack = make(map[string]*unackEval)
b.timeWait = make(map[string]*time.Timer)
b.delayHeap = delayheap.NewDelayHeap()
b.enqueuedTime = make(map[string]time.Time)
b.dequeuedTime = make(map[string]time.Time)
}

// evalWrapper satisfies the HeapNode interface
Expand Down Expand Up @@ -814,7 +896,7 @@ func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan
b.delayHeap.Remove(&evalWrapper{eval})
b.stats.TotalWaiting -= 1
delete(b.stats.DelayedEvals, eval.ID)
b.enqueueLocked(eval, eval.Type)
b.enqueueLocked(eval, eval.Type, true)
b.l.Unlock()
case <-updateCh:
continue
Expand Down
2 changes: 1 addition & 1 deletion nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ func (s *Server) restoreEvals() error {
eval := raw.(*structs.Evaluation)

if eval.ShouldEnqueue() {
s.evalBroker.Enqueue(eval)
s.evalBroker.Restore(eval)
} else if eval.ShouldBlock() {
s.blockedEvals.Block(eval)
}
Expand Down
Loading

0 comments on commit 0d14dd9

Please sign in to comment.