diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index ed6cccb8659..eba0a9f5045 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -47,6 +47,10 @@ type BlockedEvals struct { // classes. escaped map[string]wrappedEval + // system is the set of system evaluations that failed to start on nodes because of + // resource constraints. + system *systemEvals + // unblockCh is used to buffer unblocking of evaluations. capacityChangeCh chan *capacityUpdate @@ -113,6 +117,7 @@ func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals { evalBroker: evalBroker, captured: make(map[string]wrappedEval), escaped: make(map[string]wrappedEval), + system: newSystemEvals(), jobs: make(map[structs.NamespacedID]string), unblockIndexes: make(map[string]uint64), capacityChangeCh: make(chan *capacityUpdate, unblockBuffer), @@ -227,6 +232,12 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { return } + // System evals are indexed by node and re-processed on utilization changes in + // existing nodes + if eval.Type == structs.JobTypeSystem { + b.system.Add(eval, token) + } + // Add the eval to the set of blocked evals whose jobs constraints are // captured by computed node class. 
b.captured[eval.ID] = wrapped @@ -365,6 +376,14 @@ func (b *BlockedEvals) Untrack(jobID, namespace string) { nsID := structs.NewNamespacedID(jobID, namespace) + if evals, ok := b.system.JobEvals(nsID); ok { + for _, e := range evals { + b.system.Remove(e) + b.stats.TotalBlocked-- + } + return + } + // Get the evaluation ID to cancel evalID, ok := b.jobs[nsID] if !ok { @@ -477,6 +496,27 @@ func (b *BlockedEvals) UnblockClassAndQuota(class, quota string, index uint64) { } } +// UnblockNode finds any blocked evaluation that's node specific (system jobs) and enqueues +// it on the eval broker +func (b *BlockedEvals) UnblockNode(nodeID string, index uint64) { + b.l.Lock() + defer b.l.Unlock() + + evals, ok := b.system.NodeEvals(nodeID) + + // Do nothing if not enabled or if no evals are blocked on this node + if !b.enabled || !ok || len(evals) == 0 { + return + } + + for e := range evals { + b.system.Remove(e) + b.stats.TotalBlocked-- + } + + b.evalBroker.EnqueueAll(evals) +} + // watchCapacity is a long lived function that watches for capacity changes in // nodes and unblocks the correct set of evals. func (b *BlockedEvals) watchCapacity(stopCh <-chan struct{}, changeCh <-chan *capacityUpdate) { @@ -652,6 +692,7 @@ func (b *BlockedEvals) Flush() { b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer) b.stopCh = make(chan struct{}) b.duplicateCh = make(chan struct{}, 1) + b.system = newSystemEvals() } // Stats is used to query the state of the blocked eval tracker. 
diff --git a/nomad/blocked_evals_system.go b/nomad/blocked_evals_system.go new file mode 100644 index 00000000000..5fae445954b --- /dev/null +++ b/nomad/blocked_evals_system.go @@ -0,0 +1,94 @@ +package nomad + +import "github.com/hashicorp/nomad/nomad/structs" + +// systemEvals are handled specially, each job may have a blocked eval on each node +type systemEvals struct { + // byJob maps a jobID to a nodeID to that job's single blocked evalID on that node + byJob map[structs.NamespacedID]map[string]string + + // byNode maps a nodeID to a set of evalIDs + byNode map[string]map[string]bool + + // evals maps evalIDs to an eval and token + evals map[string]*wrappedEval +} + +func newSystemEvals() *systemEvals { + return &systemEvals{ + evals: map[string]*wrappedEval{}, + byJob: map[structs.NamespacedID]map[string]string{}, + byNode: map[string]map[string]bool{}, + } +} + +func (s *systemEvals) Add(eval *structs.Evaluation, token string) { + // store the eval by node id + if _, ok := s.byNode[eval.NodeID]; !ok { + s.byNode[eval.NodeID] = make(map[string]bool) + } + + s.byNode[eval.NodeID][eval.ID] = true + s.evals[eval.ID] = &wrappedEval{eval: eval, token: token} + + // link the job to the node for cleanup + jobID := structs.NewNamespacedID(eval.JobID, eval.Namespace) + if _, ok := s.byJob[jobID]; !ok { + s.byJob[jobID] = make(map[string]string) + } + + // if we're displacing the old blocked id for this job+node, delete it first + if prevID, ok := s.byJob[jobID][eval.NodeID]; ok { + prev, _ := s.Get(prevID) + s.Remove(prev.eval) + } + + // set this eval as the new eval for this job on this node + s.byJob[jobID][eval.NodeID] = eval.ID +} + +func (s *systemEvals) Get(evalID string) (*wrappedEval, bool) { + w, ok := s.evals[evalID] + return w, ok +} + +func (s *systemEvals) Remove(eval *structs.Evaluation) { + // delete the job index if this eval is the currently listed blocked eval + jobID := structs.NewNamespacedID(eval.JobID, eval.Namespace) + e, ok := 
s.byJob[jobID][eval.NodeID] + if ok && e == eval.ID { + delete(s.byJob[jobID], eval.NodeID) + } + + // delete this eval from the node index, and then the map for this node if empty + delete(s.byNode[eval.NodeID], eval.ID) + if len(s.byNode[eval.NodeID]) == 0 { + delete(s.byNode, eval.NodeID) + } + + // delete the eval itself + delete(s.evals, eval.ID) +} + +func (s *systemEvals) NodeEvals(nodeID string) (map[*structs.Evaluation]string, bool) { + out := map[*structs.Evaluation]string{} + for eID := range s.byNode[nodeID] { + if w, ok := s.Get(eID); ok { + out[w.eval] = w.token + } + } + + ok := len(out) > 0 + return out, ok +} + +func (s *systemEvals) JobEvals(jobID structs.NamespacedID) ([]*structs.Evaluation, bool) { + out := []*structs.Evaluation{} + _, ok := s.byJob[jobID] + for _, eID := range s.byJob[jobID] { + if e, ok := s.Get(eID); ok { + out = append(out, e.eval) + } + } + return out, ok +} diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index 8fe7db32f06..f5c2961f8bf 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" ) func testBlockedEvals(t *testing.T) (*BlockedEvals, *EvalBroker) { @@ -174,18 +175,21 @@ func TestBlockedEvals_UnblockEscaped(t *testing.T) { } blocked.Unblock("v1:123", 1000) + requireBlockedEvalsEnqueued(t, blocked, broker, 1) +} +func requireBlockedEvalsEnqueued(t *testing.T, blocked *BlockedEvals, broker *EvalBroker, enqueued int) { testutil.WaitForResult(func() (bool, error) { // Verify Unblock caused an enqueue brokerStats := broker.Stats() - if brokerStats.TotalReady != 1 { - return false, fmt.Errorf("bad: %#v", brokerStats) + if brokerStats.TotalReady != enqueued { + return false, fmt.Errorf("missing enqueued evals: %#v", brokerStats) } // Verify Unblock updates the stats bStats := blocked.Stats() if 
bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { - return false, fmt.Errorf("bad: %#v", bStats) + return false, fmt.Errorf("evals still blocked: %#v", bStats) } return true, nil }, func(err error) { @@ -211,23 +215,7 @@ func TestBlockedEvals_UnblockEligible(t *testing.T) { } blocked.Unblock("v1:123", 1000) - - testutil.WaitForResult(func() (bool, error) { - // Verify Unblock caused an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 1 { - return false, fmt.Errorf("bad: %#v", brokerStats) - } - - // Verify Unblock updates the stats - bStats := blocked.Stats() - if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { - return false, fmt.Errorf("bad: %#v", bStats) - } - return true, nil - }, func(err error) { - t.Fatalf("err: %s", err) - }) + requireBlockedEvalsEnqueued(t, blocked, broker, 1) } func TestBlockedEvals_UnblockIneligible(t *testing.T) { @@ -286,23 +274,7 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) { // Should unblock because the eval hasn't seen this node class. 
blocked.Unblock("v1:789", 1000) - - testutil.WaitForResult(func() (bool, error) { - // Verify Unblock causes an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 1 { - return false, fmt.Errorf("bad: %#v", brokerStats) - } - - // Verify Unblock updates the stats - bStats := blocked.Stats() - if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { - return false, fmt.Errorf("bad: %#v", bStats) - } - return true, nil - }, func(err error) { - t.Fatalf("err: %s", err) - }) + requireBlockedEvalsEnqueued(t, blocked, broker, 1) } func TestBlockedEvals_UnblockEligible_Quota(t *testing.T) { @@ -322,23 +294,7 @@ func TestBlockedEvals_UnblockEligible_Quota(t *testing.T) { } blocked.UnblockQuota("foo", 1000) - - testutil.WaitForResult(func() (bool, error) { - // Verify Unblock caused an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 1 { - return false, fmt.Errorf("bad: %#v", brokerStats) - } - - // Verify Unblock updates the stats - bs := blocked.Stats() - if bs.TotalBlocked != 0 || bs.TotalEscaped != 0 || bs.TotalQuotaLimit != 0 { - return false, fmt.Errorf("bad: %#v", bs) - } - return true, nil - }, func(err error) { - t.Fatalf("err: %s", err) - }) + requireBlockedEvalsEnqueued(t, blocked, broker, 1) } func TestBlockedEvals_UnblockIneligible_Quota(t *testing.T) { @@ -416,22 +372,7 @@ func TestBlockedEvals_Reblock(t *testing.T) { t.Fatalf("err: %v", err) } - testutil.WaitForResult(func() (bool, error) { - // Verify Unblock causes an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 1 { - return false, fmt.Errorf("bad: %#v", brokerStats) - } - - // Verify Unblock updates the stats - bStats := blocked.Stats() - if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { - return false, fmt.Errorf("bad: %#v", bStats) - } - return true, nil - }, func(err error) { - t.Fatalf("err: %s", err) - }) + requireBlockedEvalsEnqueued(t, blocked, broker, 1) } // Test the block case in which the eval should be immediately 
unblocked since @@ -457,17 +398,7 @@ func TestBlockedEvals_Block_ImmediateUnblock_Escaped(t *testing.T) { t.Fatalf("bad: %#v", blockedStats) } - testutil.WaitForResult(func() (bool, error) { - // Verify Unblock caused an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 1 { - return false, fmt.Errorf("bad: %#v", brokerStats) - } - - return true, nil - }, func(err error) { - t.Fatalf("err: %s", err) - }) + requireBlockedEvalsEnqueued(t, blocked, broker, 1) } // Test the block case in which the eval should be immediately unblocked since @@ -494,17 +425,7 @@ func TestBlockedEvals_Block_ImmediateUnblock_UnseenClass_After(t *testing.T) { t.Fatalf("bad: %#v", blockedStats) } - testutil.WaitForResult(func() (bool, error) { - // Verify Unblock caused an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 1 { - return false, fmt.Errorf("bad: %#v", brokerStats) - } - - return true, nil - }, func(err error) { - t.Fatalf("err: %s", err) - }) + requireBlockedEvalsEnqueued(t, blocked, broker, 1) } // Test the block case in which the eval should not immediately unblock since @@ -555,17 +476,7 @@ func TestBlockedEvals_Block_ImmediateUnblock_SeenClass(t *testing.T) { t.Fatalf("bad: %#v", blockedStats) } - testutil.WaitForResult(func() (bool, error) { - // Verify Unblock caused an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 1 { - return false, fmt.Errorf("bad: %#v", brokerStats) - } - - return true, nil - }, func(err error) { - t.Fatalf("err: %s", err) - }) + requireBlockedEvalsEnqueued(t, blocked, broker, 1) } // Test the block case in which the eval should be immediately unblocked since @@ -591,17 +502,7 @@ func TestBlockedEvals_Block_ImmediateUnblock_Quota(t *testing.T) { t.Fatalf("bad: %#v", bs) } - testutil.WaitForResult(func() (bool, error) { - // Verify Unblock caused an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 1 { - return false, fmt.Errorf("bad: %#v", brokerStats) - } - - return 
true, nil - }, func(err error) { - t.Fatalf("err: %s", err) - }) + requireBlockedEvalsEnqueued(t, blocked, broker, 1) } func TestBlockedEvals_UnblockFailed(t *testing.T) { @@ -636,16 +537,7 @@ func TestBlockedEvals_UnblockFailed(t *testing.T) { t.Fatalf("bad: %#v", bs) } - testutil.WaitForResult(func() (bool, error) { - // Verify Unblock caused an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 3 { - return false, fmt.Errorf("bad: %#v", brokerStats) - } - return true, nil - }, func(err error) { - t.Fatalf("err: %s", err) - }) + requireBlockedEvalsEnqueued(t, blocked, broker, 3) // Reblock an eval for the same job and check that it gets tracked. blocked.Block(e) @@ -704,3 +596,78 @@ func TestBlockedEvals_Untrack_Quota(t *testing.T) { t.Fatalf("bad: %#v", bs) } } + +func TestBlockedEvals_UnblockNode(t *testing.T) { + t.Parallel() + blocked, broker := testBlockedEvals(t) + + require.NotNil(t, broker) + + // Create a blocked system eval and add it to the blocked tracker. + e := mock.Eval() + e.Type = structs.JobTypeSystem + e.NodeID = "foo" + e.SnapshotIndex = 999 + blocked.Block(e) + + // Verify block did track + bs := blocked.Stats() + require.Equal(t, 1, bs.TotalBlocked) + + blocked.UnblockNode("foo", 1000) + requireBlockedEvalsEnqueued(t, blocked, broker, 1) + bs = blocked.Stats() + require.Empty(t, blocked.system.byNode) + require.Equal(t, 0, bs.TotalBlocked) +} + +func TestBlockedEvals_SystemUntrack(t *testing.T) { + t.Parallel() + blocked, _ := testBlockedEvals(t) + + // Create a blocked system eval and add it to the blocked tracker. 
+ e := mock.Eval() + e.Type = structs.JobTypeSystem + e.NodeID = "foo" + blocked.Block(e) + + // Verify block did track + bs := blocked.Stats() + require.Equal(t, 1, bs.TotalBlocked) + require.Equal(t, 0, bs.TotalEscaped) + require.Equal(t, 0, bs.TotalQuotaLimit) + + // Untrack and verify + blocked.Untrack(e.JobID, e.Namespace) + bs = blocked.Stats() + require.Equal(t, 0, bs.TotalBlocked) + require.Equal(t, 0, bs.TotalEscaped) + require.Equal(t, 0, bs.TotalQuotaLimit) +} + +func TestBlockedEvals_SystemDisableFlush(t *testing.T) { + t.Parallel() + blocked, _ := testBlockedEvals(t) + + // Create a blocked system eval and add it to the blocked tracker. + e := mock.Eval() + e.Type = structs.JobTypeSystem + e.NodeID = "foo" + blocked.Block(e) + + // Verify block did track + bs := blocked.Stats() + require.Equal(t, 1, bs.TotalBlocked) + require.Equal(t, 0, bs.TotalEscaped) + require.Equal(t, 0, bs.TotalQuotaLimit) + + // Disabling the tracker should empty it, including the system eval indexes + blocked.SetEnabled(false) + bs = blocked.Stats() + require.Equal(t, 0, bs.TotalBlocked) + require.Equal(t, 0, bs.TotalEscaped) + require.Equal(t, 0, bs.TotalQuotaLimit) + require.Empty(t, blocked.system.evals) + require.Empty(t, blocked.system.byJob) + require.Empty(t, blocked.system.byNode) +} diff --git a/nomad/fsm.go b/nomad/fsm.go index 50a397e0b8e..c41e5b9d1b4 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -344,6 +344,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { } n.blockedEvals.Unblock(node.ComputedClass, index) + n.blockedEvals.UnblockNode(req.NodeID, index) } return nil @@ -415,6 +416,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac if node != nil && node.SchedulingEligibility == structs.NodeSchedulingIneligible && req.Eligibility == structs.NodeSchedulingEligible { n.blockedEvals.Unblock(node.ComputedClass, index) + n.blockedEvals.UnblockNode(req.NodeID, index) } return nil @@ -760,6 +762,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) 
interface{} } n.blockedEvals.UnblockClassAndQuota(node.ComputedClass, quota, index) + n.blockedEvals.UnblockNode(node.ID, index) } } diff --git a/nomad/worker.go b/nomad/worker.go index 06a8dc34db1..68e57bcb49e 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -253,7 +253,7 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua return fmt.Errorf("failed to determine snapshot's index: %v", err) } - // Create the scheduler, or use the special system scheduler + // Create the scheduler, or use the special core scheduler var sched scheduler.Scheduler if eval.Type == structs.JobTypeCore { sched = NewCoreScheduler(w.srv, snap) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 9dacb0d73d9..23e0745bed9 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -257,7 +257,8 @@ func (s *GenericScheduler) process() (bool, error) { // If there are failed allocations, we need to create a blocked evaluation // to place the failed allocations when resources become available. If the - // current evaluation is already a blocked eval, we reuse it. + // current evaluation is already a blocked eval, we reuse it by submitting + // a new eval to the planner in createBlockedEval if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil { if err := s.createBlockedEval(false); err != nil { s.logger.Error("failed to make blocked eval", "error", err) diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 7918bbcc021..b0fab7756ab 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -18,6 +18,7 @@ const ( // SystemScheduler is used for 'system' jobs. This scheduler is // designed for services that should be run on every client. 
+// One for each job, containing an allocation for each node type SystemScheduler struct { logger log.Logger state State @@ -61,7 +62,8 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { switch eval.TriggeredBy { case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, structs.EvalTriggerFailedFollowUp, structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerPreemption, - structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain, structs.EvalTriggerAllocStop: + structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain, structs.EvalTriggerAllocStop, + structs.EvalTriggerQueuedAllocs: default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) @@ -324,6 +326,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { // Actual failure to start this task on this candidate node, report it individually s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics() + s.addBlocked(node) continue } @@ -390,3 +393,22 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { return nil } + +// addBlocked creates a new blocked eval for this job on this node +// and submits it to the planner (worker.go), which keeps the eval for execution later +func (s *SystemScheduler) addBlocked(node *structs.Node) error { + e := s.ctx.Eligibility() + escaped := e.HasEscaped() + + // Only store the eligible classes if the eval hasn't escaped. + var classEligibility map[string]bool + if !escaped { + classEligibility = e.GetClasses() + } + + blocked := s.eval.CreateBlockedEval(classEligibility, escaped, e.QuotaLimitReached()) + blocked.StatusDescription = blockedEvalFailedPlacements + blocked.NodeID = node.ID + + return s.planner.CreateEval(blocked) +}