diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index b6e46281530..f8ea14ff917 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -57,7 +57,7 @@ type EvalBroker struct { jobEvals map[structs.NamespacedID]string // blocked tracks the blocked evaluations by JobID in a priority queue - blocked map[string]PendingEvaluations + blocked map[structs.NamespacedID]PendingEvaluations // ready tracks the ready jobs by scheduler in a priority queue ready map[string]PendingEvaluations @@ -119,7 +119,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, stats: new(BrokerStats), evals: make(map[string]int), jobEvals: make(map[structs.NamespacedID]string), - blocked: make(map[string]PendingEvaluations), + blocked: make(map[structs.NamespacedID]PendingEvaluations), ready: make(map[string]PendingEvaluations), unack: make(map[string]*unackEval), waiting: make(map[string]chan struct{}), @@ -236,17 +236,17 @@ func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation, queue string) { } // Check if there is an evaluation for this JobID pending - tuple := structs.NamespacedID{ + namespacedID := structs.NamespacedID{ ID: eval.JobID, Namespace: eval.Namespace, } - pendingEval := b.jobEvals[tuple] + pendingEval := b.jobEvals[namespacedID] if pendingEval == "" { - b.jobEvals[tuple] = eval.ID + b.jobEvals[namespacedID] = eval.ID } else if pendingEval != eval.ID { - blocked := b.blocked[eval.JobID] + blocked := b.blocked[namespacedID] heap.Push(&blocked, eval) - b.blocked[eval.JobID] = blocked + b.blocked[namespacedID] = blocked b.stats.TotalBlocked += 1 return } @@ -519,19 +519,19 @@ func (b *EvalBroker) Ack(evalID, token string) error { delete(b.unack, evalID) delete(b.evals, evalID) - tuple := structs.NamespacedID{ + namespacedID := structs.NamespacedID{ ID: jobID, Namespace: unack.Eval.Namespace, } - delete(b.jobEvals, tuple) + delete(b.jobEvals, namespacedID) // Check if there are any blocked evaluations - if blocked := b.blocked[jobID]; len(blocked) != 0 { + if blocked := b.blocked[namespacedID]; len(blocked) != 0 { raw := heap.Pop(&blocked) if len(blocked) > 0 { - b.blocked[jobID] = blocked + b.blocked[namespacedID] = blocked } else { - delete(b.blocked, jobID) + delete(b.blocked, namespacedID) } eval := raw.(*structs.Evaluation) b.stats.TotalBlocked -= 1 @@ -671,7 +671,7 @@ func (b *EvalBroker) Flush() { b.stats.ByScheduler = make(map[string]*SchedulerStats) b.evals = make(map[string]int) b.jobEvals = make(map[structs.NamespacedID]string) - b.blocked = make(map[string]PendingEvaluations) + b.blocked = make(map[structs.NamespacedID]PendingEvaluations) b.ready = make(map[string]PendingEvaluations) b.unack = make(map[string]*unackEval) b.timeWait = make(map[string]*time.Timer) diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 90a040578ef..5275ed999f4 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" ) var ( @@ -1295,3 +1296,45 @@ func TestEvalBroker_EnqueueAll_Requeue_Nack(t *testing.T) { t.Fatal(e) }) } + +func TestEvalBroker_NamespacedJobs(t *testing.T) { + t.Parallel() + b := testBroker(t, 0) + b.SetEnabled(true) + + // Create evals with the same jobid and different namespace + jobId := "test-jobID" + + eval1 := mock.Eval() + eval1.JobID = jobId + eval1.Namespace = "n1" + b.Enqueue(eval1) + + // This eval should not block + eval2 := mock.Eval() + eval2.JobID = jobId + eval2.Namespace = "default" + b.Enqueue(eval2) + + // This eval should block + eval3 := mock.Eval() + eval3.JobID = jobId + eval3.Namespace = "default" + b.Enqueue(eval3) + + require := require.New(t) + out1, _, err := b.Dequeue(defaultSched, 5*time.Millisecond) + require.Nil(err) + require.Equal(eval1.ID, out1.ID) + + out2, _, err := b.Dequeue(defaultSched, 5*time.Millisecond) + require.Nil(err) + require.Equal(eval2.ID, out2.ID) + + out3, _, err := b.Dequeue(defaultSched, 5*time.Millisecond) + require.Nil(err) + require.Nil(out3) + + require.Equal(1, len(b.blocked)) + +}