From 672f877d8c8f99f17cde45a6c12d85df958e07f6 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 28 Jan 2016 13:43:48 -0800 Subject: [PATCH 01/10] Schedulers create blocked eval if there are failed allocations --- nomad/blocked_evals.go | 1 + nomad/blocked_evals_test.go | 1 + nomad/structs/structs.go | 32 ++++++++++++ scheduler/context.go | 30 +++++++++++ scheduler/context_test.go | 30 +++++++++++ scheduler/generic_sched.go | 15 ++++++ scheduler/generic_sched_test.go | 92 +++++++++++++++++++++++++++++++++ scheduler/system_sched.go | 15 ++++++ scheduler/system_sched_test.go | 82 +++++++++++++++++++++++++++++ 9 files changed, 298 insertions(+) create mode 100644 nomad/blocked_evals.go create mode 100644 nomad/blocked_evals_test.go diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go new file mode 100644 index 00000000000..1a253cd4900 --- /dev/null +++ b/nomad/blocked_evals.go @@ -0,0 +1 @@ +package nomad diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go new file mode 100644 index 00000000000..1a253cd4900 --- /dev/null +++ b/nomad/blocked_evals_test.go @@ -0,0 +1 @@ +package nomad diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index fa7ac8bfc84..db58a726451 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1824,6 +1824,7 @@ func (a *AllocMetric) ScoreNode(node *Node, name string, score float64) { } const ( + EvalStatusBlocked = "blocked" EvalStatusPending = "pending" EvalStatusComplete = "complete" EvalStatusFailed = "failed" @@ -1912,6 +1913,18 @@ type Evaluation struct { // This is used to support rolling upgrades, where we need a chain of evaluations. PreviousEval string + // EligibleClasses are the computed node classes that have explicitely been + // marked as eligible for placement for some task groups of the job. 
+ EligibleClasses []uint64 + + // IneligibleClasses are the computed node classes that have explicitely been + // marked as ineligible for placement for some task groups of the job. + IneligibleClasses []uint64 + + // EscapedComputedClass marks whether the job has constraints that are not + // captured by computed node classes. + EscapedComputedClass bool + // Raft Indexes CreateIndex uint64 ModifyIndex uint64 @@ -1980,6 +1993,25 @@ func (e *Evaluation) NextRollingEval(wait time.Duration) *Evaluation { } } +// BlockedEval creates a blocked evaluation to followup this eval to place any +// failed allocations. It takes the classes marked explicitely eligible or +// ineligible and whether the job has escaped computed node classes. +func (e *Evaluation) BlockedEval(elig, inelig []uint64, escaped bool) *Evaluation { + return &Evaluation{ + ID: GenerateUUID(), + Priority: e.Priority, + Type: e.Type, + TriggeredBy: e.TriggeredBy, + JobID: e.JobID, + JobModifyIndex: e.JobModifyIndex, + Status: EvalStatusBlocked, + PreviousEval: e.ID, + EligibleClasses: elig, + IneligibleClasses: inelig, + EscapedComputedClass: escaped, + } +} + // Plan is used to submit a commit plan for task allocations. These // are submitted to the leader which verifies that resources have // not been overcommitted before admiting the plan. diff --git a/scheduler/context.go b/scheduler/context.go index f4b92a17014..d809fdb338b 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -220,6 +220,36 @@ func (e *EvalEligibility) HasEscaped() bool { return false } +// GetClasses returns the eligible classes and the ineligible classes, +// respectively, across the job and task groups. +func (e *EvalEligibility) GetClasses() ([]uint64, []uint64) { + var elig, inelig []uint64 + + // Go through the job. 
+ for class, feas := range e.job { + switch feas { + case EvalComputedClassEligible: + elig = append(elig, class) + case EvalComputedClassIneligible: + inelig = append(inelig, class) + } + } + + // Go through the task groups. + for _, classes := range e.taskGroups { + for class, feas := range classes { + switch feas { + case EvalComputedClassEligible: + elig = append(elig, class) + case EvalComputedClassIneligible: + inelig = append(inelig, class) + } + } + } + + return elig, inelig +} + // JobStatus returns the eligibility status of the job. func (e *EvalEligibility) JobStatus(class uint64) ComputedClassFeasibility { // COMPAT: Computed node class was introduced in 0.3. Clients running < 0.3 diff --git a/scheduler/context_test.go b/scheduler/context_test.go index 27d906ddbf1..d97f99113e0 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -3,6 +3,8 @@ package scheduler import ( "log" "os" + "reflect" + "sort" "testing" "github.com/hashicorp/nomad/nomad/mock" @@ -206,3 +208,31 @@ func TestEvalEligibility_SetJob(t *testing.T) { t.Fatalf("SetJob() should mark task group as escaped") } } + +type uint64Array []uint64 + +func (u uint64Array) Len() int { return len(u) } +func (u uint64Array) Less(i, j int) bool { return u[i] <= u[j] } +func (u uint64Array) Swap(i, j int) { u[i], u[j] = u[j], u[i] } + +func TestEvalEligibility_GetClasses(t *testing.T) { + e := NewEvalEligibility() + e.SetJobEligibility(true, 1) + e.SetJobEligibility(false, 2) + e.SetTaskGroupEligibility(true, "foo", 3) + e.SetTaskGroupEligibility(false, "bar", 4) + e.SetTaskGroupEligibility(true, "bar", 5) + expElig := []uint64{1, 3, 5} + expInelig := []uint64{2, 4} + + actElig, actInelig := e.GetClasses() + sort.Sort(uint64Array(actElig)) + sort.Sort(uint64Array(actInelig)) + + if !reflect.DeepEqual(actElig, expElig) { + t.Fatalf("GetClasses() returned %#v; want %#v", actElig, expElig) + } + if !reflect.DeepEqual(actInelig, expInelig) { + t.Fatalf("GetClasses() returned %#v; want 
%#v", actInelig, expInelig) + } +} diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 0f1fb757ae6..d772836aa1a 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -58,6 +58,8 @@ type GenericScheduler struct { limitReached bool nextEval *structs.Evaluation + + blocked *structs.Evaluation } // NewServiceScheduler is a factory function to instantiate a new service scheduler @@ -158,6 +160,19 @@ func (s *GenericScheduler) process() (bool, error) { s.logger.Printf("[DEBUG] sched: %#v: rolling update limit reached, next eval '%s' created", s.eval, s.nextEval.ID) } + // If there are failed allocations, we need to create a blocked evaluation + // to place the failed allocations when resources become available. + if len(s.plan.FailedAllocs) != 0 && s.blocked == nil { + e := s.ctx.Eligibility() + elig, inelig := e.GetClasses() + s.blocked = s.eval.BlockedEval(elig, inelig, e.HasEscaped()) + if err := s.planner.CreateEval(s.blocked); err != nil { + s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err) + return false, err + } + s.logger.Printf("[DEBUG] sched: %#v: failed to place all allocations, blocked eval '%s' created", s.eval, s.blocked.ID) + } + // Submit the plan result, newState, err := s.planner.SubmitPlan(s.plan) if err != nil { diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 81cc3fd5700..be46d17cd58 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -104,6 +104,11 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) { } plan := h.Plans[0] + // Ensure the plan has created a follow up eval. 
+ if len(h.CreateEvals) != 1 || h.CreateEvals[0].Status != structs.EvalStatusBlocked { + t.Fatalf("bad: %#v", h.CreateEvals) + } + // Ensure the plan failed to alloc if len(plan.FailedAllocs) != 1 { t.Fatalf("bad: %#v", plan) @@ -131,6 +136,93 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobRegister_BlockedEval(t *testing.T) { + h := NewHarness(t) + + // Create a full node + node := mock.Node() + node.Reserved = node.Resources + node.ComputeClass() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create an ineligible node + node2 := mock.Node() + node2.Attributes["kernel.name"] = "windows" + node2.ComputeClass() + noErr(t, h.State.UpsertNode(h.NextIndex(), node2)) + + // Create a jobs + job := mock.Job() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan has created a follow up eval. 
+ if len(h.CreateEvals) != 1 { + t.Fatalf("bad: %#v", h.CreateEvals) + } + + created := h.CreateEvals[0] + if created.Status != structs.EvalStatusBlocked { + t.Fatalf("bad: %#v", created) + } + + if len(created.EligibleClasses) != 1 && len(created.IneligibleClasses) != 1 { + t.Fatalf("bad: %#v", created) + } + + if created.EscapedComputedClass { + t.Fatalf("bad: %#v", created) + } + + // Ensure the plan failed to alloc + if len(plan.FailedAllocs) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 1 { + for _, a := range out { + t.Logf("%#v", a) + } + t.Fatalf("bad: %#v", out) + } + + // Check the coalesced failures + if out[0].Metrics.CoalescedFailures != 9 { + t.Fatalf("bad: %#v", out[0].Metrics) + } + + // Check the available nodes + if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 2 { + t.Fatalf("bad: %#v", out[0].Metrics) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobModify(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 9b352b81fb1..60d76a2b482 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -35,6 +35,8 @@ type SystemScheduler struct { limitReached bool nextEval *structs.Evaluation + + blocked *structs.Evaluation } // NewSystemScheduler is a factory function to instantiate a new system @@ -127,6 +129,19 @@ func (s *SystemScheduler) process() (bool, error) { s.logger.Printf("[DEBUG] sched: %#v: rolling update limit reached, next eval '%s' created", s.eval, s.nextEval.ID) } + // If there are failed allocations, we need to create a blocked evaluation + // to place the failed allocations when resources become available. 
+ if len(s.plan.FailedAllocs) != 0 && s.blocked == nil { + e := s.ctx.Eligibility() + elig, inelig := e.GetClasses() + s.blocked = s.eval.BlockedEval(elig, inelig, e.HasEscaped()) + if err := s.planner.CreateEval(s.blocked); err != nil { + s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err) + return false, err + } + s.logger.Printf("[DEBUG] sched: %#v: failed to place all allocations, blocked eval '%s' created", s.eval, s.blocked.ID) + } + // Submit the plan result, newState, err := s.planner.SubmitPlan(s.plan) if err != nil { diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index fae5f322a76..a275a58b0c7 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -184,6 +184,88 @@ func TestSystemSched_JobRegister_AllocFail(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestSystemSched_JobRegister_BlockedEval(t *testing.T) { + h := NewHarness(t) + + // Create a full node + node := mock.Node() + node.Reserved = node.Resources + node.ComputeClass() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create an ineligible node + node2 := mock.Node() + node2.Attributes["kernel.name"] = "windows" + node2.ComputeClass() + noErr(t, h.State.UpsertNode(h.NextIndex(), node2)) + + // Create a jobs + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan has created a follow up eval. 
+ if len(h.CreateEvals) != 1 { + t.Fatalf("bad: %#v", h.CreateEvals) + } + + created := h.CreateEvals[0] + if created.Status != structs.EvalStatusBlocked { + t.Fatalf("bad: %#v", created) + } + + if len(created.EligibleClasses) != 1 && len(created.IneligibleClasses) != 1 { + t.Fatalf("bad: %#v", created) + } + + if created.EscapedComputedClass { + t.Fatalf("bad: %#v", created) + } + + // Ensure the plan failed to alloc + if len(plan.FailedAllocs) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 1 { + for _, a := range out { + t.Logf("%#v", a) + } + t.Fatalf("bad: %#v", out) + } + + // Check the available nodes + if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 2 { + t.Fatalf("bad: %#v", out[0].Metrics) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestSystemSched_JobModify(t *testing.T) { h := NewHarness(t) From 253fa75b0ce0052fa98514a93be19c6e77e0e78a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 29 Jan 2016 15:31:32 -0800 Subject: [PATCH 02/10] Blocked Eval tracker --- demo/vagrant/client2.hcl | 2 +- nomad/blocked_evals.go | 197 ++++++++++++++++++++++++++++++++++++ nomad/blocked_evals_test.go | 159 +++++++++++++++++++++++++++++ nomad/eval_broker.go | 12 +++ nomad/fsm.go | 38 ++++++- nomad/fsm_test.go | 123 +++++++++++++++++++++- nomad/leader.go | 26 +++-- nomad/server.go | 32 +++--- nomad/structs/structs.go | 37 +++++-- 9 files changed, 591 insertions(+), 35 deletions(-) diff --git a/demo/vagrant/client2.hcl b/demo/vagrant/client2.hcl index 839dc46af4a..1b1372ae2a8 100644 --- a/demo/vagrant/client2.hcl +++ b/demo/vagrant/client2.hcl @@ -15,7 +15,7 @@ client { # Set ourselves as thing one meta { - thing = "two" + ssd = "true" } } diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 1a253cd4900..fda018cef77 100644 --- a/nomad/blocked_evals.go +++ 
b/nomad/blocked_evals.go @@ -1 +1,198 @@ package nomad + +import ( + "sync" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/nomad/structs" +) + +// BlockedEvals is used to track evaluations that shouldn't be queued until a +// certain class of nodes becomes available. An evaluation is put into the +// blocked state when it is run through the scheduler and produced failed +// allocations. It is unblocked when the capacity of a node that could run the +// failed allocation becomes available. +type BlockedEvals struct { + evalBroker *EvalBroker + enabled bool + stats *BlockedStats + + // captured is the set of evaluations that are captured by computed node + // classes. + captured map[string]*structs.Evaluation + + // escaped is the set of evaluations that have escaped computed node + // classes. + escaped map[string]*structs.Evaluation + + l sync.RWMutex +} + +// BlockedStats returns all the stats about the blocked eval tracker. +type BlockedStats struct { + // The number of blocked evaluations that have escaped computed node + // classses. + TotalEscaped int + + // The number of blocked evaluations that are captured by computed node + // classses. + TotalCaptured int +} + +// NewBlockedEvals creates a new blocked eval tracker that will enqueue +// unblocked evals into the passed broker. +func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { + return &BlockedEvals{ + evalBroker: evalBroker, + captured: make(map[string]*structs.Evaluation), + escaped: make(map[string]*structs.Evaluation), + stats: new(BlockedStats), + } +} + +// Enabled is used to check if the broker is enabled. +func (b *BlockedEvals) Enabled() bool { + b.l.RLock() + defer b.l.RUnlock() + return b.enabled +} + +// SetEnabled is used to control if the broker is enabled. The broker +// should only be enabled on the active leader. 
+func (b *BlockedEvals) SetEnabled(enabled bool) { + b.l.Lock() + b.enabled = enabled + b.l.Unlock() + if !enabled { + b.Flush() + } +} + +// Block tracks the passed evaluation and enqueues it into the eval broker when +// a suitable node calls unblock. +func (b *BlockedEvals) Block(eval *structs.Evaluation) { + b.l.Lock() + defer b.l.Unlock() + + // Do nothing if not enabled + if !b.enabled { + return + } + + if eval.EscapedComputedClass { + b.escaped[eval.ID] = eval + b.stats.TotalEscaped++ + return + } + + b.captured[eval.ID] = eval + b.stats.TotalCaptured++ +} + +// Unblock causes any evaluation that could potentially make progress on a +// capacity change on the passed computed node class to be enqueued into the +// eval broker. +func (b *BlockedEvals) Unblock(computedClass uint64) { + b.l.Lock() + defer b.l.Unlock() + + // Do nothing if not enabled + if !b.enabled { + return + } + + // Every eval that has escaped computed node class has to be unblocked + // because any node could potentially be feasible. + i := 0 + l := len(b.escaped) + var unblocked []*structs.Evaluation + if l != 0 { + unblocked = make([]*structs.Evaluation, l) + for id, eval := range b.escaped { + unblocked[i] = eval + delete(b.escaped, id) + i++ + } + } + + // Reset the escaped + b.stats.TotalEscaped = 0 + + // We unblock any eval that is explicitely eligible for the computed class + // and also any eval that is not eligible or uneligible. This signifies that + // when the evaluation was originally run through the scheduler, that it + // never saw a node with the given computed class and thus needs to be + // unblocked for correctness. + var untrack []string + for id, eval := range b.captured { + if _, ok := eval.EligibleClasses[computedClass]; ok { + goto UNBLOCK + } else if _, ok := eval.IneligibleClasses[computedClass]; ok { + // Can skip because the eval has explicitely marked the node class + // as ineligible. 
+ continue + } + + UNBLOCK: + // The computed node class has never been seen by the eval so we unblock + // it. + unblocked = append(unblocked, eval) + untrack = append(untrack, id) + } + + // Untrack the unblocked evals. + if l := len(untrack); l != 0 { + for _, id := range untrack { + delete(b.captured, id) + } + + // Update the stats on captured evals. + b.stats.TotalCaptured -= len(untrack) + } + + if len(unblocked) != 0 { + // Enqueue all the unblocked evals into the broker. + b.evalBroker.EnqueueAll(unblocked) + } +} + +// Flush is used to clear the state of blocked evaluations. +func (b *BlockedEvals) Flush() { + b.l.Lock() + defer b.l.Unlock() + + // Reset the blocked eval tracker. + b.stats.TotalEscaped = 0 + b.stats.TotalCaptured = 0 + b.captured = make(map[string]*structs.Evaluation) + b.escaped = make(map[string]*structs.Evaluation) +} + +// Stats is used to query the state of the blocked eval tracker. +func (b *BlockedEvals) Stats() *BlockedStats { + // Allocate a new stats struct + stats := new(BlockedStats) + + b.l.RLock() + defer b.l.RUnlock() + + // Copy all the stats + stats.TotalEscaped = b.stats.TotalEscaped + stats.TotalCaptured = b.stats.TotalCaptured + return stats +} + +// EmitStats is used to export metrics about the blocked eval tracker while enabled +func (b *BlockedEvals) EmitStats(period time.Duration, stopCh chan struct{}) { + for { + select { + case <-time.After(period): + stats := b.Stats() + metrics.SetGauge([]string{"nomad", "blocked_evals", "total_captured"}, float32(stats.TotalCaptured)) + metrics.SetGauge([]string{"nomad", "blocked_evals", "total_escaped"}, float32(stats.TotalEscaped)) + case <-stopCh: + return + } + } +} diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index 1a253cd4900..c7bf4ad9d61 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -1 +1,160 @@ package nomad + +import ( + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + 
"github.com/hashicorp/nomad/nomad/structs" +) + +func testBlockedEvals(t *testing.T) (*BlockedEvals, *EvalBroker) { + broker := testBroker(t, 0) + broker.SetEnabled(true) + blocked := NewBlockedEvals(broker) + blocked.SetEnabled(true) + return blocked, broker +} + +func TestBlockedEvals_Block_Disabled(t *testing.T) { + blocked, _ := testBlockedEvals(t) + blocked.SetEnabled(false) + + // Create an escaped eval and add it to the blocked tracker. + e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.EscapedComputedClass = true + blocked.Block(e) + + // Verify block did nothing + blockedStats := blocked.Stats() + if blockedStats.TotalEscaped != 0 || blockedStats.TotalCaptured != 0 { + t.Fatalf("bad: %#v", blockedStats) + } + +} + +func TestBlockedEvals_UnblockEscaped(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Create an escaped eval and add it to the blocked tracker. + e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.EscapedComputedClass = true + blocked.Block(e) + + // Verify block caused the eval to be tracked + blockedStats := blocked.Stats() + if blockedStats.TotalEscaped != 1 { + t.Fatalf("bad: %#v", blockedStats) + } + + blocked.Unblock(123) + + // Verify Unblock caused an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + t.Fatalf("bad: %#v", brokerStats) + } + + // Verify Unblock updates the stats + blockedStats = blocked.Stats() + if blockedStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", blockedStats) + } +} + +func TestBlockedEvals_UnblockEligible(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Create a blocked eval that is eligible on a specific node class and add + // it to the blocked tracker. 
+ e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.EligibleClasses = map[uint64]struct{}{123: struct{}{}} + blocked.Block(e) + + // Verify block caused the eval to be tracked + blockedStats := blocked.Stats() + if blockedStats.TotalCaptured != 1 { + t.Fatalf("bad: %#v", blockedStats) + } + + blocked.Unblock(123) + + // Verify Unblock caused an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + t.Fatalf("bad: %#v", brokerStats) + } + + // Verify Unblock updates the stats + blockedStats = blocked.Stats() + if blockedStats.TotalCaptured != 0 { + t.Fatalf("bad: %#v", blockedStats) + } +} + +func TestBlockedEvals_UnblockIneligible(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Create a blocked eval that is ineligible on a specific node class and add + // it to the blocked tracker. + e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.IneligibleClasses = map[uint64]struct{}{123: struct{}{}} + blocked.Block(e) + + // Verify block caused the eval to be tracked + blockedStats := blocked.Stats() + if blockedStats.TotalCaptured != 1 && blockedStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", blockedStats) + } + + // Should do nothing + blocked.Unblock(123) + + // Verify Unblock didn't cause an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 0 { + t.Fatalf("bad: %#v", brokerStats) + } + + // Verify Unblock updates the stats + blockedStats = blocked.Stats() + if blockedStats.TotalCaptured != 1 { + t.Fatalf("bad: %#v", blockedStats) + } +} + +func TestBlockedEvals_UnblockUnknown(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Create a blocked eval that is ineligible on a specific node class and add + // it to the blocked tracker. 
+ e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.EligibleClasses = map[uint64]struct{}{123: struct{}{}} + e.IneligibleClasses = map[uint64]struct{}{456: struct{}{}} + blocked.Block(e) + + // Verify block caused the eval to be tracked + blockedStats := blocked.Stats() + if blockedStats.TotalCaptured != 1 && blockedStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", blockedStats) + } + + // Should unblock because the eval hasn't seen this node class. + blocked.Unblock(789) + + // Verify Unblock didn't cause an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + t.Fatalf("bad: %#v", brokerStats) + } + + // Verify Unblock updates the stats + blockedStats = blocked.Stats() + if blockedStats.TotalCaptured != 0 { + t.Fatalf("bad: %#v", blockedStats) + } +} diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 92e49996941..ead6f86bf71 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -128,7 +128,19 @@ func (b *EvalBroker) SetEnabled(enabled bool) { } } +// EnqueueAll is used to enqueue many evaluations. +// TODO: Update enqueueLocked to take a list and use heap.Fix instead of +// heap.Push in order to make the running time O(log(n+m)) instead of +// O(m*log(n)) where m is the size of the evals and n is the size of the +// existing heap. +func (b *EvalBroker) EnqueueAll(evals []*structs.Evaluation) { + for _, e := range evals { + b.Enqueue(e) + } +} + // Enqueue is used to enqueue an evaluation +// TODO: remove the error return value func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error { b.l.Lock() defer b.l.Unlock() diff --git a/nomad/fsm.go b/nomad/fsm.go index 5fe4009a26a..7e9a57f3f2d 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -40,6 +40,7 @@ const ( // this outside the Server to avoid exposing this outside the package. 
type nomadFSM struct { evalBroker *EvalBroker + blockedEvals *BlockedEvals periodicDispatcher *PeriodicDispatch logOutput io.Writer logger *log.Logger @@ -60,7 +61,8 @@ type snapshotHeader struct { } // NewFSMPath is used to construct a new FSM with a blank state -func NewFSM(evalBroker *EvalBroker, periodic *PeriodicDispatch, logOutput io.Writer) (*nomadFSM, error) { +func NewFSM(evalBroker *EvalBroker, periodic *PeriodicDispatch, + blocked *BlockedEvals, logOutput io.Writer) (*nomadFSM, error) { // Create a state store state, err := state.NewStateStore(logOutput) if err != nil { @@ -70,6 +72,7 @@ func NewFSM(evalBroker *EvalBroker, periodic *PeriodicDispatch, logOutput io.Wri fsm := &nomadFSM{ evalBroker: evalBroker, periodicDispatcher: periodic, + blockedEvals: blocked, logOutput: logOutput, logger: log.New(logOutput, "", log.LstdFlags), state: state, @@ -179,6 +182,19 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { n.logger.Printf("[ERR] nomad.fsm: UpdateNodeStatus failed: %v", err) return err } + + // Unblock evals for the nodes computed node class if it is in a ready + // state. 
+ if req.Status == structs.NodeStatusReady { + node, err := n.state.NodeByID(req.NodeID) + if err != nil { + n.logger.Printf("[ERR] nomad.fsm: looking up node %q failed: %v", req.NodeID, err) + return err + + } + n.blockedEvals.Unblock(node.ComputedClass) + } + return nil } @@ -312,6 +328,8 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} { n.logger.Printf("[ERR] nomad.fsm: failed to enqueue evaluation %s: %v", eval.ID, err) return err } + } else if eval.ShouldBlock() { + n.blockedEvals.Block(eval) } } return nil @@ -355,10 +373,26 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return nil } - if err := n.state.UpdateAllocFromClient(index, req.Alloc[0]); err != nil { + alloc := req.Alloc[0] + if err := n.state.UpdateAllocFromClient(index, alloc); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpdateAllocFromClient failed: %v", err) return err } + + // Unblock evals for the nodes computed node class if the client has + // finished running an allocation. 
+ if alloc.ClientStatus == structs.AllocClientStatusDead || + alloc.ClientStatus == structs.AllocClientStatusFailed { + nodeID := alloc.NodeID + node, err := n.state.NodeByID(nodeID) + if err != nil || node == nil { + n.logger.Printf("[ERR] nomad.fsm: looking up node %q failed: %v", nodeID, err) + return err + + } + n.blockedEvals.Unblock(node.ComputedClass) + } + return nil } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index ef76486a842..24f5dff7ca5 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -44,7 +44,9 @@ func testStateStore(t *testing.T) *state.StateStore { func testFSM(t *testing.T) *nomadFSM { p, _ := testPeriodicDispatcher() - fsm, err := NewFSM(testBroker(t, 0), p, os.Stderr) + broker := testBroker(t, 0) + blocked := NewBlockedEvals(broker) + fsm, err := NewFSM(broker, p, blocked, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -140,6 +142,7 @@ func TestFSM_DeregisterNode(t *testing.T) { func TestFSM_UpdateNodeStatus(t *testing.T) { fsm := testFSM(t) + fsm.blockedEvals.SetEnabled(true) node := mock.Node() req := structs.NodeRegisterRequest{ @@ -155,6 +158,11 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { t.Fatalf("resp: %v", resp) } + // Mark an eval as blocked. + eval := mock.Eval() + eval.EligibleClasses = map[uint64]struct{}{node.ComputedClass: struct{}{}} + fsm.blockedEvals.Block(eval) + req2 := structs.NodeUpdateStatusRequest{ NodeID: node.ID, Status: structs.NodeStatusReady, @@ -169,7 +177,7 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { t.Fatalf("resp: %v", resp) } - // Verify we are NOT registered + // Verify the status is ready. node, err = fsm.State().NodeByID(req.Node.ID) if err != nil { t.Fatalf("err: %v", err) @@ -177,6 +185,12 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { if node.Status != structs.NodeStatusReady { t.Fatalf("bad node: %#v", node) } + + // Verify the eval was unblocked. 
+ bStats := fsm.blockedEvals.Stats() + if bStats.TotalCaptured != 0 { + t.Fatalf("bad: %#v", bStats) + } } func TestFSM_UpdateNodeDrain(t *testing.T) { @@ -357,6 +371,53 @@ func TestFSM_UpdateEval(t *testing.T) { } } +func TestFSM_UpdateEval_Blocked(t *testing.T) { + fsm := testFSM(t) + fsm.evalBroker.SetEnabled(true) + fsm.blockedEvals.SetEnabled(true) + + // Create a blocked eval. + eval := mock.Eval() + eval.Status = structs.EvalStatusBlocked + + req := structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + buf, err := structs.Encode(structs.EvalUpdateRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + out, err := fsm.State().EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("not found!") + } + if out.CreateIndex != 1 { + t.Fatalf("bad index: %d", out.CreateIndex) + } + + // Verify the eval wasn't enqueued + stats := fsm.evalBroker.Stats() + if stats.TotalReady != 0 { + t.Fatalf("bad: %#v %#v", stats, out) + } + + // Verify the eval was added to the blocked tracker. + bStats := fsm.blockedEvals.Stats() + if bStats.TotalCaptured != 1 { + t.Fatalf("bad: %#v %#v", bStats, out) + } +} + func TestFSM_DeleteEval(t *testing.T) { fsm := testFSM(t) @@ -452,6 +513,64 @@ func TestFSM_UpsertAllocs(t *testing.T) { } func TestFSM_UpdateAllocFromClient(t *testing.T) { + fsm := testFSM(t) + fsm.blockedEvals.SetEnabled(true) + state := fsm.State() + + node := mock.Node() + state.UpsertNode(1, node) + + // Mark an eval as blocked. 
+ eval := mock.Eval() + eval.EligibleClasses = map[uint64]struct{}{node.ComputedClass: struct{}{}} + fsm.blockedEvals.Block(eval) + + bStats := fsm.blockedEvals.Stats() + if bStats.TotalCaptured != 1 { + t.Fatalf("bad: %#v", bStats) + } + + // Create a completed eval + alloc := mock.Alloc() + alloc.NodeID = node.ID + state.UpsertAllocs(1, []*structs.Allocation{alloc}) + + clientAlloc := new(structs.Allocation) + *clientAlloc = *alloc + clientAlloc.ClientStatus = structs.AllocClientStatusDead + + req := structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{clientAlloc}, + } + buf, err := structs.Encode(structs.AllocClientUpdateRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + out, err := fsm.State().AllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + clientAlloc.CreateIndex = out.CreateIndex + clientAlloc.ModifyIndex = out.ModifyIndex + if !reflect.DeepEqual(clientAlloc, out) { + t.Fatalf("bad: %#v %#v", clientAlloc, out) + } + + // Verify the eval was unblocked. 
+ bStats = fsm.blockedEvals.Stats() + if bStats.TotalCaptured != 0 { + t.Fatalf("bad: %#v %#v", bStats, out) + } +} + +func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { fsm := testFSM(t) state := fsm.State() diff --git a/nomad/leader.go b/nomad/leader.go index b11ff81f04a..b511f3d13b9 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -113,8 +113,11 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Enable the eval broker, since we are now the leader s.evalBroker.SetEnabled(true) + // Enable the blocked eval tracker, since we are now the leader + s.blockedEvals.SetEnabled(true) + // Restore the eval broker state - if err := s.restoreEvalBroker(); err != nil { + if err := s.restoreEvals(); err != nil { return err } @@ -149,10 +152,11 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { return nil } -// restoreEvalBroker is used to restore all pending evaluations -// into the eval broker. The broker is maintained only by the leader, -// so it must be restored anytime a leadership transition takes place. -func (s *Server) restoreEvalBroker() error { +// restoreEvals is used to restore pending evaluations into the eval broker and +// blocked evaluations into the blocked eval tracker. The broker and blocked +// eval tracker is maintained only by the leader, so it must be restored anytime +// a leadership transition takes place. 
+func (s *Server) restoreEvals() error { // Get an iterator over every evaluation iter, err := s.fsm.State().Evals() if err != nil { @@ -166,12 +170,12 @@ func (s *Server) restoreEvalBroker() error { } eval := raw.(*structs.Evaluation) - if !eval.ShouldEnqueue() { - continue - } - - if err := s.evalBroker.Enqueue(eval); err != nil { - return fmt.Errorf("failed to enqueue evaluation %s: %v", eval.ID, err) + if eval.ShouldEnqueue() { + if err := s.evalBroker.Enqueue(eval); err != nil { + return fmt.Errorf("failed to enqueue evaluation %s: %v", eval.ID, err) + } + } else if eval.ShouldBlock() { + s.blockedEvals.Block(eval) } } return nil diff --git a/nomad/server.go b/nomad/server.go index b6c13826d80..491490d2ed7 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -109,6 +109,10 @@ type Server struct { // that are waiting to be brokered to a sub-scheduler evalBroker *EvalBroker + // BlockedEvals is used to manage evaluations that are blocked on node + // capacity changes. + blockedEvals *BlockedEvals + // planQueue is used to manage the submitted allocation // plans that are waiting to be assessed by the leader planQueue *PlanQueue @@ -164,6 +168,9 @@ func NewServer(config *Config) (*Server, error) { return nil, err } + // Create a new blocked eval tracker. 
+ blockedEvals := NewBlockedEvals(evalBroker) + // Create a plan queue planQueue, err := NewPlanQueue() if err != nil { @@ -172,17 +179,18 @@ func NewServer(config *Config) (*Server, error) { // Create the server s := &Server{ - config: config, - connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, nil), - logger: logger, - rpcServer: rpc.NewServer(), - peers: make(map[string][]*serverParts), - localPeers: make(map[string]*serverParts), - reconcileCh: make(chan serf.Member, 32), - eventCh: make(chan serf.Event, 256), - evalBroker: evalBroker, - planQueue: planQueue, - shutdownCh: make(chan struct{}), + config: config, + connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, nil), + logger: logger, + rpcServer: rpc.NewServer(), + peers: make(map[string][]*serverParts), + localPeers: make(map[string]*serverParts), + reconcileCh: make(chan serf.Member, 32), + eventCh: make(chan serf.Event, 256), + evalBroker: evalBroker, + blockedEvals: blockedEvals, + planQueue: planQueue, + shutdownCh: make(chan struct{}), } // Create the periodic dispatcher for launching periodic jobs. @@ -415,7 +423,7 @@ func (s *Server) setupRaft() error { // Create the FSM var err error - s.fsm, err = NewFSM(s.evalBroker, s.periodicDispatcher, s.config.LogOutput) + s.fsm, err = NewFSM(s.evalBroker, s.periodicDispatcher, s.blockedEvals, s.config.LogOutput) if err != nil { return err } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index db58a726451..482bf0315dc 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1915,15 +1915,15 @@ type Evaluation struct { // EligibleClasses are the computed node classes that have explicitely been // marked as eligible for placement for some task groups of the job. - EligibleClasses []uint64 + EligibleClasses map[uint64]struct{} `json:"-"` // IneligibleClasses are the computed node classes that have explicitely been // marked as ineligible for placement for some task groups of the job. 
- IneligibleClasses []uint64 + IneligibleClasses map[uint64]struct{} `json:"-"` // EscapedComputedClass marks whether the job has constraints that are not // captured by computed node classes. - EscapedComputedClass bool + EscapedComputedClass bool `json:"-"` // Raft Indexes CreateIndex uint64 @@ -1951,12 +1951,26 @@ func (e *Evaluation) Copy() *Evaluation { return ne } -// ShouldEnqueue checks if a given evaluation should be enqueued +// ShouldEnqueue checks if a given evaluation should be enqueued into the +// eval_broker func (e *Evaluation) ShouldEnqueue() bool { switch e.Status { case EvalStatusPending: return true - case EvalStatusComplete, EvalStatusFailed: + case EvalStatusComplete, EvalStatusFailed, EvalStatusBlocked: + return false + default: + panic(fmt.Sprintf("unhandled evaluation (%s) status %s", e.ID, e.Status)) + } +} + +// ShouldBlock checks if a given evaluation should be entered into the blocked +// eval tracker. +func (e *Evaluation) ShouldBlock() bool { + switch e.Status { + case EvalStatusBlocked: + return true + case EvalStatusComplete, EvalStatusFailed, EvalStatusPending: return false default: panic(fmt.Sprintf("unhandled evaluation (%s) status %s", e.ID, e.Status)) @@ -1997,6 +2011,15 @@ func (e *Evaluation) NextRollingEval(wait time.Duration) *Evaluation { // failed allocations. It takes the classes marked explicitely eligible or // ineligible and whether the job has escaped computed node classes. 
func (e *Evaluation) BlockedEval(elig, inelig []uint64, escaped bool) *Evaluation { + eligSet := make(map[uint64]struct{}, len(elig)) + ineligSet := make(map[uint64]struct{}, len(inelig)) + for _, e := range elig { + eligSet[e] = struct{}{} + } + for _, i := range inelig { + ineligSet[i] = struct{}{} + } + return &Evaluation{ ID: GenerateUUID(), Priority: e.Priority, @@ -2006,8 +2029,8 @@ func (e *Evaluation) BlockedEval(elig, inelig []uint64, escaped bool) *Evaluatio JobModifyIndex: e.JobModifyIndex, Status: EvalStatusBlocked, PreviousEval: e.ID, - EligibleClasses: elig, - IneligibleClasses: inelig, + EligibleClasses: eligSet, + IneligibleClasses: ineligSet, EscapedComputedClass: escaped, } } From 6b641764f2ec51d596168c1903fa407d0339ac5c Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 29 Jan 2016 16:45:09 -0800 Subject: [PATCH 03/10] Rename counters --- nomad/blocked_evals.go | 32 ++++++++++++++------------------ nomad/blocked_evals_test.go | 14 +++++++------- nomad/fsm_test.go | 8 ++++---- 3 files changed, 25 insertions(+), 29 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index fda018cef77..72795e49b52 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -31,13 +31,12 @@ type BlockedEvals struct { // BlockedStats returns all the stats about the blocked eval tracker. type BlockedStats struct { - // The number of blocked evaluations that have escaped computed node - // classses. + // TotalEscaped is the total number of blocked evaluations that have escaped + // computed node classes. TotalEscaped int - // The number of blocked evaluations that are captured by computed node - // classses. - TotalCaptured int + // TotalBlocked is the total number of blocked evaluations. 
+ TotalBlocked int } // NewBlockedEvals creates a new blocked eval tracker that will enqueue @@ -80,6 +79,7 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) { return } + b.stats.TotalBlocked++ if eval.EscapedComputedClass { b.escaped[eval.ID] = eval b.stats.TotalEscaped++ @@ -87,7 +87,6 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) { } b.captured[eval.ID] = eval - b.stats.TotalCaptured++ } // Unblock causes any evaluation that could potentially make progress on a @@ -105,9 +104,8 @@ func (b *BlockedEvals) Unblock(computedClass uint64) { // Every eval that has escaped computed node class has to be unblocked // because any node could potentially be feasible. i := 0 - l := len(b.escaped) var unblocked []*structs.Evaluation - if l != 0 { + if l := len(b.escaped); l != 0 { unblocked = make([]*structs.Evaluation, l) for id, eval := range b.escaped { unblocked[i] = eval @@ -116,9 +114,6 @@ func (b *BlockedEvals) Unblock(computedClass uint64) { } } - // Reset the escaped - b.stats.TotalEscaped = 0 - // We unblock any eval that is explicitely eligible for the computed class // and also any eval that is not eligible or uneligible. This signifies that // when the evaluation was originally run through the scheduler, that it @@ -146,12 +141,13 @@ func (b *BlockedEvals) Unblock(computedClass uint64) { for _, id := range untrack { delete(b.captured, id) } - - // Update the stats on captured evals. - b.stats.TotalCaptured -= len(untrack) } - if len(unblocked) != 0 { + if l := len(unblocked); l != 0 { + // Update the counters + b.stats.TotalEscaped = 0 + b.stats.TotalBlocked -= l + // Enqueue all the unblocked evals into the broker. b.evalBroker.EnqueueAll(unblocked) } @@ -164,7 +160,7 @@ func (b *BlockedEvals) Flush() { // Reset the blocked eval tracker. 
b.stats.TotalEscaped = 0 - b.stats.TotalCaptured = 0 + b.stats.TotalBlocked = 0 b.captured = make(map[string]*structs.Evaluation) b.escaped = make(map[string]*structs.Evaluation) } @@ -179,7 +175,7 @@ func (b *BlockedEvals) Stats() *BlockedStats { // Copy all the stats stats.TotalEscaped = b.stats.TotalEscaped - stats.TotalCaptured = b.stats.TotalCaptured + stats.TotalBlocked = b.stats.TotalBlocked return stats } @@ -189,7 +185,7 @@ func (b *BlockedEvals) EmitStats(period time.Duration, stopCh chan struct{}) { select { case <-time.After(period): stats := b.Stats() - metrics.SetGauge([]string{"nomad", "blocked_evals", "total_captured"}, float32(stats.TotalCaptured)) + metrics.SetGauge([]string{"nomad", "blocked_evals", "total_blocked"}, float32(stats.TotalBlocked)) metrics.SetGauge([]string{"nomad", "blocked_evals", "total_escaped"}, float32(stats.TotalEscaped)) case <-stopCh: return diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index c7bf4ad9d61..e4d0686b01b 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -27,7 +27,7 @@ func TestBlockedEvals_Block_Disabled(t *testing.T) { // Verify block did nothing blockedStats := blocked.Stats() - if blockedStats.TotalEscaped != 0 || blockedStats.TotalCaptured != 0 { + if blockedStats.TotalBlocked != 0 || blockedStats.TotalEscaped != 0 { t.Fatalf("bad: %#v", blockedStats) } @@ -75,7 +75,7 @@ func TestBlockedEvals_UnblockEligible(t *testing.T) { // Verify block caused the eval to be tracked blockedStats := blocked.Stats() - if blockedStats.TotalCaptured != 1 { + if blockedStats.TotalBlocked != 1 { t.Fatalf("bad: %#v", blockedStats) } @@ -89,7 +89,7 @@ func TestBlockedEvals_UnblockEligible(t *testing.T) { // Verify Unblock updates the stats blockedStats = blocked.Stats() - if blockedStats.TotalCaptured != 0 { + if blockedStats.TotalBlocked != 0 { t.Fatalf("bad: %#v", blockedStats) } } @@ -106,7 +106,7 @@ func TestBlockedEvals_UnblockIneligible(t *testing.T) { // Verify block 
caused the eval to be tracked blockedStats := blocked.Stats() - if blockedStats.TotalCaptured != 1 && blockedStats.TotalEscaped != 0 { + if blockedStats.TotalBlocked != 1 && blockedStats.TotalEscaped != 0 { t.Fatalf("bad: %#v", blockedStats) } @@ -121,7 +121,7 @@ func TestBlockedEvals_UnblockIneligible(t *testing.T) { // Verify Unblock updates the stats blockedStats = blocked.Stats() - if blockedStats.TotalCaptured != 1 { + if blockedStats.TotalBlocked != 1 { t.Fatalf("bad: %#v", blockedStats) } } @@ -139,7 +139,7 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) { // Verify block caused the eval to be tracked blockedStats := blocked.Stats() - if blockedStats.TotalCaptured != 1 && blockedStats.TotalEscaped != 0 { + if blockedStats.TotalBlocked != 1 && blockedStats.TotalEscaped != 0 { t.Fatalf("bad: %#v", blockedStats) } @@ -154,7 +154,7 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) { // Verify Unblock updates the stats blockedStats = blocked.Stats() - if blockedStats.TotalCaptured != 0 { + if blockedStats.TotalBlocked != 0 { t.Fatalf("bad: %#v", blockedStats) } } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 24f5dff7ca5..5a6df883000 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -188,7 +188,7 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { // Verify the eval was unblocked. bStats := fsm.blockedEvals.Stats() - if bStats.TotalCaptured != 0 { + if bStats.TotalBlocked != 0 { t.Fatalf("bad: %#v", bStats) } } @@ -413,7 +413,7 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) { // Verify the eval was added to the blocked tracker. 
bStats := fsm.blockedEvals.Stats() - if bStats.TotalCaptured != 1 { + if bStats.TotalBlocked != 1 { t.Fatalf("bad: %#v %#v", bStats, out) } } @@ -526,7 +526,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) { fsm.blockedEvals.Block(eval) bStats := fsm.blockedEvals.Stats() - if bStats.TotalCaptured != 1 { + if bStats.TotalBlocked != 1 { t.Fatalf("bad: %#v", bStats) } @@ -565,7 +565,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) { // Verify the eval was unblocked. bStats = fsm.blockedEvals.Stats() - if bStats.TotalCaptured != 0 { + if bStats.TotalBlocked != 0 { t.Fatalf("bad: %#v %#v", bStats, out) } } From 1797f0e67aba90278769bbf5ba62bc1ab2b5769b Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 29 Jan 2016 17:46:44 -0800 Subject: [PATCH 04/10] Make computed node class a string and add versioning --- nomad/blocked_evals.go | 15 +++++---- nomad/blocked_evals_test.go | 15 +++++---- nomad/fsm_test.go | 4 +-- nomad/node_endpoint_test.go | 4 +-- nomad/structs/node_class.go | 2 +- nomad/structs/node_class_test.go | 18 +++++------ nomad/structs/structs.go | 28 +++++------------ scheduler/context.go | 40 ++++++++++++------------ scheduler/context_test.go | 53 ++++++++++++++------------------ scheduler/feasible_test.go | 2 +- scheduler/generic_sched.go | 4 +-- scheduler/generic_sched_test.go | 5 +-- scheduler/system_sched.go | 4 +-- scheduler/system_sched_test.go | 5 +-- 14 files changed, 89 insertions(+), 110 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 72795e49b52..c537dec6542 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -92,7 +92,7 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) { // Unblock causes any evaluation that could potentially make progress on a // capacity change on the passed computed node class to be enqueued into the // eval broker. 
-func (b *BlockedEvals) Unblock(computedClass uint64) { +func (b *BlockedEvals) Unblock(computedClass string) { b.l.Lock() defer b.l.Unlock() @@ -121,15 +121,14 @@ func (b *BlockedEvals) Unblock(computedClass uint64) { // unblocked for correctness. var untrack []string for id, eval := range b.captured { - if _, ok := eval.EligibleClasses[computedClass]; ok { - goto UNBLOCK - } else if _, ok := eval.IneligibleClasses[computedClass]; ok { - // Can skip because the eval has explicitely marked the node class - // as ineligible. - continue + if elig, ok := eval.ClassEligibility[computedClass]; ok { + if !elig { + // Can skip because the eval has explicitely marked the node class + // as ineligible. + continue + } } - UNBLOCK: // The computed node class has never been seen by the eval so we unblock // it. unblocked = append(unblocked, eval) diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index e4d0686b01b..a02859871e3 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -48,7 +48,7 @@ func TestBlockedEvals_UnblockEscaped(t *testing.T) { t.Fatalf("bad: %#v", blockedStats) } - blocked.Unblock(123) + blocked.Unblock("v1:123") // Verify Unblock caused an enqueue brokerStats := broker.Stats() @@ -70,7 +70,7 @@ func TestBlockedEvals_UnblockEligible(t *testing.T) { // it to the blocked tracker. e := mock.Eval() e.Status = structs.EvalStatusBlocked - e.EligibleClasses = map[uint64]struct{}{123: struct{}{}} + e.ClassEligibility = map[string]bool{"v1:123": true} blocked.Block(e) // Verify block caused the eval to be tracked @@ -79,7 +79,7 @@ func TestBlockedEvals_UnblockEligible(t *testing.T) { t.Fatalf("bad: %#v", blockedStats) } - blocked.Unblock(123) + blocked.Unblock("v1:123") // Verify Unblock caused an enqueue brokerStats := broker.Stats() @@ -101,7 +101,7 @@ func TestBlockedEvals_UnblockIneligible(t *testing.T) { // it to the blocked tracker. 
e := mock.Eval() e.Status = structs.EvalStatusBlocked - e.IneligibleClasses = map[uint64]struct{}{123: struct{}{}} + e.ClassEligibility = map[string]bool{"v1:123": false} blocked.Block(e) // Verify block caused the eval to be tracked @@ -111,7 +111,7 @@ func TestBlockedEvals_UnblockIneligible(t *testing.T) { } // Should do nothing - blocked.Unblock(123) + blocked.Unblock("v1:123") // Verify Unblock didn't cause an enqueue brokerStats := broker.Stats() @@ -133,8 +133,7 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) { // it to the blocked tracker. e := mock.Eval() e.Status = structs.EvalStatusBlocked - e.EligibleClasses = map[uint64]struct{}{123: struct{}{}} - e.IneligibleClasses = map[uint64]struct{}{456: struct{}{}} + e.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false} blocked.Block(e) // Verify block caused the eval to be tracked @@ -144,7 +143,7 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) { } // Should unblock because the eval hasn't seen this node class. - blocked.Unblock(789) + blocked.Unblock("v1:789") // Verify Unblock didn't cause an enqueue brokerStats := broker.Stats() diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 5a6df883000..e5b3c1a20b8 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -160,7 +160,7 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { // Mark an eval as blocked. eval := mock.Eval() - eval.EligibleClasses = map[uint64]struct{}{node.ComputedClass: struct{}{}} + eval.ClassEligibility = map[string]bool{node.ComputedClass: true} fsm.blockedEvals.Block(eval) req2 := structs.NodeUpdateStatusRequest{ @@ -522,7 +522,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) { // Mark an eval as blocked. 
eval := mock.Eval() - eval.EligibleClasses = map[uint64]struct{}{node.ComputedClass: struct{}{}} + eval.ClassEligibility = map[string]bool{node.ComputedClass: true} fsm.blockedEvals.Block(eval) bStats := fsm.blockedEvals.Stats() diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 743e6127ed2..9de93fc6646 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -45,7 +45,7 @@ func TestClientEndpoint_Register(t *testing.T) { if out.CreateIndex != resp.Index { t.Fatalf("index mis-match") } - if out.ComputedClass == 0 { + if out.ComputedClass == "" { t.Fatal("ComputedClass not set") } } @@ -357,7 +357,7 @@ func TestClientEndpoint_GetNode(t *testing.T) { t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index) } - if resp2.Node.ComputedClass == 0 { + if resp2.Node.ComputedClass == "" { t.Fatalf("bad ComputedClass: %#v", resp2.Node) } diff --git a/nomad/structs/node_class.go b/nomad/structs/node_class.go index b37bf926a07..44f0375d5a4 100644 --- a/nomad/structs/node_class.go +++ b/nomad/structs/node_class.go @@ -34,7 +34,7 @@ func (n *Node) ComputeClass() error { return err } - n.ComputedClass = hash + n.ComputedClass = fmt.Sprintf("v1:%d", hash) return nil } diff --git a/nomad/structs/node_class_test.go b/nomad/structs/node_class_test.go index 27c72d96012..1759bece79f 100644 --- a/nomad/structs/node_class_test.go +++ b/nomad/structs/node_class_test.go @@ -46,7 +46,7 @@ func TestNode_ComputedClass(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } old := n.ComputedClass @@ -64,7 +64,7 @@ func TestNode_ComputedClass(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } @@ -79,7 +79,7 @@ func 
TestNode_ComputedClass_Ignore(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } old := n.ComputedClass @@ -89,7 +89,7 @@ func TestNode_ComputedClass_Ignore(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } @@ -104,7 +104,7 @@ func TestNode_ComputedClass_Attr(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } old := n.ComputedClass @@ -123,7 +123,7 @@ func TestNode_ComputedClass_Attr(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } if old == n.ComputedClass { @@ -138,7 +138,7 @@ func TestNode_ComputedClass_Meta(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } old := n.ComputedClass @@ -148,7 +148,7 @@ func TestNode_ComputedClass_Meta(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } if old == n.ComputedClass { @@ -161,7 +161,7 @@ func TestNode_ComputedClass_Meta(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } if old != n.ComputedClass { diff 
--git a/nomad/structs/structs.go b/nomad/structs/structs.go index 482bf0315dc..7d0a566f825 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -512,7 +512,7 @@ type Node struct { // ComputedClass is a unique id that identifies nodes with a common set of // attributes and capabilities. - ComputedClass uint64 + ComputedClass string // Drain is controlled by the servers, and not the client. // If true, no jobs will be scheduled to this node, and existing @@ -1913,17 +1913,13 @@ type Evaluation struct { // This is used to support rolling upgrades, where we need a chain of evaluations. PreviousEval string - // EligibleClasses are the computed node classes that have explicitely been - // marked as eligible for placement for some task groups of the job. - EligibleClasses map[uint64]struct{} `json:"-"` - - // IneligibleClasses are the computed node classes that have explicitely been - // marked as ineligible for placement for some task groups of the job. - IneligibleClasses map[uint64]struct{} `json:"-"` + // ClassEligibility tracks computed node classes that have been explicitely + // marked as eligible or ineligible. + ClassEligibility map[string]bool // EscapedComputedClass marks whether the job has constraints that are not // captured by computed node classes. - EscapedComputedClass bool `json:"-"` + EscapedComputedClass bool // Raft Indexes CreateIndex uint64 @@ -2010,16 +2006,7 @@ func (e *Evaluation) NextRollingEval(wait time.Duration) *Evaluation { // BlockedEval creates a blocked evaluation to followup this eval to place any // failed allocations. It takes the classes marked explicitely eligible or // ineligible and whether the job has escaped computed node classes. 
-func (e *Evaluation) BlockedEval(elig, inelig []uint64, escaped bool) *Evaluation { - eligSet := make(map[uint64]struct{}, len(elig)) - ineligSet := make(map[uint64]struct{}, len(inelig)) - for _, e := range elig { - eligSet[e] = struct{}{} - } - for _, i := range inelig { - ineligSet[i] = struct{}{} - } - +func (e *Evaluation) BlockedEval(classEligibility map[string]bool, escaped bool) *Evaluation { return &Evaluation{ ID: GenerateUUID(), Priority: e.Priority, @@ -2029,8 +2016,7 @@ func (e *Evaluation) BlockedEval(elig, inelig []uint64, escaped bool) *Evaluatio JobModifyIndex: e.JobModifyIndex, Status: EvalStatusBlocked, PreviousEval: e.ID, - EligibleClasses: eligSet, - IneligibleClasses: ineligSet, + ClassEligibility: classEligibility, EscapedComputedClass: escaped, } } diff --git a/scheduler/context.go b/scheduler/context.go index d809fdb338b..b95a4e51ac0 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -164,14 +164,14 @@ const ( // course of an evaluation. type EvalEligibility struct { // job tracks the eligibility at the job level per computed node class. - job map[uint64]ComputedClassFeasibility + job map[string]ComputedClassFeasibility // jobEscaped marks whether constraints have escaped at the job level. jobEscaped bool // taskGroups tracks the eligibility at the task group level per computed // node class. - taskGroups map[string]map[uint64]ComputedClassFeasibility + taskGroups map[string]map[string]ComputedClassFeasibility // tgEscapedConstraints is a map of task groups to whether constraints have // escaped. @@ -181,8 +181,8 @@ type EvalEligibility struct { // NewEvalEligibility returns an eligibility tracker for the context of an evaluation. 
func NewEvalEligibility() *EvalEligibility { return &EvalEligibility{ - job: make(map[uint64]ComputedClassFeasibility), - taskGroups: make(map[string]map[uint64]ComputedClassFeasibility), + job: make(map[string]ComputedClassFeasibility), + taskGroups: make(map[string]map[string]ComputedClassFeasibility), tgEscapedConstraints: make(map[string]bool), } } @@ -220,18 +220,18 @@ func (e *EvalEligibility) HasEscaped() bool { return false } -// GetClasses returns the eligible classes and the ineligible classes, -// respectively, across the job and task groups. -func (e *EvalEligibility) GetClasses() ([]uint64, []uint64) { - var elig, inelig []uint64 +// GetClasses returns the tracked classes to their eligibility, across the job +// and task groups. +func (e *EvalEligibility) GetClasses() map[string]bool { + elig := make(map[string]bool) // Go through the job. for class, feas := range e.job { switch feas { case EvalComputedClassEligible: - elig = append(elig, class) + elig[class] = true case EvalComputedClassIneligible: - inelig = append(inelig, class) + elig[class] = false } } @@ -240,22 +240,22 @@ func (e *EvalEligibility) GetClasses() ([]uint64, []uint64) { for class, feas := range classes { switch feas { case EvalComputedClassEligible: - elig = append(elig, class) + elig[class] = true case EvalComputedClassIneligible: - inelig = append(inelig, class) + elig[class] = false } } } - return elig, inelig + return elig } // JobStatus returns the eligibility status of the job. -func (e *EvalEligibility) JobStatus(class uint64) ComputedClassFeasibility { +func (e *EvalEligibility) JobStatus(class string) ComputedClassFeasibility { // COMPAT: Computed node class was introduced in 0.3. Clients running < 0.3 // will not have a computed class. The safest value to return is the escaped // case, since it disables any optimization. 
- if e.jobEscaped || class == 0 { + if e.jobEscaped || class == "" { fmt.Println(e.jobEscaped, class) return EvalComputedClassEscaped } @@ -268,7 +268,7 @@ func (e *EvalEligibility) JobStatus(class uint64) ComputedClassFeasibility { // SetJobEligibility sets the eligibility status of the job for the computed // node class. -func (e *EvalEligibility) SetJobEligibility(eligible bool, class uint64) { +func (e *EvalEligibility) SetJobEligibility(eligible bool, class string) { if eligible { e.job[class] = EvalComputedClassEligible } else { @@ -277,11 +277,11 @@ func (e *EvalEligibility) SetJobEligibility(eligible bool, class uint64) { } // TaskGroupStatus returns the eligibility status of the task group. -func (e *EvalEligibility) TaskGroupStatus(tg string, class uint64) ComputedClassFeasibility { +func (e *EvalEligibility) TaskGroupStatus(tg, class string) ComputedClassFeasibility { // COMPAT: Computed node class was introduced in 0.3. Clients running < 0.3 // will not have a computed class. The safest value to return is the escaped // case, since it disables any optimization. - if class == 0 { + if class == "" { return EvalComputedClassEscaped } @@ -301,7 +301,7 @@ func (e *EvalEligibility) TaskGroupStatus(tg string, class uint64) ComputedClass // SetTaskGroupEligibility sets the eligibility status of the task group for the // computed node class. 
-func (e *EvalEligibility) SetTaskGroupEligibility(eligible bool, tg string, class uint64) { +func (e *EvalEligibility) SetTaskGroupEligibility(eligible bool, tg, class string) { var eligibility ComputedClassFeasibility if eligible { eligibility = EvalComputedClassEligible @@ -312,6 +312,6 @@ func (e *EvalEligibility) SetTaskGroupEligibility(eligible bool, tg string, clas if classes, ok := e.taskGroups[tg]; ok { classes[class] = eligibility } else { - e.taskGroups[tg] = map[uint64]ComputedClassFeasibility{class: eligibility} + e.taskGroups[tg] = map[string]ComputedClassFeasibility{class: eligibility} } } diff --git a/scheduler/context_test.go b/scheduler/context_test.go index d97f99113e0..782b526c3ba 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -4,7 +4,6 @@ import ( "log" "os" "reflect" - "sort" "testing" "github.com/hashicorp/nomad/nomad/mock" @@ -113,7 +112,7 @@ func TestEvalContext_ProposedAlloc(t *testing.T) { func TestEvalEligibility_JobStatus(t *testing.T) { e := NewEvalEligibility() - cc := uint64(100) + cc := "v1:100" // Get the job before its been set. if status := e.JobStatus(cc); status != EvalComputedClassUnknown { @@ -131,15 +130,15 @@ func TestEvalEligibility_JobStatus(t *testing.T) { t.Fatalf("JobStatus() returned %v; want %v", status, EvalComputedClassEligible) } - // Check that if I pass class zero it returns escaped - if status := e.JobStatus(0); status != EvalComputedClassEscaped { + // Check that if I pass an empty class it returns escaped + if status := e.JobStatus(""); status != EvalComputedClassEscaped { t.Fatalf("JobStatus() returned %v; want %v", status, EvalComputedClassEscaped) } } func TestEvalEligibility_TaskGroupStatus(t *testing.T) { e := NewEvalEligibility() - cc := uint64(100) + cc := "v1:100" tg := "foo" // Get the tg before its been set. 
@@ -158,8 +157,8 @@ func TestEvalEligibility_TaskGroupStatus(t *testing.T) { t.Fatalf("TaskGroupStatus() returned %v; want %v", status, EvalComputedClassEligible) } - // Check that if I pass class zero it returns escaped - if status := e.TaskGroupStatus(tg, 0); status != EvalComputedClassEscaped { + // Check that if I pass an empty class it returns escaped + if status := e.TaskGroupStatus(tg, ""); status != EvalComputedClassEscaped { t.Fatalf("TaskGroupStatus() returned %v; want %v", status, EvalComputedClassEscaped) } } @@ -209,30 +208,24 @@ func TestEvalEligibility_SetJob(t *testing.T) { } } -type uint64Array []uint64 - -func (u uint64Array) Len() int { return len(u) } -func (u uint64Array) Less(i, j int) bool { return u[i] <= u[j] } -func (u uint64Array) Swap(i, j int) { u[i], u[j] = u[j], u[i] } - func TestEvalEligibility_GetClasses(t *testing.T) { e := NewEvalEligibility() - e.SetJobEligibility(true, 1) - e.SetJobEligibility(false, 2) - e.SetTaskGroupEligibility(true, "foo", 3) - e.SetTaskGroupEligibility(false, "bar", 4) - e.SetTaskGroupEligibility(true, "bar", 5) - expElig := []uint64{1, 3, 5} - expInelig := []uint64{2, 4} - - actElig, actInelig := e.GetClasses() - sort.Sort(uint64Array(actElig)) - sort.Sort(uint64Array(actInelig)) - - if !reflect.DeepEqual(actElig, expElig) { - t.Fatalf("GetClasses() returned %#v; want %#v", actElig, expElig) - } - if !reflect.DeepEqual(actInelig, expInelig) { - t.Fatalf("GetClasses() returned %#v; want %#v", actInelig, expInelig) + e.SetJobEligibility(true, "v1:1") + e.SetJobEligibility(false, "v1:2") + e.SetTaskGroupEligibility(true, "foo", "v1:3") + e.SetTaskGroupEligibility(false, "bar", "v1:4") + e.SetTaskGroupEligibility(true, "bar", "v1:5") + + expClasses := map[string]bool{ + "v1:1": true, + "v1:2": false, + "v1:3": true, + "v1:4": false, + "v1:5": true, + } + + actClasses := e.GetClasses() + if !reflect.DeepEqual(actClasses, expClasses) { + t.Fatalf("GetClasses() returned %#v; want %#v", actClasses, expClasses) } } 
diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 229eff5946e..5a69761ad97 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -733,7 +733,7 @@ func TestFeasibilityWrapper_JobEligible_TgEscaped(t *testing.T) { cc := nodes[0].ComputedClass ctx.Eligibility().job[cc] = EvalComputedClassEligible ctx.Eligibility().taskGroups["foo"] = - map[uint64]ComputedClassFeasibility{cc: EvalComputedClassEscaped} + map[string]ComputedClassFeasibility{cc: EvalComputedClassEscaped} wrapper.SetTaskGroup("foo") // Run the wrapper. diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index d772836aa1a..36d459efa18 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -164,8 +164,8 @@ func (s *GenericScheduler) process() (bool, error) { // to place the failed allocations when resources become available. if len(s.plan.FailedAllocs) != 0 && s.blocked == nil { e := s.ctx.Eligibility() - elig, inelig := e.GetClasses() - s.blocked = s.eval.BlockedEval(elig, inelig, e.HasEscaped()) + classes := e.GetClasses() + s.blocked = s.eval.BlockedEval(classes, e.HasEscaped()) if err := s.planner.CreateEval(s.blocked); err != nil { s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err) return false, err diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index be46d17cd58..2c77eff29ce 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -185,8 +185,9 @@ func TestServiceSched_JobRegister_BlockedEval(t *testing.T) { t.Fatalf("bad: %#v", created) } - if len(created.EligibleClasses) != 1 && len(created.IneligibleClasses) != 1 { - t.Fatalf("bad: %#v", created) + classes := created.ClassEligibility + if len(classes) != 2 || !classes[node.ComputedClass] || classes[node2.ComputedClass] { + t.Fatalf("bad: %#v", classes) } if created.EscapedComputedClass { diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 
60d76a2b482..cbb27c3d841 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -133,8 +133,8 @@ func (s *SystemScheduler) process() (bool, error) { // to place the failed allocations when resources become available. if len(s.plan.FailedAllocs) != 0 && s.blocked == nil { e := s.ctx.Eligibility() - elig, inelig := e.GetClasses() - s.blocked = s.eval.BlockedEval(elig, inelig, e.HasEscaped()) + classes := e.GetClasses() + s.blocked = s.eval.BlockedEval(classes, e.HasEscaped()) if err := s.planner.CreateEval(s.blocked); err != nil { s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err) return false, err diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index a275a58b0c7..ce61b93e93a 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -233,8 +233,9 @@ func TestSystemSched_JobRegister_BlockedEval(t *testing.T) { t.Fatalf("bad: %#v", created) } - if len(created.EligibleClasses) != 1 && len(created.IneligibleClasses) != 1 { - t.Fatalf("bad: %#v", created) + classes := created.ClassEligibility + if len(classes) != 2 || !classes[node.ComputedClass] || classes[node2.ComputedClass] { + t.Fatalf("bad: %#v", classes) } if created.EscapedComputedClass { From e9c936134189923d7847b77b417ce9600cc8f932 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 29 Jan 2016 18:18:29 -0800 Subject: [PATCH 05/10] Buffered unblock --- nomad/blocked_evals.go | 116 ++++++++++++++++++++++------------ nomad/blocked_evals_test.go | 122 +++++++++++++++++++++--------------- nomad/fsm_test.go | 28 ++++++--- 3 files changed, 166 insertions(+), 100 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index c537dec6542..3573960d525 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -8,6 +8,11 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // unblockBuffer is the buffer size for the unblock channel. 
+ unblockBuffer = 8096 +) + // BlockedEvals is used to track evaluations that shouldn't be queued until a // certain class of nodes becomes available. An evaluation is put into the // blocked state when it is run through the scheduler and produced failed @@ -16,7 +21,9 @@ import ( type BlockedEvals struct { evalBroker *EvalBroker enabled bool + running bool stats *BlockedStats + l sync.RWMutex // captured is the set of evaluations that are captured by computed node // classes. @@ -26,7 +33,11 @@ type BlockedEvals struct { // classes. escaped map[string]*structs.Evaluation - l sync.RWMutex + // unblockCh is used to buffer unblocking of evaluations. + unblockCh chan string + + // stopCh is used to stop any created goroutines. + stopCh chan struct{} } // BlockedStats returns all the stats about the blocked eval tracker. @@ -46,6 +57,8 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { evalBroker: evalBroker, captured: make(map[string]*structs.Evaluation), escaped: make(map[string]*structs.Evaluation), + unblockCh: make(chan string, unblockBuffer), + stopCh: make(chan struct{}), stats: new(BlockedStats), } } @@ -62,6 +75,10 @@ func (b *BlockedEvals) Enabled() bool { func (b *BlockedEvals) SetEnabled(enabled bool) { b.l.Lock() b.enabled = enabled + if !b.running { + b.running = true + go b.unblock() + } b.l.Unlock() if !enabled { b.Flush() @@ -101,54 +118,63 @@ func (b *BlockedEvals) Unblock(computedClass string) { return } - // Every eval that has escaped computed node class has to be unblocked - // because any node could potentially be feasible. - i := 0 - var unblocked []*structs.Evaluation - if l := len(b.escaped); l != 0 { - unblocked = make([]*structs.Evaluation, l) - for id, eval := range b.escaped { - unblocked[i] = eval - delete(b.escaped, id) - i++ - } - } + b.unblockCh <- computedClass +} - // We unblock any eval that is explicitely eligible for the computed class - // and also any eval that is not eligible or uneligible. 
This signifies that - // when the evaluation was originally run through the scheduler, that it - // never saw a node with the given computed class and thus needs to be - // unblocked for correctness. - var untrack []string - for id, eval := range b.captured { - if elig, ok := eval.ClassEligibility[computedClass]; ok { - if !elig { - // Can skip because the eval has explicitely marked the node class - // as ineligible. - continue +func (b *BlockedEvals) unblock() { + select { + case <-b.stopCh: + return + case computedClass := <-b.unblockCh: + // Every eval that has escaped computed node class has to be unblocked + // because any node could potentially be feasible. + i := 0 + var unblocked []*structs.Evaluation + if l := len(b.escaped); l != 0 { + unblocked = make([]*structs.Evaluation, l) + for id, eval := range b.escaped { + unblocked[i] = eval + delete(b.escaped, id) + i++ } } - // The computed node class has never been seen by the eval so we unblock - // it. - unblocked = append(unblocked, eval) - untrack = append(untrack, id) - } + // We unblock any eval that is explicitely eligible for the computed class + // and also any eval that is not eligible or uneligible. This signifies that + // when the evaluation was originally run through the scheduler, that it + // never saw a node with the given computed class and thus needs to be + // unblocked for correctness. + var untrack []string + for id, eval := range b.captured { + if elig, ok := eval.ClassEligibility[computedClass]; ok { + if !elig { + // Can skip because the eval has explicitely marked the node class + // as ineligible. + continue + } + } - // Untrack the unblocked evals. - if l := len(untrack); l != 0 { - for _, id := range untrack { - delete(b.captured, id) + // The computed node class has never been seen by the eval so we unblock + // it. + unblocked = append(unblocked, eval) + untrack = append(untrack, id) + } + + // Untrack the unblocked evals. 
+ if l := len(untrack); l != 0 { + for _, id := range untrack { + delete(b.captured, id) + } } - } - if l := len(unblocked); l != 0 { - // Update the counters - b.stats.TotalEscaped = 0 - b.stats.TotalBlocked -= l + if l := len(unblocked); l != 0 { + // Update the counters + b.stats.TotalEscaped = 0 + b.stats.TotalBlocked -= l - // Enqueue all the unblocked evals into the broker. - b.evalBroker.EnqueueAll(unblocked) + // Enqueue all the unblocked evals into the broker. + b.evalBroker.EnqueueAll(unblocked) + } } } @@ -157,11 +183,19 @@ func (b *BlockedEvals) Flush() { b.l.Lock() defer b.l.Unlock() + // Kill any running goroutines + if b.running { + close(b.stopCh) + b.running = false + } + // Reset the blocked eval tracker. b.stats.TotalEscaped = 0 b.stats.TotalBlocked = 0 b.captured = make(map[string]*structs.Evaluation) b.escaped = make(map[string]*structs.Evaluation) + b.unblockCh = make(chan string, unblockBuffer) + b.stopCh = make(chan struct{}) } // Stats is used to query the state of the blocked eval tracker. 
diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index a02859871e3..cee99f5814c 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -1,10 +1,12 @@ package nomad import ( + "fmt" "testing" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" ) func testBlockedEvals(t *testing.T) (*BlockedEvals, *EvalBroker) { @@ -26,11 +28,10 @@ func TestBlockedEvals_Block_Disabled(t *testing.T) { blocked.Block(e) // Verify block did nothing - blockedStats := blocked.Stats() - if blockedStats.TotalBlocked != 0 || blockedStats.TotalEscaped != 0 { - t.Fatalf("bad: %#v", blockedStats) + bStats := blocked.Stats() + if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) } - } func TestBlockedEvals_UnblockEscaped(t *testing.T) { @@ -43,24 +44,29 @@ func TestBlockedEvals_UnblockEscaped(t *testing.T) { blocked.Block(e) // Verify block caused the eval to be tracked - blockedStats := blocked.Stats() - if blockedStats.TotalEscaped != 1 { - t.Fatalf("bad: %#v", blockedStats) + bStats := blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 1 { + t.Fatalf("bad: %#v", bStats) } blocked.Unblock("v1:123") - // Verify Unblock caused an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 1 { - t.Fatalf("bad: %#v", brokerStats) - } - - // Verify Unblock updates the stats - blockedStats = blocked.Stats() - if blockedStats.TotalEscaped != 0 { - t.Fatalf("bad: %#v", blockedStats) - } + testutil.WaitForResult(func() (bool, error) { + // Verify Unblock caused an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + + // Verify Unblock updates the stats + bStats := blocked.Stats() + if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { + return false, fmt.Errorf("bad: %#v", bStats) + } + return true, nil + }, func(err error) { + 
t.Fatalf("err: %s", err) + }) } func TestBlockedEvals_UnblockEligible(t *testing.T) { @@ -81,17 +87,22 @@ func TestBlockedEvals_UnblockEligible(t *testing.T) { blocked.Unblock("v1:123") - // Verify Unblock caused an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 1 { - t.Fatalf("bad: %#v", brokerStats) - } - - // Verify Unblock updates the stats - blockedStats = blocked.Stats() - if blockedStats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", blockedStats) - } + testutil.WaitForResult(func() (bool, error) { + // Verify Unblock caused an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + + // Verify Unblock updates the stats + bStats := blocked.Stats() + if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { + return false, fmt.Errorf("bad: %#v", bStats) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) } func TestBlockedEvals_UnblockIneligible(t *testing.T) { @@ -113,17 +124,21 @@ func TestBlockedEvals_UnblockIneligible(t *testing.T) { // Should do nothing blocked.Unblock("v1:123") - // Verify Unblock didn't cause an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 0 { - t.Fatalf("bad: %#v", brokerStats) - } - - // Verify Unblock updates the stats - blockedStats = blocked.Stats() - if blockedStats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", blockedStats) - } + testutil.WaitForResult(func() (bool, error) { + // Verify Unblock didn't cause an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 0 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + + bStats := blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + return false, fmt.Errorf("bad: %#v", bStats) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) } func TestBlockedEvals_UnblockUnknown(t *testing.T) { @@ -145,15 +160,20 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) { // 
Should unblock because the eval hasn't seen this node class. blocked.Unblock("v1:789") - // Verify Unblock didn't cause an enqueue - brokerStats := broker.Stats() - if brokerStats.TotalReady != 1 { - t.Fatalf("bad: %#v", brokerStats) - } - - // Verify Unblock updates the stats - blockedStats = blocked.Stats() - if blockedStats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", blockedStats) - } + testutil.WaitForResult(func() (bool, error) { + // Verify Unblock causes an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + + // Verify Unblock updates the stats + bStats := blocked.Stats() + if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { + return false, fmt.Errorf("bad: %#v", bStats) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index e5b3c1a20b8..bb7f48f3de5 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2,6 +2,7 @@ package nomad import ( "bytes" + "fmt" "os" "reflect" "testing" @@ -10,6 +11,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" "github.com/hashicorp/raft" ) @@ -187,10 +189,15 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { } // Verify the eval was unblocked. - bStats := fsm.blockedEvals.Stats() - if bStats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", bStats) - } + testutil.WaitForResult(func() (bool, error) { + bStats := fsm.blockedEvals.Stats() + if bStats.TotalBlocked != 0 { + return false, fmt.Errorf("bad: %#v", bStats) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) } func TestFSM_UpdateNodeDrain(t *testing.T) { @@ -564,10 +571,15 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) { } // Verify the eval was unblocked. 
- bStats = fsm.blockedEvals.Stats() - if bStats.TotalBlocked != 0 { - t.Fatalf("bad: %#v %#v", bStats, out) - } + testutil.WaitForResult(func() (bool, error) { + bStats = fsm.blockedEvals.Stats() + if bStats.TotalBlocked != 0 { + return false, fmt.Errorf("bad: %#v %#v", bStats, out) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) } func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { From 5956bee6d04b6784375c589cbd0b3332d34d8b18 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 30 Jan 2016 15:55:36 -0800 Subject: [PATCH 06/10] dedup blocked evals by job id --- nomad/blocked_evals.go | 180 +++++++++++++++++++++++++----------- nomad/blocked_evals_test.go | 56 +++++++++++ 2 files changed, 184 insertions(+), 52 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 3573960d525..0ed7be07e41 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -36,6 +36,20 @@ type BlockedEvals struct { // unblockCh is used to buffer unblocking of evaluations. unblockCh chan string + // jobs is the map of blocked job and is used to ensure that only one + // blocked eval exists for each job. + jobs map[string]struct{} + + // duplicates is the set of evaluations for jobs that had pre-existing + // blocked evaluations. These should be marked as cancelled since only one + // blocked eval is neeeded bper job. + duplicates []*structs.Evaluation + + // duplicateCh is used to signal that a duplicate eval was added to the + // duplicate set. It can be used to unblock waiting callers looking for + // duplicates. + duplicateCh chan struct{} + // stopCh is used to stop any created goroutines. stopCh chan struct{} } @@ -54,12 +68,14 @@ type BlockedStats struct { // unblocked evals into the passed broker. 
func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { return &BlockedEvals{ - evalBroker: evalBroker, - captured: make(map[string]*structs.Evaluation), - escaped: make(map[string]*structs.Evaluation), - unblockCh: make(chan string, unblockBuffer), - stopCh: make(chan struct{}), - stats: new(BlockedStats), + evalBroker: evalBroker, + captured: make(map[string]*structs.Evaluation), + escaped: make(map[string]*structs.Evaluation), + jobs: make(map[string]struct{}), + unblockCh: make(chan string, unblockBuffer), + duplicateCh: make(chan struct{}), + stopCh: make(chan struct{}), + stats: new(BlockedStats), } } @@ -96,7 +112,21 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) { return } + // Check if the job already has a blocked evaluation + if _, existing := b.jobs[eval.JobID]; existing { + b.duplicates = append(b.duplicates, eval) + + // Unblock any waiter. + select { + case b.duplicateCh <- struct{}{}: + default: + } + + return + } + b.stats.TotalBlocked++ + b.jobs[eval.JobID] = struct{}{} if eval.EscapedComputedClass { b.escaped[eval.ID] = eval b.stats.TotalEscaped++ @@ -110,9 +140,6 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) { // capacity change on the passed computed node class to be enqueued into the // eval broker. func (b *BlockedEvals) Unblock(computedClass string) { - b.l.Lock() - defer b.l.Unlock() - // Do nothing if not enabled if !b.enabled { return @@ -122,62 +149,108 @@ func (b *BlockedEvals) Unblock(computedClass string) { } func (b *BlockedEvals) unblock() { - select { - case <-b.stopCh: - return - case computedClass := <-b.unblockCh: - // Every eval that has escaped computed node class has to be unblocked - // because any node could potentially be feasible. 
- i := 0 - var unblocked []*structs.Evaluation - if l := len(b.escaped); l != 0 { - unblocked = make([]*structs.Evaluation, l) - for id, eval := range b.escaped { - unblocked[i] = eval - delete(b.escaped, id) - i++ + for { + select { + case <-b.stopCh: + return + case computedClass := <-b.unblockCh: + b.l.Lock() + + // Protect against the case of a flush. + if !b.running { + return } - } - // We unblock any eval that is explicitely eligible for the computed class - // and also any eval that is not eligible or uneligible. This signifies that - // when the evaluation was originally run through the scheduler, that it - // never saw a node with the given computed class and thus needs to be - // unblocked for correctness. - var untrack []string - for id, eval := range b.captured { - if elig, ok := eval.ClassEligibility[computedClass]; ok { - if !elig { - // Can skip because the eval has explicitely marked the node class - // as ineligible. - continue + // Every eval that has escaped computed node class has to be unblocked + // because any node could potentially be feasible. + i := 0 + var unblocked []*structs.Evaluation + if l := len(b.escaped); l != 0 { + unblocked = make([]*structs.Evaluation, l) + for id, eval := range b.escaped { + unblocked[i] = eval + delete(b.escaped, id) + delete(b.jobs, eval.JobID) + i++ } } - // The computed node class has never been seen by the eval so we unblock - // it. - unblocked = append(unblocked, eval) - untrack = append(untrack, id) - } + // We unblock any eval that is explicitely eligible for the computed class + // and also any eval that is not eligible or uneligible. This signifies that + // when the evaluation was originally run through the scheduler, that it + // never saw a node with the given computed class and thus needs to be + // unblocked for correctness. 
+ var untrack []string + for id, eval := range b.captured { + if elig, ok := eval.ClassEligibility[computedClass]; ok { + if !elig { + // Can skip because the eval has explicitely marked the node class + // as ineligible. + continue + } + } - // Untrack the unblocked evals. - if l := len(untrack); l != 0 { - for _, id := range untrack { - delete(b.captured, id) + // The computed node class has never been seen by the eval so we unblock + // it. + unblocked = append(unblocked, eval) + untrack = append(untrack, id) + delete(b.jobs, eval.JobID) + } + + // Untrack the unblocked evals. + if l := len(untrack); l != 0 { + for _, id := range untrack { + delete(b.captured, id) + } } - } - if l := len(unblocked); l != 0 { - // Update the counters - b.stats.TotalEscaped = 0 - b.stats.TotalBlocked -= l + if l := len(unblocked); l != 0 { + // Update the counters + b.stats.TotalEscaped = 0 + b.stats.TotalBlocked -= l - // Enqueue all the unblocked evals into the broker. - b.evalBroker.EnqueueAll(unblocked) + // Enqueue all the unblocked evals into the broker. + b.evalBroker.EnqueueAll(unblocked) + } + b.l.Unlock() } } } +// GetDuplicates returns all the duplicate evaluations and blocks until the +// passed timeout. +func (b *BlockedEvals) GetDuplicates(timeout time.Duration) []*structs.Evaluation { + var timeoutTimer *time.Timer + var timeoutCh <-chan time.Time +SCAN: + b.l.Lock() + if len(b.duplicates) != 0 { + dups := b.duplicates + b.duplicates = nil + b.l.Unlock() + return dups + } + b.l.Unlock() + + // Create the timer + if timeoutTimer == nil && timeout != 0 { + timeoutTimer = time.NewTimer(timeout) + timeoutCh = timeoutTimer.C + defer timeoutTimer.Stop() + } + + select { + case <-b.stopCh: + return nil + case <-timeoutCh: + return nil + case <-b.duplicateCh: + goto SCAN + } + + return nil +} + // Flush is used to clear the state of blocked evaluations. 
func (b *BlockedEvals) Flush() { b.l.Lock() @@ -194,8 +267,11 @@ func (b *BlockedEvals) Flush() { b.stats.TotalBlocked = 0 b.captured = make(map[string]*structs.Evaluation) b.escaped = make(map[string]*structs.Evaluation) + b.jobs = make(map[string]struct{}) + b.duplicates = nil b.unblockCh = make(chan string, unblockBuffer) b.stopCh = make(chan struct{}) + b.duplicateCh = make(chan struct{}) } // Stats is used to query the state of the blocked eval tracker. diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index cee99f5814c..cf725a7a110 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -2,7 +2,9 @@ package nomad import ( "fmt" + "reflect" "testing" + "time" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -34,6 +36,60 @@ func TestBlockedEvals_Block_Disabled(t *testing.T) { } } +func TestBlockedEvals_Block_SameJob(t *testing.T) { + blocked, _ := testBlockedEvals(t) + + // Create two blocked evals and add them to the blocked tracker. + e := mock.Eval() + e2 := mock.Eval() + e2.JobID = e.JobID + blocked.Block(e) + blocked.Block(e2) + + // Verify block did track both + bStats := blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } +} + +func TestBlockedEvals_GetDuplicates(t *testing.T) { + blocked, _ := testBlockedEvals(t) + + // Create duplicate blocked evals and add them to the blocked tracker. + e := mock.Eval() + e2 := mock.Eval() + e2.JobID = e.JobID + e3 := mock.Eval() + e3.JobID = e.JobID + blocked.Block(e) + blocked.Block(e2) + + // Verify block did track both + bStats := blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } + + // Get the duplicates. + out := blocked.GetDuplicates(0) + if len(out) != 1 || !reflect.DeepEqual(out[0], e2) { + t.Fatalf("bad: %#v %#v", out, e2) + } + + // Call block again after a small sleep. 
+ go func() { + time.Sleep(500 * time.Millisecond) + blocked.Block(e3) + }() + + // Get the duplicates. + out = blocked.GetDuplicates(1 * time.Second) + if len(out) != 1 || !reflect.DeepEqual(out[0], e3) { + t.Fatalf("bad: %#v %#v", out, e2) + } +} + func TestBlockedEvals_UnblockEscaped(t *testing.T) { blocked, broker := testBlockedEvals(t) From a987dece122f60c4e1b90aa1651ec07599674362 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 30 Jan 2016 16:16:13 -0800 Subject: [PATCH 07/10] Leader reaps and cancels duplicate evals --- nomad/leader.go | 38 ++++++++++++++++++++++++++++++++++++++ nomad/leader_test.go | 27 +++++++++++++++++++++++++++ nomad/structs/structs.go | 15 ++++++++------- 3 files changed, 73 insertions(+), 7 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index b511f3d13b9..153a795250a 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -136,6 +136,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Reap any failed evaluations go s.reapFailedEvaluations(stopCh) + // Reap any duplicate blocked evaluations + go s.reapDupBlockedEvaluations(stopCh) + // Setup the heartbeat timers. This is done both when starting up or when // a leader fail over happens. Since the timers are maintained by the leader // node, effectively this means all the timers are renewed at the time of failover. @@ -301,6 +304,41 @@ func (s *Server) reapFailedEvaluations(stopCh chan struct{}) { } } +// reapDupBlockedEvaluations is used to reap duplicate blocked evaluations and +// should be cancelled. +func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) { + for { + select { + case <-stopCh: + return + default: + // Scan for duplicate blocked evals. 
+ dups := s.blockedEvals.GetDuplicates(time.Second) + if dups == nil { + continue + } + + cancel := make([]*structs.Evaluation, len(dups)) + for i, dup := range dups { + // Update the status to cancelled + newEval := dup.Copy() + newEval.Status = structs.EvalStatusCancelled + newEval.StatusDescription = fmt.Sprintf("existing blocked evaluation exists for job %q", newEval.JobID) + cancel[i] = newEval + } + + // Update via Raft + req := structs.EvalUpdateRequest{ + Evals: cancel, + } + if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil { + s.logger.Printf("[ERR] nomad: failed to update duplicate evals %#v: %v", cancel, err) + continue + } + } + } +} + // revokeLeadership is invoked once we step down as leader. // This is used to cleanup any state that may be specific to a leader. func (s *Server) revokeLeadership() error { diff --git a/nomad/leader_test.go b/nomad/leader_test.go index f3029815b42..57b63aba71c 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -522,3 +522,30 @@ func TestLeader_ReapFailedEval(t *testing.T) { t.Fatalf("err: %v", err) }) } + +func TestLeader_ReapDuplicateEval(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Create a duplicate blocked eval + eval := mock.Eval() + eval2 := mock.Eval() + eval2.JobID = eval.JobID + s1.blockedEvals.Block(eval) + s1.blockedEvals.Block(eval2) + + // Wait for the evaluation to marked as cancelled + state := s1.fsm.State() + testutil.WaitForResult(func() (bool, error) { + out, err := state.EvalByID(eval2.ID) + if err != nil { + return false, err + } + return out != nil && out.Status == structs.EvalStatusCancelled, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7d0a566f825..d367d4fc69e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1824,10 +1824,11 @@ func (a 
*AllocMetric) ScoreNode(node *Node, name string, score float64) { } const ( - EvalStatusBlocked = "blocked" - EvalStatusPending = "pending" - EvalStatusComplete = "complete" - EvalStatusFailed = "failed" + EvalStatusBlocked = "blocked" + EvalStatusPending = "pending" + EvalStatusComplete = "complete" + EvalStatusFailed = "failed" + EvalStatusCancelled = "cancelled" ) const ( @@ -1930,7 +1931,7 @@ type Evaluation struct { // will no longer transition. func (e *Evaluation) TerminalStatus() bool { switch e.Status { - case EvalStatusComplete, EvalStatusFailed: + case EvalStatusComplete, EvalStatusFailed, EvalStatusCancelled: return true default: return false @@ -1953,7 +1954,7 @@ func (e *Evaluation) ShouldEnqueue() bool { switch e.Status { case EvalStatusPending: return true - case EvalStatusComplete, EvalStatusFailed, EvalStatusBlocked: + case EvalStatusComplete, EvalStatusFailed, EvalStatusBlocked, EvalStatusCancelled: return false default: panic(fmt.Sprintf("unhandled evaluation (%s) status %s", e.ID, e.Status)) @@ -1966,7 +1967,7 @@ func (e *Evaluation) ShouldBlock() bool { switch e.Status { case EvalStatusBlocked: return true - case EvalStatusComplete, EvalStatusFailed, EvalStatusPending: + case EvalStatusComplete, EvalStatusFailed, EvalStatusPending, EvalStatusCancelled: return false default: panic(fmt.Sprintf("unhandled evaluation (%s) status %s", e.ID, e.Status)) From cc2ee4fbd810838dcfe0c32e2162268c3ce02241 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 30 Jan 2016 16:21:37 -0800 Subject: [PATCH 08/10] Disable blocked eval tracker when leadership is lost --- nomad/leader.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nomad/leader.go b/nomad/leader.go index 153a795250a..ce60966f4e4 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -348,6 +348,9 @@ func (s *Server) revokeLeadership() error { // Disable the eval broker, since it is only useful as a leader s.evalBroker.SetEnabled(false) + // Disable the blocked eval tracker, since it is only 
useful as a leader + s.blockedEvals.SetEnabled(false) + // Disable the periodic dispatcher, since it is only useful as a leader s.periodicDispatcher.SetEnabled(false) From 3d8e7d00502484aa8d1a21aecedc2480efba370f Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 31 Jan 2016 18:46:45 -0800 Subject: [PATCH 09/10] Address comments --- nomad/blocked_evals.go | 161 +++++++++++++++++++++------------------ nomad/server.go | 3 + nomad/structs/structs.go | 2 +- 3 files changed, 89 insertions(+), 77 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 0ed7be07e41..782b7b96b9b 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -9,7 +9,9 @@ import ( ) const ( - // unblockBuffer is the buffer size for the unblock channel. + // unblockBuffer is the buffer size for the unblock channel. The buffer + // should be large to ensure that the FSM doesn't block when calling Unblock + // as this would apply back-pressure on Raft. unblockBuffer = 8096 ) @@ -34,7 +36,7 @@ type BlockedEvals struct { escaped map[string]*structs.Evaluation // unblockCh is used to buffer unblocking of evaluations. - unblockCh chan string + capacityChangeCh chan string // jobs is the map of blocked job and is used to ensure that only one // blocked eval exists for each job. @@ -68,14 +70,14 @@ type BlockedStats struct { // unblocked evals into the passed broker. 
func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { return &BlockedEvals{ - evalBroker: evalBroker, - captured: make(map[string]*structs.Evaluation), - escaped: make(map[string]*structs.Evaluation), - jobs: make(map[string]struct{}), - unblockCh: make(chan string, unblockBuffer), - duplicateCh: make(chan struct{}), - stopCh: make(chan struct{}), - stats: new(BlockedStats), + evalBroker: evalBroker, + captured: make(map[string]*structs.Evaluation), + escaped: make(map[string]*structs.Evaluation), + jobs: make(map[string]struct{}), + capacityChangeCh: make(chan string, unblockBuffer), + duplicateCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), + stats: new(BlockedStats), } } @@ -93,7 +95,7 @@ func (b *BlockedEvals) SetEnabled(enabled bool) { b.enabled = enabled if !b.running { b.running = true - go b.unblock() + go b.watchCapacity() } b.l.Unlock() if !enabled { @@ -112,7 +114,10 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) { return } - // Check if the job already has a blocked evaluation + // Check if the job already has a blocked evaluation. If it does add it to + // the list of duplicates. We only ever want one blocked evaluation per job, + // otherwise we would create unnecessary work for the scheduler as multiple + // evals for the same job would be run, all producing the same outcome. if _, existing := b.jobs[eval.JobID]; existing { b.duplicates = append(b.duplicates, eval) @@ -125,14 +130,22 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) { return } + // Mark the job as tracked. b.stats.TotalBlocked++ b.jobs[eval.JobID] = struct{}{} + + // If the eval has escaped, meaning computed node classes could not capture + // the constraints of the job, we store the eval separately as we have to + // unblock it whenever node capacity changes. This is because we don't know + // what node class is feasible for the jobs constraints. 
if eval.EscapedComputedClass { b.escaped[eval.ID] = eval b.stats.TotalEscaped++ return } + // Add the eval to the set of blocked evals whose jobs constraints are + // captured by computed node class. b.captured[eval.ID] = eval } @@ -145,78 +158,74 @@ func (b *BlockedEvals) Unblock(computedClass string) { return } - b.unblockCh <- computedClass + b.capacityChangeCh <- computedClass } -func (b *BlockedEvals) unblock() { +// watchCapacity is a long lived function that watches for capacity changes in +// nodes and unblocks the correct set of evals. +func (b *BlockedEvals) watchCapacity() { for { select { case <-b.stopCh: return - case computedClass := <-b.unblockCh: - b.l.Lock() - - // Protect against the case of a flush. - if !b.running { - return - } - - // Every eval that has escaped computed node class has to be unblocked - // because any node could potentially be feasible. - i := 0 - var unblocked []*structs.Evaluation - if l := len(b.escaped); l != 0 { - unblocked = make([]*structs.Evaluation, l) - for id, eval := range b.escaped { - unblocked[i] = eval - delete(b.escaped, id) - delete(b.jobs, eval.JobID) - i++ - } - } - - // We unblock any eval that is explicitely eligible for the computed class - // and also any eval that is not eligible or uneligible. This signifies that - // when the evaluation was originally run through the scheduler, that it - // never saw a node with the given computed class and thus needs to be - // unblocked for correctness. - var untrack []string - for id, eval := range b.captured { - if elig, ok := eval.ClassEligibility[computedClass]; ok { - if !elig { - // Can skip because the eval has explicitely marked the node class - // as ineligible. - continue - } - } - - // The computed node class has never been seen by the eval so we unblock - // it. - unblocked = append(unblocked, eval) - untrack = append(untrack, id) - delete(b.jobs, eval.JobID) - } - - // Untrack the unblocked evals. 
- if l := len(untrack); l != 0 { - for _, id := range untrack { - delete(b.captured, id) - } - } - - if l := len(unblocked); l != 0 { - // Update the counters - b.stats.TotalEscaped = 0 - b.stats.TotalBlocked -= l - - // Enqueue all the unblocked evals into the broker. - b.evalBroker.EnqueueAll(unblocked) - } - b.l.Unlock() + case computedClass := <-b.capacityChangeCh: + b.unblock(computedClass) } } } +// unblock unblocks all blocked evals that could run on the passed computed node +// class. +func (b *BlockedEvals) unblock(computedClass string) { + b.l.Lock() + defer b.l.Unlock() + + // Protect against the case of a flush. + if !b.running { + return + } + + // Every eval that has escaped computed node class has to be unblocked + // because any node could potentially be feasible. + var unblocked []*structs.Evaluation + if l := len(b.escaped); l != 0 { + unblocked = make([]*structs.Evaluation, 0, l) + for id, eval := range b.escaped { + unblocked = append(unblocked, eval) + delete(b.escaped, id) + delete(b.jobs, eval.JobID) + } + } + + // We unblock any eval that is explicitly eligible for the computed class + // and also any eval that is not eligible or ineligible. This signifies that + // when the evaluation was originally run through the scheduler, that it + // never saw a node with the given computed class and thus needs to be + // unblocked for correctness. + for id, eval := range b.captured { + if elig, ok := eval.ClassEligibility[computedClass]; ok && !elig { + // Can skip because the eval has explicitly marked the node class + // as ineligible. + continue + } + + // The computed node class has never been seen by the eval so we unblock + // it. + unblocked = append(unblocked, eval) + delete(b.jobs, eval.JobID) + delete(b.captured, id) + } + + if l := len(unblocked); l != 0 { + // Update the counters + b.stats.TotalEscaped = 0 + b.stats.TotalBlocked -= l + + // Enqueue all the unblocked evals into the broker. 
+ b.evalBroker.EnqueueAll(unblocked) + } +} + // GetDuplicates returns all the duplicate evaluations and blocks until the // passed timeout. func (b *BlockedEvals) GetDuplicates(timeout time.Duration) []*structs.Evaluation { @@ -269,9 +278,9 @@ func (b *BlockedEvals) Flush() { b.escaped = make(map[string]*structs.Evaluation) b.jobs = make(map[string]struct{}) b.duplicates = nil - b.unblockCh = make(chan string, unblockBuffer) + b.capacityChangeCh = make(chan string, unblockBuffer) b.stopCh = make(chan struct{}) - b.duplicateCh = make(chan struct{}) + b.duplicateCh = make(chan struct{}, 1) } // Stats is used to query the state of the blocked eval tracker. diff --git a/nomad/server.go b/nomad/server.go index 491490d2ed7..2c70216cf96 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -241,6 +241,9 @@ func NewServer(config *Config) (*Server, error) { // Emit metrics for the plan queue go planQueue.EmitStats(time.Second, s.shutdownCh) + // Emit metrics for the blocked eval tracker. + go blockedEvals.EmitStats(time.Second, s.shutdownCh) + // Emit metrics go s.heartbeatStats() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d367d4fc69e..6ba1d767d08 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1828,7 +1828,7 @@ const ( EvalStatusPending = "pending" EvalStatusComplete = "complete" EvalStatusFailed = "failed" - EvalStatusCancelled = "cancelled" + EvalStatusCancelled = "canceled" ) const ( From 65b8b5295c75c4d47ccf76d1bb78cee631a04795 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 31 Jan 2016 20:56:52 -0800 Subject: [PATCH 10/10] Remove running, system scheduler, and fix tg overriding eligibility --- nomad/blocked_evals.go | 19 ++++---- scheduler/context.go | 7 ++- scheduler/context_test.go | 4 ++ scheduler/system_sched.go | 15 ------ scheduler/system_sched_test.go | 83 ---------------------------------- 5 files changed, 18 insertions(+), 110 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go 
index 782b7b96b9b..8d818c18f20 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -23,7 +23,6 @@ const ( type BlockedEvals struct { evalBroker *EvalBroker enabled bool - running bool stats *BlockedStats l sync.RWMutex @@ -92,11 +91,15 @@ func (b *BlockedEvals) Enabled() bool { // should only be enabled on the active leader. func (b *BlockedEvals) SetEnabled(enabled bool) { b.l.Lock() - b.enabled = enabled - if !b.running { - b.running = true + if b.enabled == enabled { + b.l.Unlock() // No-op, but the lock must be released before the early return + return + } else if enabled { go b.watchCapacity() + } else { + close(b.stopCh) } + b.enabled = enabled b.l.Unlock() if !enabled { b.Flush() @@ -181,7 +184,7 @@ func (b *BlockedEvals) unblock(computedClass string) { defer b.l.Unlock() // Protect against the case of a flush. - if !b.running { + if !b.enabled { return } @@ -265,12 +268,6 @@ func (b *BlockedEvals) Flush() { b.l.Lock() defer b.l.Unlock() - // Kill any running goroutines - if b.running { - close(b.stopCh) - b.running = false - } - // Reset the blocked eval tracker. b.stats.TotalEscaped = 0 b.stats.TotalBlocked = 0 diff --git a/scheduler/context.go b/scheduler/context.go index b95a4e51ac0..0a765944fc7 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -242,7 +242,12 @@ func (e *EvalEligibility) GetClasses() map[string]bool { case EvalComputedClassEligible: elig[class] = true case EvalComputedClassIneligible: - elig[class] = false + // Only mark as ineligible if it hasn't been marked before. This + // prevents one task group marking a class as ineligible when it + // is eligible on another task group. 
+ if _, ok := elig[class]; !ok { + elig[class] = false + } } } } diff --git a/scheduler/context_test.go b/scheduler/context_test.go index 782b526c3ba..b8b3cbfa2df 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -216,6 +216,10 @@ func TestEvalEligibility_GetClasses(t *testing.T) { e.SetTaskGroupEligibility(false, "bar", "v1:4") e.SetTaskGroupEligibility(true, "bar", "v1:5") + // Mark an existing eligible class as ineligible in the TG. + e.SetTaskGroupEligibility(false, "fizz", "v1:1") + e.SetTaskGroupEligibility(false, "fizz", "v1:3") + expClasses := map[string]bool{ "v1:1": true, "v1:2": false, diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index cbb27c3d841..9b352b81fb1 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -35,8 +35,6 @@ type SystemScheduler struct { limitReached bool nextEval *structs.Evaluation - - blocked *structs.Evaluation } // NewSystemScheduler is a factory function to instantiate a new system @@ -129,19 +127,6 @@ func (s *SystemScheduler) process() (bool, error) { s.logger.Printf("[DEBUG] sched: %#v: rolling update limit reached, next eval '%s' created", s.eval, s.nextEval.ID) } - // If there are failed allocations, we need to create a blocked evaluation - // to place the failed allocations when resources become available. 
- if len(s.plan.FailedAllocs) != 0 && s.blocked == nil { - e := s.ctx.Eligibility() - classes := e.GetClasses() - s.blocked = s.eval.BlockedEval(classes, e.HasEscaped()) - if err := s.planner.CreateEval(s.blocked); err != nil { - s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err) - return false, err - } - s.logger.Printf("[DEBUG] sched: %#v: failed to place all allocations, blocked eval '%s' created", s.eval, s.blocked.ID) - } - // Submit the plan result, newState, err := s.planner.SubmitPlan(s.plan) if err != nil { diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index ce61b93e93a..fae5f322a76 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -184,89 +184,6 @@ func TestSystemSched_JobRegister_AllocFail(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } -func TestSystemSched_JobRegister_BlockedEval(t *testing.T) { - h := NewHarness(t) - - // Create a full node - node := mock.Node() - node.Reserved = node.Resources - node.ComputeClass() - noErr(t, h.State.UpsertNode(h.NextIndex(), node)) - - // Create an ineligible node - node2 := mock.Node() - node2.Attributes["kernel.name"] = "windows" - node2.ComputeClass() - noErr(t, h.State.UpsertNode(h.NextIndex(), node2)) - - // Create a jobs - job := mock.SystemJob() - noErr(t, h.State.UpsertJob(h.NextIndex(), job)) - - // Create a mock evaluation to register the job - eval := &structs.Evaluation{ - ID: structs.GenerateUUID(), - Priority: job.Priority, - TriggeredBy: structs.EvalTriggerJobRegister, - JobID: job.ID, - } - - // Process the evaluation - err := h.Process(NewServiceScheduler, eval) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Ensure a single plan - if len(h.Plans) != 1 { - t.Fatalf("bad: %#v", h.Plans) - } - plan := h.Plans[0] - - // Ensure the plan has created a follow up eval. 
- if len(h.CreateEvals) != 1 { - t.Fatalf("bad: %#v", h.CreateEvals) - } - - created := h.CreateEvals[0] - if created.Status != structs.EvalStatusBlocked { - t.Fatalf("bad: %#v", created) - } - - classes := created.ClassEligibility - if len(classes) != 2 || !classes[node.ComputedClass] || classes[node2.ComputedClass] { - t.Fatalf("bad: %#v", classes) - } - - if created.EscapedComputedClass { - t.Fatalf("bad: %#v", created) - } - - // Ensure the plan failed to alloc - if len(plan.FailedAllocs) != 1 { - t.Fatalf("bad: %#v", plan) - } - - // Lookup the allocations by JobID - out, err := h.State.AllocsByJob(job.ID) - noErr(t, err) - - // Ensure all allocations placed - if len(out) != 1 { - for _, a := range out { - t.Logf("%#v", a) - } - t.Fatalf("bad: %#v", out) - } - - // Check the available nodes - if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 2 { - t.Fatalf("bad: %#v", out[0].Metrics) - } - - h.AssertEvalStatus(t, structs.EvalStatusComplete) -} - func TestSystemSched_JobModify(t *testing.T) { h := NewHarness(t)