From 0397cbc8d15686eb274b5a55e38a6dedda4fd5a0 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 7 Nov 2018 10:08:23 -0800 Subject: [PATCH 1/5] Duplicate blocked evals cancelling improved The old logic for cancelling duplicate blocked evaluations by job id had the issue where the newer evaluation could have additional node classes that it is (in)eligible for that we would not capture. This could make it such that cluster state could change such that the job would make progress but no evaluation was unblocked. --- nomad/blocked_evals.go | 85 ++++++++++++++++++++++++++++++------- nomad/blocked_evals_test.go | 22 ++++++++-- nomad/fsm_test.go | 5 ++- nomad/server.go | 14 +++--- nomad/worker.go | 3 +- scheduler/context.go | 26 +++++++----- scheduler/context_test.go | 23 ++++++++++ 7 files changed, 134 insertions(+), 44 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index ee8386cb2e3..02360be2fd6 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -6,6 +6,8 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/lib" + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -29,6 +31,9 @@ const ( // allocations. It is unblocked when the capacity of a node that could run the // failed allocation becomes available. type BlockedEvals struct { + // logger is the logger to use by the blocked eval tracker. + logger log.Logger + evalBroker *EvalBroker enabled bool stats *BlockedStats @@ -102,8 +107,9 @@ type BlockedStats struct { // NewBlockedEvals creates a new blocked eval tracker that will enqueue // unblocked evals into the passed broker. 
-func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { +func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals { return &BlockedEvals{ + logger: logger.Named("blocked_evals"), evalBroker: evalBroker, captured: make(map[string]wrappedEval), escaped: make(map[string]wrappedEval), @@ -176,21 +182,8 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { return } - // Check if the job already has a blocked evaluation. If it does add it to - // the list of duplicates. We only ever want one blocked evaluation per job, - // otherwise we would create unnecessary work for the scheduler as multiple - // evals for the same job would be run, all producing the same outcome. - if _, existing := b.jobs[eval.JobID]; existing { - b.duplicates = append(b.duplicates, eval) - - // Unblock any waiter. - select { - case b.duplicateCh <- struct{}{}: - default: - } - - return - } + // Handle the new evaluation being for a job we are already tracking. + b.processBlockJobDuplicate(eval) // Check if the eval missed an unblock while it was in the scheduler at an // older index. The scheduler could have been invoked with a snapshot of @@ -234,6 +227,66 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { b.captured[eval.ID] = wrapped } +// processBlockJobDuplicate handles the case where the new eval is for a job +// that we are already tracking. If the eval is a duplicate, we add the older +// evaluation by Raft index to the list of duplicates such that it can be +// cancelled. We only ever want one blocked evaluation per job, otherwise we +// would create unnecessary work for the scheduler as multiple evals for the +// same job would be run, all producing the same outcome. It is critical to +// prefer the newer evaluation, since it will contain the most up to date set of +// class eligibility. This should be called with the lock held. 
+func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) { + existingID, hasExisting := b.jobs[eval.JobID] + if !hasExisting { + return + } + + var dup *structs.Evaluation + existingW, ok := b.captured[existingID] + if ok { + if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) { + delete(b.captured, existingID) + b.stats.TotalBlocked-- + dup = existingW.eval + } else { + dup = eval + } + } else { + existingW, ok = b.escaped[existingID] + if !ok { + // This is a programming error + b.logger.Error("existing blocked evaluation is niether tracked as captured or escaped", "existing_id", existingID) + delete(b.jobs, eval.JobID) + return + } + + if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) { + delete(b.escaped, existingID) + b.stats.TotalEscaped-- + dup = existingW.eval + } else { + dup = eval + } + } + + b.duplicates = append(b.duplicates, dup) + + // Unblock any waiter. + select { + case b.duplicateCh <- struct{}{}: + default: + } +} + +// latestEvalIndex returns the max of the evaluations create and snapshot index +func latestEvalIndex(eval *structs.Evaluation) uint64 { + if eval == nil { + return 0 + } + + return helper.Uint64Max(eval.CreateIndex, eval.SnapshotIndex) +} + // missedUnblock returns whether an evaluation missed an unblock while it was in // the scheduler. 
Since the scheduler can operate at an index in the past, the // evaluation may have been processed missing data that would allow it to diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index 009fadceb1c..7a1e15afe9e 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -14,7 +15,7 @@ import ( func testBlockedEvals(t *testing.T) (*BlockedEvals, *EvalBroker) { broker := testBroker(t, 0) broker.SetEnabled(true) - blocked := NewBlockedEvals(broker) + blocked := NewBlockedEvals(broker, testlog.HCLogger(t)) blocked.SetEnabled(true) return blocked, broker } @@ -99,10 +100,16 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) { // Create duplicate blocked evals and add them to the blocked tracker. e := mock.Eval() + e.CreateIndex = 100 e2 := mock.Eval() e2.JobID = e.JobID + e2.CreateIndex = 101 e3 := mock.Eval() e3.JobID = e.JobID + e3.CreateIndex = 102 + e4 := mock.Eval() + e4.JobID = e.JobID + e4.CreateIndex = 100 blocked.Block(e) blocked.Block(e2) @@ -114,8 +121,8 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) { // Get the duplicates. out := blocked.GetDuplicates(0) - if len(out) != 1 || !reflect.DeepEqual(out[0], e2) { - t.Fatalf("bad: %#v %#v", out, e2) + if len(out) != 1 || !reflect.DeepEqual(out[0], e) { + t.Fatalf("bad: %#v %#v", out, e) } // Call block again after a small sleep. @@ -126,9 +133,16 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) { // Get the duplicates. 
out = blocked.GetDuplicates(1 * time.Second) - if len(out) != 1 || !reflect.DeepEqual(out[0], e3) { + if len(out) != 1 || !reflect.DeepEqual(out[0], e2) { t.Fatalf("bad: %#v %#v", out, e2) } + + // Add an older evaluation and assert it gets cancelled + blocked.Block(e4) + out = blocked.GetDuplicates(0) + if len(out) != 1 || !reflect.DeepEqual(out[0], e4) { + t.Fatalf("bad: %#v %#v", out, e4) + } } func TestBlockedEvals_UnblockEscaped(t *testing.T) { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index c8802bf9bd3..6b5f0c44b8b 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -48,11 +48,12 @@ func testStateStore(t *testing.T) *state.StateStore { func testFSM(t *testing.T) *nomadFSM { broker := testBroker(t, 0) dispatcher, _ := testPeriodicDispatcher(t) + logger := testlog.HCLogger(t) fsmConfig := &FSMConfig{ EvalBroker: broker, Periodic: dispatcher, - Blocked: NewBlockedEvals(broker), - Logger: testlog.HCLogger(t), + Blocked: NewBlockedEvals(broker, logger), + Logger: logger, Region: "global", } fsm, err := NewFSM(fsmConfig) diff --git a/nomad/server.go b/nomad/server.go index 4bb10f346fb..a3ea18860cb 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -15,14 +15,12 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/consul/agent/consul/autopilot" consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" log "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" lru "github.com/hashicorp/golang-lru" - raftboltdb "github.com/hashicorp/raft-boltdb" - - "github.com/hashicorp/consul/agent/consul/autopilot" - "github.com/hashicorp/consul/lib" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/codec" "github.com/hashicorp/nomad/helper/pool" @@ -35,6 +33,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/raft" + raftboltdb "github.com/hashicorp/raft-boltdb" "github.com/hashicorp/serf/serf" ) @@ -267,9 
+266,6 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error) return nil, err } - // Create a new blocked eval tracker. - blockedEvals := NewBlockedEvals(evalBroker) - // Configure TLS tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, true, true) if err != nil { @@ -304,7 +300,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error) reconcileCh: make(chan serf.Member, 32), eventCh: make(chan serf.Event, 256), evalBroker: evalBroker, - blockedEvals: blockedEvals, + blockedEvals: NewBlockedEvals(evalBroker, logger), rpcTLS: incomingTLS, aclCache: aclCache, shutdownCh: make(chan struct{}), @@ -401,7 +397,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error) go s.planQueue.EmitStats(time.Second, s.shutdownCh) // Emit metrics for the blocked eval tracker. - go blockedEvals.EmitStats(time.Second, s.shutdownCh) + go s.blockedEvals.EmitStats(time.Second, s.shutdownCh) // Emit metrics for the Vault client. go s.vault.EmitStats(time.Second, s.shutdownCh) diff --git a/nomad/worker.go b/nomad/worker.go index 75b20c1c774..19442f47cce 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -6,10 +6,9 @@ import ( "sync" "time" + "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - - "github.com/armon/go-metrics" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" ) diff --git a/scheduler/context.go b/scheduler/context.go index 031a3b45a51..406f709c3ec 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -5,7 +5,6 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/nomad/structs" ) @@ -240,16 +239,6 @@ func (e *EvalEligibility) HasEscaped() bool { func (e *EvalEligibility) GetClasses() map[string]bool { elig := make(map[string]bool) - // Go through the job. 
- for class, feas := range e.job { - switch feas { - case EvalComputedClassEligible: - elig[class] = true - case EvalComputedClassIneligible: - elig[class] = false - } - } - // Go through the task groups. for _, classes := range e.taskGroups { for class, feas := range classes { @@ -267,6 +256,21 @@ func (e *EvalEligibility) GetClasses() map[string]bool { } } + // Go through the job. + for class, feas := range e.job { + switch feas { + case EvalComputedClassEligible: + // Only mark as eligible if it hasn't been marked before. This + // prevents the job marking a class as eligible when it is ineligible + // to all the task groups. + if _, ok := elig[class]; !ok { + elig[class] = true + } + case EvalComputedClassIneligible: + elig[class] = false + } + } + return elig } diff --git a/scheduler/context_test.go b/scheduler/context_test.go index 28416b04773..6802a238838 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" ) func testContext(t testing.TB) (*state.StateStore, *EvalContext) { @@ -271,3 +272,25 @@ func TestEvalEligibility_GetClasses(t *testing.T) { t.Fatalf("GetClasses() returned %#v; want %#v", actClasses, expClasses) } } +func TestEvalEligibility_GetClasses_JobEligible_TaskGroupIneligible(t *testing.T) { + e := NewEvalEligibility() + e.SetJobEligibility(true, "v1:1") + e.SetTaskGroupEligibility(false, "foo", "v1:1") + + e.SetJobEligibility(true, "v1:2") + e.SetTaskGroupEligibility(false, "foo", "v1:2") + e.SetTaskGroupEligibility(true, "bar", "v1:2") + + e.SetJobEligibility(true, "v1:3") + e.SetTaskGroupEligibility(false, "foo", "v1:3") + e.SetTaskGroupEligibility(false, "bar", "v1:3") + + expClasses := map[string]bool{ + "v1:1": false, + "v1:2": true, + "v1:3": false, + } + + actClasses := e.GetClasses() + require.Equal(t, expClasses, actClasses) +} From 
19af9c9b32fed6df6adf708856a9cc9ec43cbaa1 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 7 Nov 2018 10:22:08 -0800 Subject: [PATCH 2/5] Track jobs by namespace --- nomad/blocked_evals.go | 30 ++++++++++++++++-------------- nomad/blocked_evals_test.go | 4 ++-- nomad/fsm.go | 5 ++--- nomad/structs/structs.go | 8 ++++++++ 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 02360be2fd6..edf08bbf1af 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -52,7 +52,7 @@ type BlockedEvals struct { // jobs is the map of blocked job and is used to ensure that only one // blocked eval exists for each job. The value is the blocked evaluation ID. - jobs map[string]string + jobs map[structs.NamespacedID]string // unblockIndexes maps computed node classes or quota name to the index in // which they were unblocked. This is used to check if an evaluation could @@ -113,7 +113,7 @@ func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals { evalBroker: evalBroker, captured: make(map[string]wrappedEval), escaped: make(map[string]wrappedEval), - jobs: make(map[string]string), + jobs: make(map[structs.NamespacedID]string), unblockIndexes: make(map[string]uint64), capacityChangeCh: make(chan *capacityUpdate, unblockBuffer), duplicateCh: make(chan struct{}, 1), @@ -198,7 +198,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { } // Mark the job as tracked. - b.jobs[eval.JobID] = eval.ID + b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)] = eval.ID b.stats.TotalBlocked++ // Track that the evaluation is being added due to reaching the quota limit @@ -236,7 +236,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { // prefer the newer evaluation, since it will contain the most up to date set of // class eligibility. This should be called with the lock held. 
func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) { - existingID, hasExisting := b.jobs[eval.JobID] + existingID, hasExisting := b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)] if !hasExisting { return } @@ -256,7 +256,7 @@ func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) { if !ok { // This is a programming error b.logger.Error("existing blocked evaluation is niether tracked as captured or escaped", "existing_id", existingID) - delete(b.jobs, eval.JobID) + delete(b.jobs, structs.NewNamespacedID(eval.JobID, eval.Namespace)) return } @@ -344,7 +344,7 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool { // Untrack causes any blocked evaluation for the passed job to be no longer // tracked. Untrack is called when there is a successful evaluation for the job // and a blocked evaluation is no longer needed. -func (b *BlockedEvals) Untrack(jobID string) { +func (b *BlockedEvals) Untrack(jobID, namespace string) { b.l.Lock() defer b.l.Unlock() @@ -353,8 +353,10 @@ func (b *BlockedEvals) Untrack(jobID string) { return } + nsID := structs.NewNamespacedID(jobID, namespace) + // Get the evaluation ID to cancel - evalID, ok := b.jobs[jobID] + evalID, ok := b.jobs[nsID] if !ok { // No blocked evaluation so exit return @@ -362,7 +364,7 @@ func (b *BlockedEvals) Untrack(jobID string) { // Attempt to delete the evaluation if w, ok := b.captured[evalID]; ok { - delete(b.jobs, w.eval.JobID) + delete(b.jobs, nsID) delete(b.captured, evalID) b.stats.TotalBlocked-- if w.eval.QuotaLimitReached != "" { @@ -371,7 +373,7 @@ func (b *BlockedEvals) Untrack(jobID string) { } if w, ok := b.escaped[evalID]; ok { - delete(b.jobs, w.eval.JobID) + delete(b.jobs, nsID) delete(b.escaped, evalID) b.stats.TotalEscaped-- b.stats.TotalBlocked-- @@ -493,7 +495,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) { for id, wrapped := range b.escaped { unblocked[wrapped.eval] = wrapped.token 
delete(b.escaped, id) - delete(b.jobs, wrapped.eval.JobID) + delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace)) if wrapped.eval.QuotaLimitReached != "" { numQuotaLimit++ @@ -520,7 +522,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) { // is eligible based on the computed node class, or never seen the // computed node class. unblocked[wrapped.eval] = wrapped.token - delete(b.jobs, wrapped.eval.JobID) + delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace)) delete(b.captured, id) if wrapped.eval.QuotaLimitReached != "" { numQuotaLimit++ @@ -555,7 +557,7 @@ func (b *BlockedEvals) UnblockFailed() { if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans { unblocked[wrapped.eval] = wrapped.token delete(b.captured, id) - delete(b.jobs, wrapped.eval.JobID) + delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace)) if wrapped.eval.QuotaLimitReached != "" { quotaLimit++ } @@ -566,7 +568,7 @@ func (b *BlockedEvals) UnblockFailed() { if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans { unblocked[wrapped.eval] = wrapped.token delete(b.escaped, id) - delete(b.jobs, wrapped.eval.JobID) + delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace)) b.stats.TotalEscaped -= 1 if wrapped.eval.QuotaLimitReached != "" { quotaLimit++ @@ -624,7 +626,7 @@ func (b *BlockedEvals) Flush() { b.stats.TotalQuotaLimit = 0 b.captured = make(map[string]wrappedEval) b.escaped = make(map[string]wrappedEval) - b.jobs = make(map[string]string) + b.jobs = make(map[structs.NamespacedID]string) b.unblockIndexes = make(map[string]uint64) b.timetable = nil b.duplicates = nil diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index 7a1e15afe9e..4b3994b3481 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -661,7 +661,7 @@ func TestBlockedEvals_Untrack(t *testing.T) { } // Untrack and verify - 
blocked.Untrack(e.JobID, e.Namespace) bStats = blocked.Stats() if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { t.Fatalf("bad: %#v", bStats) @@ -686,7 +686,7 @@ func TestBlockedEvals_Untrack_Quota(t *testing.T) { } // Untrack and verify - blocked.Untrack(e.JobID) + blocked.Untrack(e.JobID, e.Namespace) bs = blocked.Stats() if bs.TotalBlocked != 0 || bs.TotalEscaped != 0 || bs.TotalQuotaLimit != 0 { t.Fatalf("bad: %#v", bs) diff --git a/nomad/fsm.go b/nomad/fsm.go index 0111f0c35ca..74c1e50f995 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -7,10 +7,9 @@ import ( "sync" "time" + "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - - "github.com/armon/go-metrics" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -609,7 +608,7 @@ func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) { len(eval.FailedTGAllocs) == 0 { // If we have a successful evaluation for a node, untrack any // blocked evaluation - n.blockedEvals.Untrack(eval.JobID) + n.blockedEvals.Untrack(eval.JobID, eval.Namespace) } } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1f269c4c062..6a48c38ea72 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -163,6 +163,14 @@ type NamespacedID struct { Namespace string } +// NewNamespacedID returns a new namespaced ID given the ID and namespace +func NewNamespacedID(id, ns string) NamespacedID { + return NamespacedID{ + ID: id, + Namespace: ns, + } +} + func (n NamespacedID) String() string { return fmt.Sprintf("<ns: %q, id: %q>", n.Namespace, n.ID) } From 2c6ea24f3c0e78a9aad1de318955431365fea22f Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 7 Nov 2018 11:59:24 -0800 Subject: [PATCH 3/5] fix test --- nomad/leader_test.go | 4 +++- scheduler/context_test.go | 7 ++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/nomad/leader_test.go
b/nomad/leader_test.go index 2475ab84487..dd02761e510 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -627,8 +627,10 @@ func TestLeader_ReapDuplicateEval(t *testing.T) { // Create a duplicate blocked eval eval := mock.Eval() + eval.CreateIndex = 100 eval2 := mock.Eval() eval2.JobID = eval.JobID + eval2.CreateIndex = 102 s1.blockedEvals.Block(eval) s1.blockedEvals.Block(eval2) @@ -636,7 +638,7 @@ func TestLeader_ReapDuplicateEval(t *testing.T) { state := s1.fsm.State() testutil.WaitForResult(func() (bool, error) { ws := memdb.NewWatchSet() - out, err := state.EvalByID(ws, eval2.ID) + out, err := state.EvalByID(ws, eval.ID) if err != nil { return false, err } diff --git a/scheduler/context_test.go b/scheduler/context_test.go index 6802a238838..da12e5fcd9f 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -1,7 +1,6 @@ package scheduler import ( - "reflect" "testing" "github.com/hashicorp/nomad/helper/testlog" @@ -260,7 +259,7 @@ func TestEvalEligibility_GetClasses(t *testing.T) { e.SetTaskGroupEligibility(false, "fizz", "v1:3") expClasses := map[string]bool{ - "v1:1": true, + "v1:1": false, "v1:2": false, "v1:3": true, "v1:4": false, @@ -268,9 +267,7 @@ func TestEvalEligibility_GetClasses(t *testing.T) { } actClasses := e.GetClasses() - if !reflect.DeepEqual(actClasses, expClasses) { - t.Fatalf("GetClasses() returned %#v; want %#v", actClasses, expClasses) - } + require.Equal(t, expClasses, actClasses) } func TestEvalEligibility_GetClasses_JobEligible_TaskGroupIneligible(t *testing.T) { e := NewEvalEligibility() From 361d384651ddf8c8027520fc6139c4ada3176ea8 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 8 Nov 2018 13:28:27 -0800 Subject: [PATCH 4/5] typo fix --- nomad/blocked_evals.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index edf08bbf1af..8a9f9576a76 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -255,7 +255,7 @@ func (b 
*BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) { existingW, ok = b.escaped[existingID] if !ok { // This is a programming error - b.logger.Error("existing blocked evaluation is niether tracked as captured or escaped", "existing_id", existingID) + b.logger.Error("existing blocked evaluation is neither tracked as captured or escaped", "existing_id", existingID) delete(b.jobs, structs.NewNamespacedID(eval.JobID, eval.Namespace)) return } From 8460726980cb102133084959c9167a6bd8caff19 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 12 Nov 2018 16:02:23 -0800 Subject: [PATCH 5/5] Handle new eval being the duplicate properly --- nomad/blocked_evals.go | 16 +++++++++++++--- nomad/blocked_evals_test.go | 14 +++++++++++++- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 8a9f9576a76..9f9ca013fbb 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -183,7 +183,12 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { } // Handle the new evaluation being for a job we are already tracking. - b.processBlockJobDuplicate(eval) + if b.processBlockJobDuplicate(eval) { + // If process block job duplicate returns true, the new evaluation has + // been marked as a duplicate and we have nothing to do, so return + // early. + return + } // Check if the eval missed an unblock while it was in the scheduler at an // older index. The scheduler could have been invoked with a snapshot of @@ -234,8 +239,9 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { // would create unnecessary work for the scheduler as multiple evals for the // same job would be run, all producing the same outcome. It is critical to // prefer the newer evaluation, since it will contain the most up to date set of -// class eligibility. This should be called with the lock held. 
-func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) { +// class eligibility. The return value is set to true, if the passed evaluation +// is cancelled. This should be called with the lock held. +func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) (newCancelled bool) { existingID, hasExisting := b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)] if !hasExisting { return @@ -250,6 +256,7 @@ func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) { dup = existingW.eval } else { dup = eval + newCancelled = true } } else { existingW, ok = b.escaped[existingID] @@ -266,6 +273,7 @@ func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) { dup = existingW.eval } else { dup = eval + newCancelled = true } } @@ -276,6 +284,8 @@ func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) { case b.duplicateCh <- struct{}{}: default: } + + return } // latestEvalIndex returns the max of the evaluations create and snapshot index diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index 4b3994b3481..8fe7db32f06 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -113,7 +113,7 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) { blocked.Block(e) blocked.Block(e2) - // Verify block did track both + // Verify stats such that we are only tracking one bStats := blocked.Stats() if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { t.Fatalf("bad: %#v", bStats) @@ -137,12 +137,24 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) { t.Fatalf("bad: %#v %#v", out, e2) } + // Verify stats such that we are only tracking one + bStats = blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } + // Add an older evaluation and assert it gets cancelled blocked.Block(e4) out = blocked.GetDuplicates(0) if len(out) != 1 || !reflect.DeepEqual(out[0], e4) { t.Fatalf("bad: %#v %#v", out, e4) } + + 
// Verify stats such that we are only tracking one + bStats = blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } } func TestBlockedEvals_UnblockEscaped(t *testing.T) {