From b0ce6845e71c1395c7c8304df1211cbebc856790 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 26 Oct 2021 08:18:00 -0400 Subject: [PATCH] scheduler: stop allocs in unrelated nodes The system scheduler should leave allocs on draining nodes as-is, but stop allocs on nodes that are no longer part of the job datacenters. Previously, the scheduler did not make the distinction and left system job allocs intact if they were already running. --- scheduler/generic_sched.go | 2 +- scheduler/scheduler_system.go | 9 +++++---- scheduler/util.go | 34 ++++++++++++++++++++++++---------- scheduler/util_test.go | 15 +++++++++------ 4 files changed, 39 insertions(+), 21 deletions(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index e5273c00db9..69fcfbddb21 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -471,7 +471,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, // destructive updates to place and the set of new placements to place. 
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error { // Get the base nodes - nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) + nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return err } diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 2b0da38bdef..5d0278eae65 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -37,8 +37,9 @@ type SystemScheduler struct { ctx *EvalContext stack *SystemStack - nodes []*structs.Node - nodesByDC map[string]int + nodes []*structs.Node + notReadyNodes map[string]struct{} + nodesByDC map[string]int limitReached bool nextEval *structs.Evaluation @@ -122,7 +123,7 @@ func (s *SystemScheduler) process() (bool, error) { // Get the ready nodes in the required datacenters if !s.job.Stopped() { - s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) + s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return false, fmt.Errorf("failed to get ready nodes: %v", err) } @@ -219,7 +220,7 @@ func (s *SystemScheduler) computeJobAllocs() error { live, term := structs.SplitTerminalAllocs(allocs) // Diff the required and existing allocations - diff := diffSystemAllocs(s.job, s.nodes, tainted, live, term) + diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term) s.logger.Debug("reconciled current state with desired state", "place", len(diff.place), "update", len(diff.update), "migrate", len(diff.migrate), "stop", len(diff.stop), diff --git a/scheduler/util.go b/scheduler/util.go index 112993a57fd..c432c05e0b8 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -65,7 +65,8 @@ func diffSystemAllocsForNode( job *structs.Job, // job whose allocs are going to be diff-ed nodeID string, eligibleNodes map[string]*structs.Node, - taintedNodes map[string]*structs.Node, // nodes which are down or in drain (by node name) 
+ notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining + taintedNodes map[string]*structs.Node, // nodes which are down (by node name) required map[string]*structs.TaskGroup, // set of allocations that must exist allocs []*structs.Allocation, // non-terminal allocations that exist terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, name) @@ -139,10 +140,21 @@ func diffSystemAllocsForNode( // For an existing allocation, if the nodeID is no longer // eligible, the diff should be ignored - if _, ok := eligibleNodes[nodeID]; !ok { + if _, ok := notReadyNodes[nodeID]; ok { goto IGNORE } + // Existing allocations on nodes that are no longer targeted + // should be stopped + if _, ok := eligibleNodes[nodeID]; !ok { + result.stop = append(result.stop, allocTuple{ + Name: name, + TaskGroup: tg, + Alloc: exist, + }) + continue + } + // If the definition is updated we need to update if job.JobModifyIndex != exist.Job.JobModifyIndex { result.update = append(result.update, allocTuple{ @@ -229,7 +241,8 @@ func diffSystemAllocsForNode( // diffResult contain the specific nodeID they should be allocated on. func diffSystemAllocs( job *structs.Job, // jobs whose allocations are going to be diff-ed - nodes []*structs.Node, // list of nodes in the ready state + readyNodes []*structs.Node, // list of nodes in the ready state + notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by name) allocs []*structs.Allocation, // non-terminal allocations terminal structs.TerminalByNodeByName, // latest terminal allocations (by name) @@ -238,12 +251,11 @@ func diffSystemAllocs( // Build a mapping of nodes to all their allocs. 
nodeAllocs := make(map[string][]*structs.Allocation, len(allocs)) for _, alloc := range allocs { - nallocs := append(nodeAllocs[alloc.NodeID], alloc) //nolint:gocritic - nodeAllocs[alloc.NodeID] = nallocs + nodeAllocs[alloc.NodeID] = append(nodeAllocs[alloc.NodeID], alloc) } eligibleNodes := make(map[string]*structs.Node) - for _, node := range nodes { + for _, node := range readyNodes { if _, ok := nodeAllocs[node.ID]; !ok { nodeAllocs[node.ID] = nil } @@ -255,7 +267,7 @@ func diffSystemAllocs( result := new(diffResult) for nodeID, allocs := range nodeAllocs { - diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal) result.Append(diff) } @@ -264,7 +276,7 @@ func diffSystemAllocs( // readyNodesInDCs returns all the ready nodes in the given datacenters and a // mapping of each data center to the count of ready nodes. -func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int, error) { +func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) { // Index the DCs dcMap := make(map[string]int, len(dcs)) for _, dc := range dcs { @@ -274,9 +286,10 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int // Scan the nodes ws := memdb.NewWatchSet() var out []*structs.Node + notReady := map[string]struct{}{} iter, err := state.Nodes(ws) if err != nil { - return nil, nil, err + return nil, nil, nil, err } for { raw := iter.Next() @@ -287,6 +300,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int // Filter on datacenter and status node := raw.(*structs.Node) if !node.Ready() { + notReady[node.ID] = struct{}{} continue } if _, ok := dcMap[node.Datacenter]; !ok { @@ -295,7 +309,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int out = append(out, 
node) dcMap[node.Datacenter]++ } - return out, dcMap, nil + return out, notReady, dcMap, nil } // retryMax is used to retry a callback until it returns success or diff --git a/scheduler/util_test.go b/scheduler/util_test.go index eba4227e677..2296c784397 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -61,7 +61,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal) require.Empty(t, diff.place) require.Empty(t, diff.update) require.Empty(t, diff.stop) @@ -87,7 +87,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { expAlloc := terminal["node1"]["my-sysbatch.pinger[0]"] expAlloc.NodeID = "node1" - diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal) require.Empty(t, diff.place) require.Equal(t, 1, len(diff.update)) require.Empty(t, diff.stop) @@ -191,7 +191,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "zip", eligible, tainted, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal) place := diff.place update := diff.update migrate := diff.migrate @@ -274,7 +274,7 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) { // No terminal allocs terminal := make(structs.TerminalByNodeByName) - diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, tainted, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal) place := diff.place update := diff.update migrate := diff.migrate @@ -360,7 +360,7 @@ func TestDiffSystemAllocs(t *testing.T) { }, } - diff := 
diffSystemAllocs(job, nodes, tainted, allocs, terminal) + diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal) place := diff.place update := diff.update migrate := diff.migrate @@ -415,7 +415,7 @@ func TestReadyNodesInDCs(t *testing.T) { require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1003, node4)) - nodes, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"}) + nodes, notReady, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"}) require.NoError(t, err) require.Equal(t, 2, len(nodes)) require.True(t, nodes[0].ID != node3.ID && nodes[1].ID != node3.ID) @@ -424,6 +424,9 @@ func TestReadyNodesInDCs(t *testing.T) { require.Equal(t, 1, dc["dc1"]) require.Contains(t, dc, "dc2") require.Equal(t, 1, dc["dc2"]) + + require.Contains(t, notReady, node3.ID) + require.Contains(t, notReady, node4.ID) } func TestRetryMax(t *testing.T) {