Skip to content

Commit

Permalink
Merge pull request #5900 from hashicorp/f-system-sched-blocked-evals
Browse files Browse the repository at this point in the history
System scheduler blocked evals
  • Loading branch information
langmartin authored Jul 18, 2019
2 parents c7ab4a6 + e0edd11 commit 5f44755
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 127 deletions.
41 changes: 41 additions & 0 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type BlockedEvals struct {
// classes.
escaped map[string]wrappedEval

// system is the set of system evaluations that failed to start on nodes because of
// resource constraints.
system *systemEvals

// unblockCh is used to buffer unblocking of evaluations.
capacityChangeCh chan *capacityUpdate

Expand Down Expand Up @@ -113,6 +117,7 @@ func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals {
evalBroker: evalBroker,
captured: make(map[string]wrappedEval),
escaped: make(map[string]wrappedEval),
system: newSystemEvals(),
jobs: make(map[structs.NamespacedID]string),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
Expand Down Expand Up @@ -227,6 +232,12 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
return
}

// System evals are indexed by node and re-processed on utilization changes in
// existing nodes
if eval.Type == structs.JobTypeSystem {
b.system.Add(eval, token)
}

// Add the eval to the set of blocked evals whose jobs constraints are
// captured by computed node class.
b.captured[eval.ID] = wrapped
Expand Down Expand Up @@ -365,6 +376,14 @@ func (b *BlockedEvals) Untrack(jobID, namespace string) {

nsID := structs.NewNamespacedID(jobID, namespace)

if evals, ok := b.system.JobEvals(nsID); ok {
for _, e := range evals {
b.system.Remove(e)
b.stats.TotalBlocked--
}
return
}

// Get the evaluation ID to cancel
evalID, ok := b.jobs[nsID]
if !ok {
Expand Down Expand Up @@ -477,6 +496,27 @@ func (b *BlockedEvals) UnblockClassAndQuota(class, quota string, index uint64) {
}
}

// UnblockNode finds any blocked evalution that's node specific (system jobs) and enqueues
// it on the eval broker
func (b *BlockedEvals) UnblockNode(nodeID string, index uint64) {
b.l.Lock()
defer b.l.Unlock()

evals, ok := b.system.NodeEvals(nodeID)

// Do nothing if not enabled
if !b.enabled || !ok || len(evals) == 0 {
return
}

for e := range evals {
b.system.Remove(e)
b.stats.TotalBlocked--
}

b.evalBroker.EnqueueAll(evals)
}

// watchCapacity is a long lived function that watches for capacity changes in
// nodes and unblocks the correct set of evals.
func (b *BlockedEvals) watchCapacity(stopCh <-chan struct{}, changeCh <-chan *capacityUpdate) {
Expand Down Expand Up @@ -652,6 +692,7 @@ func (b *BlockedEvals) Flush() {
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})
b.duplicateCh = make(chan struct{}, 1)
b.system = newSystemEvals()
}

// Stats is used to query the state of the blocked eval tracker.
Expand Down
94 changes: 94 additions & 0 deletions nomad/blocked_evals_system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package nomad

import "github.com/hashicorp/nomad/nomad/structs"

// systemEvals are handled specially, each job may have a blocked eval on each node
type systemEvals struct {
// byJob maps a jobID to a nodeID to that job's single blocked evalID on that node
byJob map[structs.NamespacedID]map[string]string

// byNode maps a nodeID to a set of evalIDs
byNode map[string]map[string]bool

// evals maps evalIDs to an eval and token
evals map[string]*wrappedEval
}

func newSystemEvals() *systemEvals {
return &systemEvals{
evals: map[string]*wrappedEval{},
byJob: map[structs.NamespacedID]map[string]string{},
byNode: map[string]map[string]bool{},
}
}

func (s *systemEvals) Add(eval *structs.Evaluation, token string) {
// store the eval by node id
if _, ok := s.byNode[eval.NodeID]; !ok {
s.byNode[eval.NodeID] = make(map[string]bool)
}

s.byNode[eval.NodeID][eval.ID] = true
s.evals[eval.ID] = &wrappedEval{eval: eval, token: token}

// link the job to the node for cleanup
jobID := structs.NewNamespacedID(eval.JobID, eval.Namespace)
if _, ok := s.byJob[jobID]; !ok {
s.byJob[jobID] = make(map[string]string)
}

// if we're displacing the old blocked id for this job+node, delete it first
if prevID, ok := s.byJob[jobID][eval.NodeID]; ok {
prev, _ := s.Get(prevID)
s.Remove(prev.eval)
}

// set this eval as the new eval for this job on this node
s.byJob[jobID][eval.NodeID] = eval.ID
}

func (s *systemEvals) Get(evalID string) (*wrappedEval, bool) {
w, ok := s.evals[evalID]
return w, ok
}

func (s *systemEvals) Remove(eval *structs.Evaluation) {
// delete the job index if this eval is the currently listed blocked eval
jobID := structs.NewNamespacedID(eval.JobID, eval.Namespace)
e, ok := s.byJob[jobID][eval.NodeID]
if ok && e == eval.ID {
delete(s.byJob[jobID], eval.NodeID)
}

// delete this eval from the node index, and then the map for this node if empty
delete(s.byNode[eval.NodeID], eval.ID)
if len(s.byNode[eval.NodeID]) == 0 {
delete(s.byNode, eval.NodeID)
}

// delete the eval itself
delete(s.evals, eval.ID)
}

func (s *systemEvals) NodeEvals(nodeID string) (map[*structs.Evaluation]string, bool) {
out := map[*structs.Evaluation]string{}
for eID := range s.byNode[nodeID] {
if w, ok := s.Get(eID); ok {
out[w.eval] = w.token
}
}

ok := len(out) > 0
return out, ok
}

func (s *systemEvals) JobEvals(jobID structs.NamespacedID) ([]*structs.Evaluation, bool) {
out := []*structs.Evaluation{}
_, ok := s.byJob[jobID]
for _, eID := range s.byJob[jobID] {
if e, ok := s.Get(eID); ok {
out = append(out, e.eval)
}
}
return out, ok
}
Loading

0 comments on commit 5f44755

Please sign in to comment.