Skip to content

Commit

Permalink
Consider all system jobs for a new node (#11054)
Browse files Browse the repository at this point in the history
When a node becomes ready, create an eval for all system jobs across
namespaces.

The previous code uses `job.ID` to deduplicate evals, but that ignores
the job namespace. Thus if there are multiple jobs in different
namespaces sharing the same ID/Name, only one will be considered for
running in the new node. Thus, Nomad may skip running some system jobs
in that node.
  • Loading branch information
Mahmood Ali authored Aug 18, 2021
1 parent 2bb0317 commit 327d461
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 9 deletions.
3 changes: 3 additions & 0 deletions .changelog/11054.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
core: Fixed a bug where system jobs with non-unique IDs may not be placed on new nodes
```
10 changes: 5 additions & 5 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,15 +1387,15 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
// Create an eval for each JobID affected
var evals []*structs.Evaluation
var evalIDs []string
jobIDs := make(map[string]struct{})
jobIDs := map[structs.NamespacedID]struct{}{}
now := time.Now().UTC().UnixNano()

for _, alloc := range allocs {
// Deduplicate on JobID
if _, ok := jobIDs[alloc.JobID]; ok {
if _, ok := jobIDs[alloc.JobNamespacedID()]; ok {
continue
}
jobIDs[alloc.JobID] = struct{}{}
jobIDs[alloc.JobNamespacedID()] = struct{}{}

// Create a new eval
eval := &structs.Evaluation{
Expand All @@ -1418,10 +1418,10 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
// Create an evaluation for each system job.
for _, job := range sysJobs {
// Still dedup on JobID as the node may already have the system job.
if _, ok := jobIDs[job.ID]; ok {
if _, ok := jobIDs[job.NamespacedID()]; ok {
continue
}
jobIDs[job.ID] = struct{}{}
jobIDs[job.NamespacedID()] = struct{}{}

// Create a new eval
eval := &structs.Evaluation{
Expand Down
61 changes: 61 additions & 0 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2707,6 +2707,67 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
}
}

// TestClientEndpoint_CreateNodeEvals_MultipleNSes asserts that evals are made
// for all jobs across namespaces
func TestClientEndpoint_CreateNodeEvals_MultipleNSes(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

state := s1.fsm.State()

idx := uint64(3)
ns1 := mock.Namespace()
err := state.UpsertNamespaces(idx, []*structs.Namespace{ns1})
require.NoError(t, err)
idx++

node := mock.Node()
err = state.UpsertNode(structs.MsgTypeTestSetup, idx, node)
require.NoError(t, err)
idx++

// Inject a fake system job.
defaultJob := mock.SystemJob()
err = state.UpsertJob(structs.MsgTypeTestSetup, idx, defaultJob)
require.NoError(t, err)
idx++

nsJob := mock.SystemJob()
nsJob.ID = defaultJob.ID
nsJob.Namespace = ns1.Name
err = state.UpsertJob(structs.MsgTypeTestSetup, idx, nsJob)
require.NoError(t, err)
idx++

// Create some evaluations
evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node.ID, 1)
require.NoError(t, err)
require.NotZero(t, index)
require.Len(t, evalIDs, 2)

byNS := map[string]*structs.Evaluation{}
for _, evalID := range evalIDs {
eval, err := state.EvalByID(nil, evalID)
require.NoError(t, err)
byNS[eval.Namespace] = eval
}

require.Len(t, byNS, 2)

defaultNSEval := byNS[defaultJob.Namespace]
require.NotNil(t, defaultNSEval)
require.Equal(t, defaultJob.ID, defaultNSEval.JobID)
require.Equal(t, defaultJob.Namespace, defaultNSEval.Namespace)

otherNSEval := byNS[nsJob.Namespace]
require.NotNil(t, otherNSEval)
require.Equal(t, nsJob.ID, otherNSEval.JobID)
require.Equal(t, nsJob.Namespace, otherNSEval.Namespace)
}

func TestClientEndpoint_Evaluate(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 2 additions & 2 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4081,8 +4081,8 @@ type Job struct {
}

// NamespacedID returns the namespaced id useful for logging
func (j *Job) NamespacedID() *NamespacedID {
return &NamespacedID{
func (j *Job) NamespacedID() NamespacedID {
return NamespacedID{
ID: j.ID,
Namespace: j.Namespace,
}
Expand Down
4 changes: 2 additions & 2 deletions scheduler/rank.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ type BinPackIterator struct {
source RankIterator
evict bool
priority int
jobId *structs.NamespacedID
jobId structs.NamespacedID
taskGroup *structs.TaskGroup
memoryOversubscription bool
scoreFit func(*structs.Node, *structs.ComparableResources) float64
Expand Down Expand Up @@ -233,7 +233,7 @@ OUTER:
var allocsToPreempt []*structs.Allocation

// Initialize preemptor with node
preemptor := NewPreemptor(iter.priority, iter.ctx, iter.jobId)
preemptor := NewPreemptor(iter.priority, iter.ctx, &iter.jobId)
preemptor.SetNode(option.Node)

// Count the number of existing preemptions
Expand Down

0 comments on commit 327d461

Please sign in to comment.