Skip to content

Commit

Permalink
scheduler: stop allocs in unrelated nodes (#11391)
Browse files Browse the repository at this point in the history
The system scheduler should leave allocs on draining nodes as-is, but
stop node stop allocs on nodes that are no longer part of the job
datacenters.

Previously, the scheduler did not make the distinction and left system
job allocs intact if they are already running.

I've added a failing test first, which you can see in https://app.circleci.com/jobs/github/hashicorp/nomad/179661 .

Fixes #11373
  • Loading branch information
Mahmood Ali authored Oct 27, 2021
1 parent 5f6ad87 commit 56a7cc6
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 68 deletions.
3 changes: 3 additions & 0 deletions .changelog/11391.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
core: Fix a bug to stop running system job allocations once their datacenters are removed from the job
```
2 changes: 1 addition & 1 deletion scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string,
// destructive updates to place and the set of new placements to place.
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
// Get the base nodes
nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions scheduler/scheduler_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type SystemScheduler struct {
ctx *EvalContext
stack *SystemStack

nodes []*structs.Node
nodesByDC map[string]int
nodes []*structs.Node
notReadyNodes map[string]struct{}
nodesByDC map[string]int

limitReached bool
nextEval *structs.Evaluation
Expand Down Expand Up @@ -122,7 +123,7 @@ func (s *SystemScheduler) process() (bool, error) {

// Get the ready nodes in the required datacenters
if !s.job.Stopped() {
s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
return false, fmt.Errorf("failed to get ready nodes: %v", err)
}
Expand Down Expand Up @@ -219,7 +220,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
live, term := structs.SplitTerminalAllocs(allocs)

// Diff the required and existing allocations
diff := diffSystemAllocs(s.job, s.nodes, tainted, live, term)
diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term)
s.logger.Debug("reconciled current state with desired state",
"place", len(diff.place), "update", len(diff.update),
"migrate", len(diff.migrate), "stop", len(diff.stop),
Expand Down
85 changes: 85 additions & 0 deletions scheduler/scheduler_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,91 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) {
}
}

func TestSystemSched_JobModify_RemoveDC(t *testing.T) {
h := NewHarness(t)

// Create some nodes
node1 := mock.Node()
node1.Datacenter = "dc1"
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node1))

node2 := mock.Node()
node2.Datacenter = "dc2"
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))

fmt.Println("DC1 node: ", node1.ID)
fmt.Println("DC2 node: ", node2.ID)
nodes := []*structs.Node{node1, node2}

// Generate a fake job with allocations
job := mock.SystemJob()
job.Datacenters = []string{"dc1", "dc2"}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))

var allocs []*structs.Allocation
for _, node := range nodes {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))

// Update the job
job2 := job.Copy()
job2.Datacenters = []string{"dc1"}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job2))

// Create a mock evaluation to deal with update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)

// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]

// Ensure the plan did not evict any allocs
var update []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
update = append(update, updateList...)
}
require.Len(t, update, 1)

// Ensure the plan updated the existing allocs
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
require.Len(t, planned, 1)

for _, p := range planned {
require.Equal(t, job2, p.Job, "should update job")
}

// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)

// Ensure all allocations placed
require.Len(t, out, 2)
h.AssertEvalStatus(t, structs.EvalStatusComplete)

}

func TestSystemSched_JobDeregister_Purged(t *testing.T) {
h := NewHarness(t)

Expand Down
40 changes: 27 additions & 13 deletions scheduler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ func diffSystemAllocsForNode(
job *structs.Job, // job whose allocs are going to be diff-ed
nodeID string,
eligibleNodes map[string]*structs.Node,
taintedNodes map[string]*structs.Node, // nodes which are down or in drain (by node name)
notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining
taintedNodes map[string]*structs.Node, // nodes which are down (by node id)
required map[string]*structs.TaskGroup, // set of allocations that must exist
allocs []*structs.Allocation, // non-terminal allocations that exist
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, name)
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id)
) *diffResult {
result := new(diffResult)

Expand Down Expand Up @@ -139,10 +140,21 @@ func diffSystemAllocsForNode(

// For an existing allocation, if the nodeID is no longer
// eligible, the diff should be ignored
if _, ok := eligibleNodes[nodeID]; !ok {
if _, ok := notReadyNodes[nodeID]; ok {
goto IGNORE
}

// Existing allocations on nodes that are no longer targeted
// should be stopped
if _, ok := eligibleNodes[nodeID]; !ok {
result.stop = append(result.stop, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
continue
}

// If the definition is updated we need to update
if job.JobModifyIndex != exist.Job.JobModifyIndex {
result.update = append(result.update, allocTuple{
Expand Down Expand Up @@ -229,21 +241,21 @@ func diffSystemAllocsForNode(
// diffResult contain the specific nodeID they should be allocated on.
func diffSystemAllocs(
job *structs.Job, // jobs whose allocations are going to be diff-ed
nodes []*structs.Node, // list of nodes in the ready state
taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by name)
readyNodes []*structs.Node, // list of nodes in the ready state
notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining
taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by node id)
allocs []*structs.Allocation, // non-terminal allocations
terminal structs.TerminalByNodeByName, // latest terminal allocations (by name)
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node id)
) *diffResult {

// Build a mapping of nodes to all their allocs.
nodeAllocs := make(map[string][]*structs.Allocation, len(allocs))
for _, alloc := range allocs {
nallocs := append(nodeAllocs[alloc.NodeID], alloc) //nolint:gocritic
nodeAllocs[alloc.NodeID] = nallocs
nodeAllocs[alloc.NodeID] = append(nodeAllocs[alloc.NodeID], alloc)
}

eligibleNodes := make(map[string]*structs.Node)
for _, node := range nodes {
for _, node := range readyNodes {
if _, ok := nodeAllocs[node.ID]; !ok {
nodeAllocs[node.ID] = nil
}
Expand All @@ -255,7 +267,7 @@ func diffSystemAllocs(

result := new(diffResult)
for nodeID, allocs := range nodeAllocs {
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminal)
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal)
result.Append(diff)
}

Expand All @@ -264,7 +276,7 @@ func diffSystemAllocs(

// readyNodesInDCs returns all the ready nodes in the given datacenters and a
// mapping of each data center to the count of ready nodes.
func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int, error) {
func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) {
// Index the DCs
dcMap := make(map[string]int, len(dcs))
for _, dc := range dcs {
Expand All @@ -274,9 +286,10 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
// Scan the nodes
ws := memdb.NewWatchSet()
var out []*structs.Node
notReady := map[string]struct{}{}
iter, err := state.Nodes(ws)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
for {
raw := iter.Next()
Expand All @@ -287,6 +300,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
// Filter on datacenter and status
node := raw.(*structs.Node)
if !node.Ready() {
notReady[node.ID] = struct{}{}
continue
}
if _, ok := dcMap[node.Datacenter]; !ok {
Expand All @@ -295,7 +309,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
out = append(out, node)
dcMap[node.Datacenter]++
}
return out, dcMap, nil
return out, notReady, dcMap, nil
}

// retryMax is used to retry a callback until it returns success or
Expand Down
Loading

0 comments on commit 56a7cc6

Please sign in to comment.