Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

System scheduler blocked evals #5900

Merged
merged 7 commits into from
Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type BlockedEvals struct {
// classes.
escaped map[string]wrappedEval

// system is the set of system evaluations that failed to start on nodes because of
// resource constraints.
system *systemEvals

// unblockCh is used to buffer unblocking of evaluations.
capacityChangeCh chan *capacityUpdate

Expand Down Expand Up @@ -113,6 +117,7 @@ func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals {
evalBroker: evalBroker,
captured: make(map[string]wrappedEval),
escaped: make(map[string]wrappedEval),
system: newSystemEvals(),
jobs: make(map[structs.NamespacedID]string),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
Expand Down Expand Up @@ -227,6 +232,12 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
return
}

// System evals are indexed by node and re-processed on utilization changes in
// existing nodes
if eval.Type == structs.JobTypeSystem {
b.system.Add(eval, token)
}

// Add the eval to the set of blocked evals whose jobs constraints are
// captured by computed node class.
b.captured[eval.ID] = wrapped
Expand Down Expand Up @@ -365,6 +376,14 @@ func (b *BlockedEvals) Untrack(jobID, namespace string) {

nsID := structs.NewNamespacedID(jobID, namespace)

if evals, ok := b.system.JobEvals(nsID); ok {
for _, e := range evals {
b.system.Remove(e)
b.stats.TotalBlocked--
}
return
}
notnoop marked this conversation as resolved.
Show resolved Hide resolved

// Get the evaluation ID to cancel
evalID, ok := b.jobs[nsID]
if !ok {
Expand Down Expand Up @@ -477,6 +496,27 @@ func (b *BlockedEvals) UnblockClassAndQuota(class, quota string, index uint64) {
}
}

// UnblockNode finds any blocked evalution that's node specific (system jobs) and enqueues
// it on the eval broker
func (b *BlockedEvals) UnblockNode(nodeID string, index uint64) {
b.l.Lock()
defer b.l.Unlock()

evals, ok := b.system.NodeEvals(nodeID)

// Do nothing if not enabled
if !b.enabled || !ok || len(evals) == 0 {
return
}

for e := range evals {
b.system.Remove(e)
b.stats.TotalBlocked--
}

b.evalBroker.EnqueueAll(evals)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll need to clear out the system evals from in memory state in the flush method. That gets called when the eval broker is disabled (when the node loses leadership).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add a test to eval_broker_test.go where some blocked evals are in the state map and when SetDisabled is called it should no longer be in memory.

// watchCapacity is a long lived function that watches for capacity changes in
// nodes and unblocks the correct set of evals.
func (b *BlockedEvals) watchCapacity(stopCh <-chan struct{}, changeCh <-chan *capacityUpdate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a way to persist these such that a leadership transition causes the system evals to live on the new leader node.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the blocked evals are created through the fsm, so when a new leader is elected, they will be locally replayed into memory via leader.go nomad.restoreEvals

Expand Down Expand Up @@ -652,6 +692,7 @@ func (b *BlockedEvals) Flush() {
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})
b.duplicateCh = make(chan struct{}, 1)
b.system = newSystemEvals()
}

// Stats is used to query the state of the blocked eval tracker.
Expand Down
94 changes: 94 additions & 0 deletions nomad/blocked_evals_system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package nomad

import "github.com/hashicorp/nomad/nomad/structs"

// systemEvals are handled specially, each job may have a blocked eval on each node
type systemEvals struct {
// byJob maps a jobID to a nodeID to that job's single blocked evalID on that node
byJob map[structs.NamespacedID]map[string]string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More of a question: Can a system job have multiple TaskGroups - would they always be associated with the same eval id?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the eval is always at the job level but if the system job has multiple task groups the scheduler will create multiple allocations. We always evaluate every task group in the job in the reconciler.


// byNode maps a nodeID to a set of evalIDs
byNode map[string]map[string]bool

// evals maps evalIDs to an eval and token
evals map[string]*wrappedEval
}

func newSystemEvals() *systemEvals {
return &systemEvals{
evals: map[string]*wrappedEval{},
byJob: map[structs.NamespacedID]map[string]string{},
byNode: map[string]map[string]bool{},
}
}

func (s *systemEvals) Add(eval *structs.Evaluation, token string) {
// store the eval by node id
if _, ok := s.byNode[eval.NodeID]; !ok {
s.byNode[eval.NodeID] = make(map[string]bool)
}

s.byNode[eval.NodeID][eval.ID] = true
s.evals[eval.ID] = &wrappedEval{eval: eval, token: token}

// link the job to the node for cleanup
jobID := structs.NewNamespacedID(eval.JobID, eval.Namespace)
if _, ok := s.byJob[jobID]; !ok {
s.byJob[jobID] = make(map[string]string)
}

// if we're displacing the old blocked id for this job+node, delete it first
if prevID, ok := s.byJob[jobID][eval.NodeID]; ok {
prev, _ := s.Get(prevID)
s.Remove(prev.eval)
}

// set this eval as the new eval for this job on this node
s.byJob[jobID][eval.NodeID] = eval.ID
}

func (s *systemEvals) Get(evalID string) (*wrappedEval, bool) {
w, ok := s.evals[evalID]
return w, ok
}

func (s *systemEvals) Remove(eval *structs.Evaluation) {
// delete the job index if this eval is the currently listed blocked eval
jobID := structs.NewNamespacedID(eval.JobID, eval.Namespace)
e, ok := s.byJob[jobID][eval.NodeID]
if ok && e == eval.ID {
delete(s.byJob[jobID], eval.NodeID)
}

// delete this eval from the node index, and then the map for this node if empty
delete(s.byNode[eval.NodeID], eval.ID)
if len(s.byNode[eval.NodeID]) == 0 {
delete(s.byNode, eval.NodeID)
}

// delete the eval itself
delete(s.evals, eval.ID)
}

func (s *systemEvals) NodeEvals(nodeID string) (map[*structs.Evaluation]string, bool) {
out := map[*structs.Evaluation]string{}
for eID := range s.byNode[nodeID] {
if w, ok := s.Get(eID); ok {
out[w.eval] = w.token
}
}

ok := len(out) > 0
return out, ok
}

func (s *systemEvals) JobEvals(jobID structs.NamespacedID) ([]*structs.Evaluation, bool) {
out := []*structs.Evaluation{}
_, ok := s.byJob[jobID]
for _, eID := range s.byJob[jobID] {
if e, ok := s.Get(eID); ok {
out = append(out, e.eval)
}
}
return out, ok
}
Loading