diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index e5273c00db9..69fcfbddb21 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -471,7 +471,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, // destructive updates to place and the set of new placements to place. func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error { // Get the base nodes - nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) + nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return err } diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 2b0da38bdef..5d0278eae65 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -37,8 +37,9 @@ type SystemScheduler struct { ctx *EvalContext stack *SystemStack - nodes []*structs.Node - nodesByDC map[string]int + nodes []*structs.Node + notReadyNodes map[string]struct{} + nodesByDC map[string]int limitReached bool nextEval *structs.Evaluation @@ -122,7 +123,7 @@ func (s *SystemScheduler) process() (bool, error) { // Get the ready nodes in the required datacenters if !s.job.Stopped() { - s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) + s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return false, fmt.Errorf("failed to get ready nodes: %v", err) } @@ -219,7 +220,7 @@ func (s *SystemScheduler) computeJobAllocs() error { live, term := structs.SplitTerminalAllocs(allocs) // Diff the required and existing allocations - diff := diffSystemAllocs(s.job, s.nodes, tainted, live, term) + diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term) s.logger.Debug("reconciled current state with desired state", "place", len(diff.place), "update", len(diff.update), "migrate", len(diff.migrate), "stop", len(diff.stop), diff --git a/scheduler/util.go b/scheduler/util.go index 112993a57fd..c432c05e0b8 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -65,7 +65,8 @@ func diffSystemAllocsForNode( job *structs.Job, // job whose allocs are going to be diff-ed nodeID string, eligibleNodes map[string]*structs.Node, - taintedNodes map[string]*structs.Node, // nodes which are down or in drain (by node name) + notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining + taintedNodes map[string]*structs.Node, // nodes which are down (by node name) required map[string]*structs.TaskGroup, // set of allocations that must exist allocs []*structs.Allocation, // non-terminal allocations that exist terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, name) @@ -139,10 +140,21 @@ func diffSystemAllocsForNode( // For an existing allocation, if the nodeID is no longer // eligible, the diff should be ignored - if _, ok := eligibleNodes[nodeID]; !ok { + if _, ok := notReadyNodes[nodeID]; ok { goto IGNORE } + // Existing allocations on nodes that are no longer targeted + // should be stopped + if _, ok := eligibleNodes[nodeID]; !ok { + result.stop = append(result.stop, allocTuple{ + Name: name, + TaskGroup: tg, + Alloc: exist, + }) + continue + } + // If the definition is updated we need to update if job.JobModifyIndex != exist.Job.JobModifyIndex { result.update = append(result.update, allocTuple{ @@ -229,7 +241,8 @@ func diffSystemAllocsForNode( // diffResult contain the specific nodeID they should be allocated on. func diffSystemAllocs( job *structs.Job, // jobs whose allocations are going to be diff-ed - nodes []*structs.Node, // list of nodes in the ready state + readyNodes []*structs.Node, // list of nodes in the ready state + notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by name) allocs []*structs.Allocation, // non-terminal allocations terminal structs.TerminalByNodeByName, // latest terminal allocations (by name) @@ -238,12 +251,11 @@ func diffSystemAllocs( // Build a mapping of nodes to all their allocs. nodeAllocs := make(map[string][]*structs.Allocation, len(allocs)) for _, alloc := range allocs { - nallocs := append(nodeAllocs[alloc.NodeID], alloc) //nolint:gocritic - nodeAllocs[alloc.NodeID] = nallocs + nodeAllocs[alloc.NodeID] = append(nodeAllocs[alloc.NodeID], alloc) } eligibleNodes := make(map[string]*structs.Node) - for _, node := range nodes { + for _, node := range readyNodes { if _, ok := nodeAllocs[node.ID]; !ok { nodeAllocs[node.ID] = nil } @@ -255,7 +267,7 @@ func diffSystemAllocs( result := new(diffResult) for nodeID, allocs := range nodeAllocs { - diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal) result.Append(diff) } @@ -264,7 +276,7 @@ func diffSystemAllocs( // readyNodesInDCs returns all the ready nodes in the given datacenters and a // mapping of each data center to the count of ready nodes. -func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int, error) { +func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) { // Index the DCs dcMap := make(map[string]int, len(dcs)) for _, dc := range dcs { @@ -274,9 +286,10 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int // Scan the nodes ws := memdb.NewWatchSet() var out []*structs.Node + notReady := map[string]struct{}{} iter, err := state.Nodes(ws) if err != nil { - return nil, nil, err + return nil, nil, nil, err } for { raw := iter.Next() @@ -287,6 +300,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int // Filter on datacenter and status node := raw.(*structs.Node) if !node.Ready() { + notReady[node.ID] = struct{}{} continue } if _, ok := dcMap[node.Datacenter]; !ok { @@ -295,7 +309,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int out = append(out, node) dcMap[node.Datacenter]++ } - return out, dcMap, nil + return out, notReady, dcMap, nil } // retryMax is used to retry a callback until it returns success or diff --git a/scheduler/util_test.go b/scheduler/util_test.go index eba4227e677..2296c784397 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -61,7 +61,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal) require.Empty(t, diff.place) require.Empty(t, diff.update) require.Empty(t, diff.stop) @@ -87,7 +87,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { expAlloc := terminal["node1"]["my-sysbatch.pinger[0]"] expAlloc.NodeID = "node1" - diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal) require.Empty(t, diff.place) require.Equal(t, 1, len(diff.update)) require.Empty(t, diff.stop) @@ -191,7 +191,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "zip", eligible, tainted, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal) place := diff.place update := diff.update migrate := diff.migrate @@ -274,7 +274,7 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) { // No terminal allocs terminal := make(structs.TerminalByNodeByName) - diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, tainted, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal) place := diff.place update := diff.update migrate := diff.migrate @@ -360,7 +360,7 @@ func TestDiffSystemAllocs(t *testing.T) { }, } - diff := diffSystemAllocs(job, nodes, tainted, allocs, terminal) + diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal) place := diff.place update := diff.update migrate := diff.migrate @@ -415,7 +415,7 @@ func TestReadyNodesInDCs(t *testing.T) { require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1003, node4)) - nodes, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"}) + nodes, notReady, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"}) require.NoError(t, err) require.Equal(t, 2, len(nodes)) require.True(t, nodes[0].ID != node3.ID && nodes[1].ID != node3.ID) @@ -424,6 +424,9 @@ func TestReadyNodesInDCs(t *testing.T) { require.Equal(t, 1, dc["dc1"]) require.Contains(t, dc, "dc2") require.Equal(t, 1, dc["dc2"]) + + require.Contains(t, notReady, node3.ID) + require.Contains(t, notReady, node4.ID) } func TestRetryMax(t *testing.T) {