From 1df3f2efc0e0e8fb4069da8de15f6dd00a8c3402 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 26 Oct 2021 08:14:41 -0400 Subject: [PATCH 1/5] Add a failing test --- scheduler/scheduler_system_test.go | 85 ++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go index b46bbc9e444..ac9c3725e72 100644 --- a/scheduler/scheduler_system_test.go +++ b/scheduler/scheduler_system_test.go @@ -766,6 +766,91 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) { } } +func TestSystemSched_JobModify_RemoveDC(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + node1 := mock.Node() + node1.Datacenter = "dc1" + require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node1)) + + node2 := mock.Node() + node2.Datacenter = "dc2" + require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) + + fmt.Println("DC1 node: ", node1.ID) + fmt.Println("DC2 node: ", node2.ID) + nodes := []*structs.Node{node1, node2} + + // Generate a fake job with allocations + job := mock.SystemJob() + job.Datacenters = []string{"dc1", "dc2"} + require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + allocs = append(allocs, alloc) + } + require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs)) + + // Update the job + job2 := job.Copy() + job2.Datacenters = []string{"dc1"} + require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job2)) + + // Create a mock evaluation to deal with update + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: 
structs.EvalStatusPending, + } + require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + require.NoError(t, err) + + // Ensure a single plan + require.Len(t, h.Plans, 1) + plan := h.Plans[0] + + // Ensure the plan stops exactly one alloc (dc2 is no longer targeted) + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + require.Len(t, update, 1) + + // Ensure the plan updated the existing allocs + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + require.Len(t, planned, 1) + + for _, p := range planned { + require.Equal(t, job2, p.Job, "should update job") + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + require.NoError(t, err) + + // Ensure all allocations placed + require.Len(t, out, 2) + h.AssertEvalStatus(t, structs.EvalStatusComplete) + +} + func TestSystemSched_JobDeregister_Purged(t *testing.T) { h := NewHarness(t) From b0ce6845e71c1395c7c8304df1211cbebc856790 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 26 Oct 2021 08:18:00 -0400 Subject: [PATCH 2/5] scheduler: stop allocs in unrelated nodes The system scheduler should leave allocs on draining nodes as-is, but stop allocs on nodes that are no longer part of the job datacenters. Previously, the scheduler did not make the distinction and left system job allocs intact if they were already running. 
--- scheduler/generic_sched.go | 2 +- scheduler/scheduler_system.go | 9 +++++---- scheduler/util.go | 34 ++++++++++++++++++++++++---------- scheduler/util_test.go | 15 +++++++++------ 4 files changed, 39 insertions(+), 21 deletions(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index e5273c00db9..69fcfbddb21 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -471,7 +471,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, // destructive updates to place and the set of new placements to place. func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error { // Get the base nodes - nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) + nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return err } diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 2b0da38bdef..5d0278eae65 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -37,8 +37,9 @@ type SystemScheduler struct { ctx *EvalContext stack *SystemStack - nodes []*structs.Node - nodesByDC map[string]int + nodes []*structs.Node + notReadyNodes map[string]struct{} + nodesByDC map[string]int limitReached bool nextEval *structs.Evaluation @@ -122,7 +123,7 @@ func (s *SystemScheduler) process() (bool, error) { // Get the ready nodes in the required datacenters if !s.job.Stopped() { - s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) + s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return false, fmt.Errorf("failed to get ready nodes: %v", err) } @@ -219,7 +220,7 @@ func (s *SystemScheduler) computeJobAllocs() error { live, term := structs.SplitTerminalAllocs(allocs) // Diff the required and existing allocations - diff := diffSystemAllocs(s.job, s.nodes, tainted, live, term) + diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, 
tainted, live, term) s.logger.Debug("reconciled current state with desired state", "place", len(diff.place), "update", len(diff.update), "migrate", len(diff.migrate), "stop", len(diff.stop), diff --git a/scheduler/util.go b/scheduler/util.go index 112993a57fd..c432c05e0b8 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -65,7 +65,8 @@ func diffSystemAllocsForNode( job *structs.Job, // job whose allocs are going to be diff-ed nodeID string, eligibleNodes map[string]*structs.Node, - taintedNodes map[string]*structs.Node, // nodes which are down or in drain (by node name) + notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining + taintedNodes map[string]*structs.Node, // nodes which are down (by node name) required map[string]*structs.TaskGroup, // set of allocations that must exist allocs []*structs.Allocation, // non-terminal allocations that exist terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, name) @@ -139,10 +140,21 @@ func diffSystemAllocsForNode( // For an existing allocation, if the nodeID is no longer // eligible, the diff should be ignored - if _, ok := eligibleNodes[nodeID]; !ok { + if _, ok := notReadyNodes[nodeID]; ok { goto IGNORE } + // Existing allocations on nodes that are no longer targeted + // should be stopped + if _, ok := eligibleNodes[nodeID]; !ok { + result.stop = append(result.stop, allocTuple{ + Name: name, + TaskGroup: tg, + Alloc: exist, + }) + continue + } + // If the definition is updated we need to update if job.JobModifyIndex != exist.Job.JobModifyIndex { result.update = append(result.update, allocTuple{ @@ -229,7 +241,8 @@ func diffSystemAllocsForNode( // diffResult contain the specific nodeID they should be allocated on. 
func diffSystemAllocs( job *structs.Job, // jobs whose allocations are going to be diff-ed - nodes []*structs.Node, // list of nodes in the ready state + readyNodes []*structs.Node, // list of nodes in the ready state + notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by name) allocs []*structs.Allocation, // non-terminal allocations terminal structs.TerminalByNodeByName, // latest terminal allocations (by name) @@ -238,12 +251,11 @@ func diffSystemAllocs( // Build a mapping of nodes to all their allocs. nodeAllocs := make(map[string][]*structs.Allocation, len(allocs)) for _, alloc := range allocs { - nallocs := append(nodeAllocs[alloc.NodeID], alloc) //nolint:gocritic - nodeAllocs[alloc.NodeID] = nallocs + nodeAllocs[alloc.NodeID] = append(nodeAllocs[alloc.NodeID], alloc) } eligibleNodes := make(map[string]*structs.Node) - for _, node := range nodes { + for _, node := range readyNodes { if _, ok := nodeAllocs[node.ID]; !ok { nodeAllocs[node.ID] = nil } @@ -255,7 +267,7 @@ func diffSystemAllocs( result := new(diffResult) for nodeID, allocs := range nodeAllocs { - diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal) result.Append(diff) } @@ -264,7 +276,7 @@ func diffSystemAllocs( // readyNodesInDCs returns all the ready nodes in the given datacenters and a // mapping of each data center to the count of ready nodes. 
-func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int, error) { +func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) { // Index the DCs dcMap := make(map[string]int, len(dcs)) for _, dc := range dcs { @@ -274,9 +286,10 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int // Scan the nodes ws := memdb.NewWatchSet() var out []*structs.Node + notReady := map[string]struct{}{} iter, err := state.Nodes(ws) if err != nil { - return nil, nil, err + return nil, nil, nil, err } for { raw := iter.Next() @@ -287,6 +300,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int // Filter on datacenter and status node := raw.(*structs.Node) if !node.Ready() { + notReady[node.ID] = struct{}{} continue } if _, ok := dcMap[node.Datacenter]; !ok { @@ -295,7 +309,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int out = append(out, node) dcMap[node.Datacenter]++ } - return out, dcMap, nil + return out, notReady, dcMap, nil } // retryMax is used to retry a callback until it returns success or diff --git a/scheduler/util_test.go b/scheduler/util_test.go index eba4227e677..2296c784397 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -61,7 +61,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal) require.Empty(t, diff.place) require.Empty(t, diff.update) require.Empty(t, diff.stop) @@ -87,7 +87,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { expAlloc := terminal["node1"]["my-sysbatch.pinger[0]"] expAlloc.NodeID = "node1" - diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + diff := diffSystemAllocsForNode(job, "node1", 
eligible, nil, tainted, required, live, terminal) require.Empty(t, diff.place) require.Equal(t, 1, len(diff.update)) require.Empty(t, diff.stop) @@ -191,7 +191,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "zip", eligible, tainted, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal) place := diff.place update := diff.update migrate := diff.migrate @@ -274,7 +274,7 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) { // No terminal allocs terminal := make(structs.TerminalByNodeByName) - diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, tainted, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal) place := diff.place update := diff.update migrate := diff.migrate @@ -360,7 +360,7 @@ func TestDiffSystemAllocs(t *testing.T) { }, } - diff := diffSystemAllocs(job, nodes, tainted, allocs, terminal) + diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal) place := diff.place update := diff.update migrate := diff.migrate @@ -415,7 +415,7 @@ func TestReadyNodesInDCs(t *testing.T) { require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1003, node4)) - nodes, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"}) + nodes, notReady, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"}) require.NoError(t, err) require.Equal(t, 2, len(nodes)) require.True(t, nodes[0].ID != node3.ID && nodes[1].ID != node3.ID) @@ -424,6 +424,9 @@ func TestReadyNodesInDCs(t *testing.T) { require.Equal(t, 1, dc["dc1"]) require.Contains(t, dc, "dc2") require.Equal(t, 1, dc["dc2"]) + + require.Contains(t, notReady, node3.ID) + require.Contains(t, notReady, node4.ID) } func TestRetryMax(t *testing.T) { From ff58c60e81106182a3b67d26c2647c4bcdf077e5 Mon Sep 
17 00:00:00 2001 From: Mahmood Ali Date: Tue, 26 Oct 2021 08:26:52 -0400 Subject: [PATCH 3/5] Refactor tests further --- scheduler/util_test.go | 78 ++++++++++++++++++------------------------ 1 file changed, 34 insertions(+), 44 deletions(-) diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 2296c784397..7064c50ae67 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -89,7 +89,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal) require.Empty(t, diff.place) - require.Equal(t, 1, len(diff.update)) + require.Len(t, diff.update, 1) require.Empty(t, diff.stop) require.Empty(t, diff.migrate) require.Empty(t, diff.lost) @@ -192,30 +192,29 @@ func TestDiffSystemAllocsForNode(t *testing.T) { } diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal) - place := diff.place - update := diff.update - migrate := diff.migrate - stop := diff.stop - ignore := diff.ignore - lost := diff.lost // We should update the first alloc - require.True(t, len(update) == 1 && update[0].Alloc == allocs[0]) + require.Len(t, diff.update, 1) + require.Equal(t, allocs[0], diff.update[0].Alloc) // We should ignore the second alloc - require.True(t, len(ignore) == 1 && ignore[0].Alloc == allocs[1]) + require.Len(t, diff.ignore, 1) + require.Equal(t, allocs[1], diff.ignore[0].Alloc) // We should stop the 3rd alloc - require.True(t, len(stop) == 1 && stop[0].Alloc == allocs[2]) + require.Len(t, diff.stop, 1) + require.Equal(t, allocs[2], diff.stop[0].Alloc) // We should migrate the 4rd alloc - require.True(t, len(migrate) == 1 && migrate[0].Alloc == allocs[3]) + require.Len(t, diff.migrate, 1) + require.Equal(t, allocs[3], diff.migrate[0].Alloc) // We should mark the 5th alloc as lost - require.True(t, len(lost) == 1 && lost[0].Alloc == allocs[4]) + require.Len(t, diff.lost, 1) + require.Equal(t, allocs[4], 
diff.lost[0].Alloc) // We should place 6 - require.Equal(t, 6, len(place)) + require.Len(t, diff.place, 6) // Ensure that the allocations which are replacements of terminal allocs are // annotated. @@ -223,8 +222,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) { for _, alloc := range m { for _, tuple := range diff.place { if alloc.Name == tuple.Name { - require.True(t, reflect.DeepEqual(alloc, tuple.Alloc), - "expected: %#v, actual: %#v", alloc, tuple.Alloc) + require.Equal(t, alloc, tuple.Alloc) } } } @@ -275,19 +273,13 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) { terminal := make(structs.TerminalByNodeByName) diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal) - place := diff.place - update := diff.update - migrate := diff.migrate - stop := diff.stop - ignore := diff.ignore - lost := diff.lost - - require.Len(t, place, 0) - require.Len(t, update, 1) - require.Len(t, migrate, 0) - require.Len(t, stop, 0) - require.Len(t, ignore, 1) - require.Len(t, lost, 0) + + require.Len(t, diff.place, 0) + require.Len(t, diff.update, 1) + require.Len(t, diff.migrate, 0) + require.Len(t, diff.stop, 0) + require.Len(t, diff.ignore, 1) + require.Len(t, diff.lost, 0) } func TestDiffSystemAllocs(t *testing.T) { @@ -361,30 +353,28 @@ func TestDiffSystemAllocs(t *testing.T) { } diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal) - place := diff.place - update := diff.update - migrate := diff.migrate - stop := diff.stop - ignore := diff.ignore - lost := diff.lost // We should update the first alloc - require.True(t, len(update) == 1 && update[0].Alloc == allocs[0]) + require.Len(t, diff.update, 1) + require.Equal(t, allocs[0], diff.update[0].Alloc) // We should ignore the second alloc - require.True(t, len(ignore) == 1 && ignore[0].Alloc == allocs[1]) + require.Len(t, diff.ignore, 1) + require.Equal(t, allocs[1], diff.ignore[0].Alloc) // We should stop the third alloc - 
require.Empty(t, stop) + require.Empty(t, diff.stop) // There should be no migrates. - require.True(t, len(migrate) == 1 && migrate[0].Alloc == allocs[2]) + require.Len(t, diff.migrate, 1) + require.Equal(t, allocs[2], diff.migrate[0].Alloc) // We should mark the 5th alloc as lost - require.True(t, len(lost) == 1 && lost[0].Alloc == allocs[3]) + require.Len(t, diff.lost, 1) + require.Equal(t, allocs[3], diff.lost[0].Alloc) - // We should place 1 - require.Equal(t, 2, len(place)) + // We should place 2 + require.Len(t, diff.place, 2) // Ensure that the allocations which are replacements of terminal allocs are // annotated. @@ -392,8 +382,7 @@ func TestDiffSystemAllocs(t *testing.T) { for _, alloc := range m { for _, tuple := range diff.place { if alloc.NodeID == tuple.Alloc.NodeID { - require.True(t, reflect.DeepEqual(alloc, tuple.Alloc), - "expected: %#v, actual: %#v", alloc, tuple.Alloc) + require.Equal(t, alloc, tuple.Alloc) } } } @@ -418,7 +407,8 @@ func TestReadyNodesInDCs(t *testing.T) { nodes, notReady, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"}) require.NoError(t, err) require.Equal(t, 2, len(nodes)) - require.True(t, nodes[0].ID != node3.ID && nodes[1].ID != node3.ID) + require.NotEqual(t, node3.ID, nodes[0].ID) + require.NotEqual(t, node3.ID, nodes[1].ID) require.Contains(t, dc, "dc1") require.Equal(t, 1, dc["dc1"]) From 1004cf62b1179c7e853eb8f03a9a41663e543c20 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 26 Oct 2021 21:29:08 -0700 Subject: [PATCH 4/5] schedule: we use node id We use node ids rather than node names when hashing or grouping. --- scheduler/util.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scheduler/util.go b/scheduler/util.go index c432c05e0b8..869442b78f9 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -66,10 +66,10 @@ func diffSystemAllocsForNode( nodeID string, eligibleNodes map[string]*structs.Node, notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. 
draining - taintedNodes map[string]*structs.Node, // nodes which are down (by node name) + taintedNodes map[string]*structs.Node, // nodes which are down (by node id) required map[string]*structs.TaskGroup, // set of allocations that must exist allocs []*structs.Allocation, // non-terminal allocations that exist - terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, name) + terminal structs.TerminalByNodeByName, // latest terminal allocations (by node id, alloc name) ) *diffResult { result := new(diffResult) @@ -243,9 +243,9 @@ func diffSystemAllocs( job *structs.Job, // jobs whose allocations are going to be diff-ed readyNodes []*structs.Node, // list of nodes in the ready state notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining - taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by name) + taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by node id) allocs []*structs.Allocation, // non-terminal allocations - terminal structs.TerminalByNodeByName, // latest terminal allocations (by name) + terminal structs.TerminalByNodeByName, // latest terminal allocations (by node id, alloc name) ) *diffResult { // Build a mapping of nodes to all their allocs. From 1b3d2788ecd00933ccdd5b1415639d237b010b8a Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 26 Oct 2021 21:33:18 -0700 Subject: [PATCH 5/5] changelog --- .changelog/11391.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/11391.txt diff --git a/.changelog/11391.txt b/.changelog/11391.txt new file mode 100644 index 00000000000..6afb8a97602 --- /dev/null +++ b/.changelog/11391.txt @@ -0,0 +1,3 @@ +```release-note:bug +core: Fixed a bug where system job allocations kept running after their datacenter was removed from the job +```