diff --git a/demo/vagrant/client2.hcl b/demo/vagrant/client2.hcl index 839dc46af4a..1b1372ae2a8 100644 --- a/demo/vagrant/client2.hcl +++ b/demo/vagrant/client2.hcl @@ -15,7 +15,7 @@ client { # Set ourselves as thing one meta { - thing = "two" + ssd = "true" } } diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go new file mode 100644 index 00000000000..8d818c18f20 --- /dev/null +++ b/nomad/blocked_evals.go @@ -0,0 +1,309 @@ +package nomad + +import ( + "sync" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + // unblockBuffer is the buffer size for the unblock channel. The buffer + // should be large to ensure that the FSM doesn't block when calling Unblock + // as this would apply back-pressure on Raft. + unblockBuffer = 8096 +) + +// BlockedEvals is used to track evaluations that shouldn't be queued until a +// certain class of nodes becomes available. An evaluation is put into the +// blocked state when it is run through the scheduler and produced failed +// allocations. It is unblocked when the capacity of a node that could run the +// failed allocation becomes available. +type BlockedEvals struct { + evalBroker *EvalBroker + enabled bool + stats *BlockedStats + l sync.RWMutex + + // captured is the set of evaluations that are captured by computed node + // classes. + captured map[string]*structs.Evaluation + + // escaped is the set of evaluations that have escaped computed node + // classes. + escaped map[string]*structs.Evaluation + + // unblockCh is used to buffer unblocking of evaluations. + capacityChangeCh chan string + + // jobs is the map of blocked job and is used to ensure that only one + // blocked eval exists for each job. + jobs map[string]struct{} + + // duplicates is the set of evaluations for jobs that had pre-existing + // blocked evaluations. These should be marked as cancelled since only one + // blocked eval is neeeded bper job. + duplicates []*structs.Evaluation + + // duplicateCh is used to signal that a duplicate eval was added to the + // duplicate set. It can be used to unblock waiting callers looking for + // duplicates. + duplicateCh chan struct{} + + // stopCh is used to stop any created goroutines. + stopCh chan struct{} +} + +// BlockedStats returns all the stats about the blocked eval tracker. +type BlockedStats struct { + // TotalEscaped is the total number of blocked evaluations that have escaped + // computed node classes. + TotalEscaped int + + // TotalBlocked is the total number of blocked evaluations. + TotalBlocked int +} + +// NewBlockedEvals creates a new blocked eval tracker that will enqueue +// unblocked evals into the passed broker. +func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { + return &BlockedEvals{ + evalBroker: evalBroker, + captured: make(map[string]*structs.Evaluation), + escaped: make(map[string]*structs.Evaluation), + jobs: make(map[string]struct{}), + capacityChangeCh: make(chan string, unblockBuffer), + duplicateCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), + stats: new(BlockedStats), + } +} + +// Enabled is used to check if the broker is enabled. +func (b *BlockedEvals) Enabled() bool { + b.l.RLock() + defer b.l.RUnlock() + return b.enabled +} + +// SetEnabled is used to control if the broker is enabled. The broker +// should only be enabled on the active leader. +func (b *BlockedEvals) SetEnabled(enabled bool) { + b.l.Lock() + if b.enabled == enabled { + // No-op + return + } else if enabled { + go b.watchCapacity() + } else { + close(b.stopCh) + } + b.enabled = enabled + b.l.Unlock() + if !enabled { + b.Flush() + } +} + +// Block tracks the passed evaluation and enqueues it into the eval broker when +// a suitable node calls unblock. +func (b *BlockedEvals) Block(eval *structs.Evaluation) { + b.l.Lock() + defer b.l.Unlock() + + // Do nothing if not enabled + if !b.enabled { + return + } + + // Check if the job already has a blocked evaluation. If it does add it to + // the list of duplicates. We omly ever want one blocked evaluation per job, + // otherwise we would create unnecessary work for the scheduler as multiple + // evals for the same job would be run, all producing the same outcome. + if _, existing := b.jobs[eval.JobID]; existing { + b.duplicates = append(b.duplicates, eval) + + // Unblock any waiter. + select { + case b.duplicateCh <- struct{}{}: + default: + } + + return + } + + // Mark the job as tracked. + b.stats.TotalBlocked++ + b.jobs[eval.JobID] = struct{}{} + + // If the eval has escaped, meaning computed node classes could not capture + // the constraints of the job, we store the eval separately as we have to + // unblock it whenever node capacity changes. This is because we don't know + // what node class is feasible for the jobs constraints. + if eval.EscapedComputedClass { + b.escaped[eval.ID] = eval + b.stats.TotalEscaped++ + return + } + + // Add the eval to the set of blocked evals whose jobs constraints are + // captured by computed node class. + b.captured[eval.ID] = eval +} + +// Unblock causes any evaluation that could potentially make progress on a +// capacity change on the passed computed node class to be enqueued into the +// eval broker. +func (b *BlockedEvals) Unblock(computedClass string) { + // Do nothing if not enabled + if !b.enabled { + return + } + + b.capacityChangeCh <- computedClass +} + +// watchCapacity is a long lived function that watches for capacity changes in +// nodes and unblocks the correct set of evals. +func (b *BlockedEvals) watchCapacity() { + for { + select { + case <-b.stopCh: + return + case computedClass := <-b.capacityChangeCh: + b.unblock(computedClass) + } + } +} + +// unblock unblocks all blocked evals that could run on the passed computed node +// class. +func (b *BlockedEvals) unblock(computedClass string) { + b.l.Lock() + defer b.l.Unlock() + + // Protect against the case of a flush. + if !b.enabled { + return + } + + // Every eval that has escaped computed node class has to be unblocked + // because any node could potentially be feasible. + var unblocked []*structs.Evaluation + if l := len(b.escaped); l != 0 { + unblocked = make([]*structs.Evaluation, 0, l) + for id, eval := range b.escaped { + unblocked = append(unblocked, eval) + delete(b.escaped, id) + delete(b.jobs, eval.JobID) + } + } + + // We unblock any eval that is explicitely eligible for the computed class + // and also any eval that is not eligible or uneligible. This signifies that + // when the evaluation was originally run through the scheduler, that it + // never saw a node with the given computed class and thus needs to be + // unblocked for correctness. + for id, eval := range b.captured { + if elig, ok := eval.ClassEligibility[computedClass]; ok && !elig { + // Can skip because the eval has explicitely marked the node class + // as ineligible. + continue + } + + // The computed node class has never been seen by the eval so we unblock + // it. + unblocked = append(unblocked, eval) + delete(b.jobs, eval.JobID) + delete(b.captured, id) + } + + if l := len(unblocked); l != 0 { + // Update the counters + b.stats.TotalEscaped = 0 + b.stats.TotalBlocked -= l + + // Enqueue all the unblocked evals into the broker. + b.evalBroker.EnqueueAll(unblocked) + } +} + +// GetDuplicates returns all the duplicate evaluations and blocks until the +// passed timeout. +func (b *BlockedEvals) GetDuplicates(timeout time.Duration) []*structs.Evaluation { + var timeoutTimer *time.Timer + var timeoutCh <-chan time.Time +SCAN: + b.l.Lock() + if len(b.duplicates) != 0 { + dups := b.duplicates + b.duplicates = nil + b.l.Unlock() + return dups + } + b.l.Unlock() + + // Create the timer + if timeoutTimer == nil && timeout != 0 { + timeoutTimer = time.NewTimer(timeout) + timeoutCh = timeoutTimer.C + defer timeoutTimer.Stop() + } + + select { + case <-b.stopCh: + return nil + case <-timeoutCh: + return nil + case <-b.duplicateCh: + goto SCAN + } + + return nil +} + +// Flush is used to clear the state of blocked evaluations. +func (b *BlockedEvals) Flush() { + b.l.Lock() + defer b.l.Unlock() + + // Reset the blocked eval tracker. + b.stats.TotalEscaped = 0 + b.stats.TotalBlocked = 0 + b.captured = make(map[string]*structs.Evaluation) + b.escaped = make(map[string]*structs.Evaluation) + b.jobs = make(map[string]struct{}) + b.duplicates = nil + b.capacityChangeCh = make(chan string, unblockBuffer) + b.stopCh = make(chan struct{}) + b.duplicateCh = make(chan struct{}, 1) +} + +// Stats is used to query the state of the blocked eval tracker. +func (b *BlockedEvals) Stats() *BlockedStats { + // Allocate a new stats struct + stats := new(BlockedStats) + + b.l.RLock() + defer b.l.RUnlock() + + // Copy all the stats + stats.TotalEscaped = b.stats.TotalEscaped + stats.TotalBlocked = b.stats.TotalBlocked + return stats +} + +// EmitStats is used to export metrics about the blocked eval tracker while enabled +func (b *BlockedEvals) EmitStats(period time.Duration, stopCh chan struct{}) { + for { + select { + case <-time.After(period): + stats := b.Stats() + metrics.SetGauge([]string{"nomad", "blocked_evals", "total_blocked"}, float32(stats.TotalBlocked)) + metrics.SetGauge([]string{"nomad", "blocked_evals", "total_escaped"}, float32(stats.TotalEscaped)) + case <-stopCh: + return + } + } +} diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go new file mode 100644 index 00000000000..cf725a7a110 --- /dev/null +++ b/nomad/blocked_evals_test.go @@ -0,0 +1,235 @@ +package nomad + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" +) + +func testBlockedEvals(t *testing.T) (*BlockedEvals, *EvalBroker) { + broker := testBroker(t, 0) + broker.SetEnabled(true) + blocked := NewBlockedEvals(broker) + blocked.SetEnabled(true) + return blocked, broker +} + +func TestBlockedEvals_Block_Disabled(t *testing.T) { + blocked, _ := testBlockedEvals(t) + blocked.SetEnabled(false) + + // Create an escaped eval and add it to the blocked tracker. + e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.EscapedComputedClass = true + blocked.Block(e) + + // Verify block did nothing + bStats := blocked.Stats() + if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } +} + +func TestBlockedEvals_Block_SameJob(t *testing.T) { + blocked, _ := testBlockedEvals(t) + + // Create two blocked evals and add them to the blocked tracker. + e := mock.Eval() + e2 := mock.Eval() + e2.JobID = e.JobID + blocked.Block(e) + blocked.Block(e2) + + // Verify block did track both + bStats := blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } +} + +func TestBlockedEvals_GetDuplicates(t *testing.T) { + blocked, _ := testBlockedEvals(t) + + // Create duplicate blocked evals and add them to the blocked tracker. + e := mock.Eval() + e2 := mock.Eval() + e2.JobID = e.JobID + e3 := mock.Eval() + e3.JobID = e.JobID + blocked.Block(e) + blocked.Block(e2) + + // Verify block did track both + bStats := blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } + + // Get the duplicates. + out := blocked.GetDuplicates(0) + if len(out) != 1 || !reflect.DeepEqual(out[0], e2) { + t.Fatalf("bad: %#v %#v", out, e2) + } + + // Call block again after a small sleep. + go func() { + time.Sleep(500 * time.Millisecond) + blocked.Block(e3) + }() + + // Get the duplicates. + out = blocked.GetDuplicates(1 * time.Second) + if len(out) != 1 || !reflect.DeepEqual(out[0], e3) { + t.Fatalf("bad: %#v %#v", out, e2) + } +} + +func TestBlockedEvals_UnblockEscaped(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Create an escaped eval and add it to the blocked tracker. + e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.EscapedComputedClass = true + blocked.Block(e) + + // Verify block caused the eval to be tracked + bStats := blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 1 { + t.Fatalf("bad: %#v", bStats) + } + + blocked.Unblock("v1:123") + + testutil.WaitForResult(func() (bool, error) { + // Verify Unblock caused an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + + // Verify Unblock updates the stats + bStats := blocked.Stats() + if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { + return false, fmt.Errorf("bad: %#v", bStats) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} + +func TestBlockedEvals_UnblockEligible(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Create a blocked eval that is eligible on a specific node class and add + // it to the blocked tracker. + e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.ClassEligibility = map[string]bool{"v1:123": true} + blocked.Block(e) + + // Verify block caused the eval to be tracked + blockedStats := blocked.Stats() + if blockedStats.TotalBlocked != 1 { + t.Fatalf("bad: %#v", blockedStats) + } + + blocked.Unblock("v1:123") + + testutil.WaitForResult(func() (bool, error) { + // Verify Unblock caused an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + + // Verify Unblock updates the stats + bStats := blocked.Stats() + if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { + return false, fmt.Errorf("bad: %#v", bStats) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} + +func TestBlockedEvals_UnblockIneligible(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Create a blocked eval that is ineligible on a specific node class and add + // it to the blocked tracker. + e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.ClassEligibility = map[string]bool{"v1:123": false} + blocked.Block(e) + + // Verify block caused the eval to be tracked + blockedStats := blocked.Stats() + if blockedStats.TotalBlocked != 1 && blockedStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", blockedStats) + } + + // Should do nothing + blocked.Unblock("v1:123") + + testutil.WaitForResult(func() (bool, error) { + // Verify Unblock didn't cause an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 0 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + + bStats := blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + return false, fmt.Errorf("bad: %#v", bStats) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} + +func TestBlockedEvals_UnblockUnknown(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Create a blocked eval that is ineligible on a specific node class and add + // it to the blocked tracker. + e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false} + blocked.Block(e) + + // Verify block caused the eval to be tracked + blockedStats := blocked.Stats() + if blockedStats.TotalBlocked != 1 && blockedStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", blockedStats) + } + + // Should unblock because the eval hasn't seen this node class. + blocked.Unblock("v1:789") + + testutil.WaitForResult(func() (bool, error) { + // Verify Unblock causes an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + + // Verify Unblock updates the stats + bStats := blocked.Stats() + if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { + return false, fmt.Errorf("bad: %#v", bStats) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 92e49996941..ead6f86bf71 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -128,7 +128,19 @@ func (b *EvalBroker) SetEnabled(enabled bool) { } } +// EnqueueAll is used to enqueue many evaluations. +// TODO: Update enqueueLocked to take a list and use heap.Fix instead of +// heap.Push in order to make the running time O(log(n+m)) instead of +// O(m*log(n)) where m is the size of the evals and n is the size of the +// existing heap. +func (b *EvalBroker) EnqueueAll(evals []*structs.Evaluation) { + for _, e := range evals { + b.Enqueue(e) + } +} + // Enqueue is used to enqueue an evaluation +// TODO: remove the error return value func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error { b.l.Lock() defer b.l.Unlock() diff --git a/nomad/fsm.go b/nomad/fsm.go index 5fe4009a26a..7e9a57f3f2d 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -40,6 +40,7 @@ const ( // this outside the Server to avoid exposing this outside the package. type nomadFSM struct { evalBroker *EvalBroker + blockedEvals *BlockedEvals periodicDispatcher *PeriodicDispatch logOutput io.Writer logger *log.Logger @@ -60,7 +61,8 @@ type snapshotHeader struct { } // NewFSMPath is used to construct a new FSM with a blank state -func NewFSM(evalBroker *EvalBroker, periodic *PeriodicDispatch, logOutput io.Writer) (*nomadFSM, error) { +func NewFSM(evalBroker *EvalBroker, periodic *PeriodicDispatch, + blocked *BlockedEvals, logOutput io.Writer) (*nomadFSM, error) { // Create a state store state, err := state.NewStateStore(logOutput) if err != nil { @@ -70,6 +72,7 @@ func NewFSM(evalBroker *EvalBroker, periodic *PeriodicDispatch, logOutput io.Wri fsm := &nomadFSM{ evalBroker: evalBroker, periodicDispatcher: periodic, + blockedEvals: blocked, logOutput: logOutput, logger: log.New(logOutput, "", log.LstdFlags), state: state, @@ -179,6 +182,19 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { n.logger.Printf("[ERR] nomad.fsm: UpdateNodeStatus failed: %v", err) return err } + + // Unblock evals for the nodes computed node class if it is in a ready + // state. + if req.Status == structs.NodeStatusReady { + node, err := n.state.NodeByID(req.NodeID) + if err != nil { + n.logger.Printf("[ERR] nomad.fsm: looking up node %q failed: %v", req.NodeID, err) + return err + + } + n.blockedEvals.Unblock(node.ComputedClass) + } + return nil } @@ -312,6 +328,8 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} { n.logger.Printf("[ERR] nomad.fsm: failed to enqueue evaluation %s: %v", eval.ID, err) return err } + } else if eval.ShouldBlock() { + n.blockedEvals.Block(eval) } } return nil @@ -355,10 +373,26 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return nil } - if err := n.state.UpdateAllocFromClient(index, req.Alloc[0]); err != nil { + alloc := req.Alloc[0] + if err := n.state.UpdateAllocFromClient(index, alloc); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpdateAllocFromClient failed: %v", err) return err } + + // Unblock evals for the nodes computed node class if the client has + // finished running an allocation. + if alloc.ClientStatus == structs.AllocClientStatusDead || + alloc.ClientStatus == structs.AllocClientStatusFailed { + nodeID := alloc.NodeID + node, err := n.state.NodeByID(nodeID) + if err != nil || node == nil { + n.logger.Printf("[ERR] nomad.fsm: looking up node %q failed: %v", nodeID, err) + return err + + } + n.blockedEvals.Unblock(node.ComputedClass) + } + return nil } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index ef76486a842..bb7f48f3de5 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2,6 +2,7 @@ package nomad import ( "bytes" + "fmt" "os" "reflect" "testing" @@ -10,6 +11,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" "github.com/hashicorp/raft" ) @@ -44,7 +46,9 @@ func testStateStore(t *testing.T) *state.StateStore { func testFSM(t *testing.T) *nomadFSM { p, _ := testPeriodicDispatcher() - fsm, err := NewFSM(testBroker(t, 0), p, os.Stderr) + broker := testBroker(t, 0) + blocked := NewBlockedEvals(broker) + fsm, err := NewFSM(broker, p, blocked, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -140,6 +144,7 @@ func TestFSM_DeregisterNode(t *testing.T) { func TestFSM_UpdateNodeStatus(t *testing.T) { fsm := testFSM(t) + fsm.blockedEvals.SetEnabled(true) node := mock.Node() req := structs.NodeRegisterRequest{ @@ -155,6 +160,11 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { t.Fatalf("resp: %v", resp) } + // Mark an eval as blocked. + eval := mock.Eval() + eval.ClassEligibility = map[string]bool{node.ComputedClass: true} + fsm.blockedEvals.Block(eval) + req2 := structs.NodeUpdateStatusRequest{ NodeID: node.ID, Status: structs.NodeStatusReady, @@ -169,7 +179,7 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { t.Fatalf("resp: %v", resp) } - // Verify we are NOT registered + // Verify the status is ready. node, err = fsm.State().NodeByID(req.Node.ID) if err != nil { t.Fatalf("err: %v", err) @@ -177,6 +187,17 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { if node.Status != structs.NodeStatusReady { t.Fatalf("bad node: %#v", node) } + + // Verify the eval was unblocked. + testutil.WaitForResult(func() (bool, error) { + bStats := fsm.blockedEvals.Stats() + if bStats.TotalBlocked != 0 { + return false, fmt.Errorf("bad: %#v", bStats) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) } func TestFSM_UpdateNodeDrain(t *testing.T) { @@ -357,6 +378,53 @@ func TestFSM_UpdateEval(t *testing.T) { } } +func TestFSM_UpdateEval_Blocked(t *testing.T) { + fsm := testFSM(t) + fsm.evalBroker.SetEnabled(true) + fsm.blockedEvals.SetEnabled(true) + + // Create a blocked eval. + eval := mock.Eval() + eval.Status = structs.EvalStatusBlocked + + req := structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + buf, err := structs.Encode(structs.EvalUpdateRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + out, err := fsm.State().EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("not found!") + } + if out.CreateIndex != 1 { + t.Fatalf("bad index: %d", out.CreateIndex) + } + + // Verify the eval wasn't enqueued + stats := fsm.evalBroker.Stats() + if stats.TotalReady != 0 { + t.Fatalf("bad: %#v %#v", stats, out) + } + + // Verify the eval was added to the blocked tracker. + bStats := fsm.blockedEvals.Stats() + if bStats.TotalBlocked != 1 { + t.Fatalf("bad: %#v %#v", bStats, out) + } +} + func TestFSM_DeleteEval(t *testing.T) { fsm := testFSM(t) @@ -452,6 +520,69 @@ func TestFSM_UpsertAllocs(t *testing.T) { } func TestFSM_UpdateAllocFromClient(t *testing.T) { + fsm := testFSM(t) + fsm.blockedEvals.SetEnabled(true) + state := fsm.State() + + node := mock.Node() + state.UpsertNode(1, node) + + // Mark an eval as blocked. + eval := mock.Eval() + eval.ClassEligibility = map[string]bool{node.ComputedClass: true} + fsm.blockedEvals.Block(eval) + + bStats := fsm.blockedEvals.Stats() + if bStats.TotalBlocked != 1 { + t.Fatalf("bad: %#v", bStats) + } + + // Create a completed eval + alloc := mock.Alloc() + alloc.NodeID = node.ID + state.UpsertAllocs(1, []*structs.Allocation{alloc}) + + clientAlloc := new(structs.Allocation) + *clientAlloc = *alloc + clientAlloc.ClientStatus = structs.AllocClientStatusDead + + req := structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{clientAlloc}, + } + buf, err := structs.Encode(structs.AllocClientUpdateRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + out, err := fsm.State().AllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + clientAlloc.CreateIndex = out.CreateIndex + clientAlloc.ModifyIndex = out.ModifyIndex + if !reflect.DeepEqual(clientAlloc, out) { + t.Fatalf("bad: %#v %#v", clientAlloc, out) + } + + // Verify the eval was unblocked. + testutil.WaitForResult(func() (bool, error) { + bStats = fsm.blockedEvals.Stats() + if bStats.TotalBlocked != 0 { + return false, fmt.Errorf("bad: %#v %#v", bStats, out) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} + +func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { fsm := testFSM(t) state := fsm.State() diff --git a/nomad/leader.go b/nomad/leader.go index b11ff81f04a..ce60966f4e4 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -113,8 +113,11 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Enable the eval broker, since we are now the leader s.evalBroker.SetEnabled(true) + // Enable the blocked eval tracker, since we are now the leader + s.blockedEvals.SetEnabled(true) + // Restore the eval broker state - if err := s.restoreEvalBroker(); err != nil { + if err := s.restoreEvals(); err != nil { return err } @@ -133,6 +136,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Reap any failed evaluations go s.reapFailedEvaluations(stopCh) + // Reap any duplicate blocked evaluations + go s.reapDupBlockedEvaluations(stopCh) + // Setup the heartbeat timers. This is done both when starting up or when // a leader fail over happens. Since the timers are maintained by the leader // node, effectively this means all the timers are renewed at the time of failover. @@ -149,10 +155,11 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { return nil } -// restoreEvalBroker is used to restore all pending evaluations -// into the eval broker. The broker is maintained only by the leader, -// so it must be restored anytime a leadership transition takes place. -func (s *Server) restoreEvalBroker() error { +// restoreEvals is used to restore pending evaluations into the eval broker and +// blocked evaluations into the blocked eval tracker. The broker and blocked +// eval tracker is maintained only by the leader, so it must be restored anytime +// a leadership transition takes place. +func (s *Server) restoreEvals() error { // Get an iterator over every evaluation iter, err := s.fsm.State().Evals() if err != nil { @@ -166,12 +173,12 @@ func (s *Server) restoreEvalBroker() error { } eval := raw.(*structs.Evaluation) - if !eval.ShouldEnqueue() { - continue - } - - if err := s.evalBroker.Enqueue(eval); err != nil { - return fmt.Errorf("failed to enqueue evaluation %s: %v", eval.ID, err) + if eval.ShouldEnqueue() { + if err := s.evalBroker.Enqueue(eval); err != nil { + return fmt.Errorf("failed to enqueue evaluation %s: %v", eval.ID, err) + } + } else if eval.ShouldBlock() { + s.blockedEvals.Block(eval) } } return nil @@ -297,6 +304,41 @@ func (s *Server) reapFailedEvaluations(stopCh chan struct{}) { } } +// reapDupBlockedEvaluations is used to reap duplicate blocked evaluations and +// should be cancelled. +func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) { + for { + select { + case <-stopCh: + return + default: + // Scan for duplicate blocked evals. + dups := s.blockedEvals.GetDuplicates(time.Second) + if dups == nil { + continue + } + + cancel := make([]*structs.Evaluation, len(dups)) + for i, dup := range dups { + // Update the status to cancelled + newEval := dup.Copy() + newEval.Status = structs.EvalStatusCancelled + newEval.StatusDescription = fmt.Sprintf("existing blocked evaluation exists for job %q", newEval.JobID) + cancel[i] = newEval + } + + // Update via Raft + req := structs.EvalUpdateRequest{ + Evals: cancel, + } + if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil { + s.logger.Printf("[ERR] nomad: failed to update duplicate evals %#v: %v", cancel, err) + continue + } + } + } +} + // revokeLeadership is invoked once we step down as leader. // This is used to cleanup any state that may be specific to a leader. func (s *Server) revokeLeadership() error { @@ -306,6 +348,9 @@ func (s *Server) revokeLeadership() error { // Disable the eval broker, since it is only useful as a leader s.evalBroker.SetEnabled(false) + // Disable the blocked eval tracker, since it is only useful as a leader + s.blockedEvals.SetEnabled(false) + // Disable the periodic dispatcher, since it is only useful as a leader s.periodicDispatcher.SetEnabled(false) diff --git a/nomad/leader_test.go b/nomad/leader_test.go index f3029815b42..57b63aba71c 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -522,3 +522,30 @@ func TestLeader_ReapFailedEval(t *testing.T) { t.Fatalf("err: %v", err) }) } + +func TestLeader_ReapDuplicateEval(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Create a duplicate blocked eval + eval := mock.Eval() + eval2 := mock.Eval() + eval2.JobID = eval.JobID + s1.blockedEvals.Block(eval) + s1.blockedEvals.Block(eval2) + + // Wait for the evaluation to marked as cancelled + state := s1.fsm.State() + testutil.WaitForResult(func() (bool, error) { + out, err := state.EvalByID(eval2.ID) + if err != nil { + return false, err + } + return out != nil && out.Status == structs.EvalStatusCancelled, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 743e6127ed2..9de93fc6646 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -45,7 +45,7 @@ func TestClientEndpoint_Register(t *testing.T) { if out.CreateIndex != resp.Index { t.Fatalf("index mis-match") } - if out.ComputedClass == 0 { + if out.ComputedClass == "" { t.Fatal("ComputedClass not set") } } @@ -357,7 +357,7 @@ func TestClientEndpoint_GetNode(t *testing.T) { t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index) } - if resp2.Node.ComputedClass == 0 { + if resp2.Node.ComputedClass == "" { t.Fatalf("bad ComputedClass: %#v", resp2.Node) } diff --git a/nomad/server.go b/nomad/server.go index b6c13826d80..2c70216cf96 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -109,6 +109,10 @@ type Server struct { // that are waiting to be brokered to a sub-scheduler evalBroker *EvalBroker + // BlockedEvals is used to manage evaluations that are blocked on node + // capacity changes. + blockedEvals *BlockedEvals + // planQueue is used to manage the submitted allocation // plans that are waiting to be assessed by the leader planQueue *PlanQueue @@ -164,6 +168,9 @@ func NewServer(config *Config) (*Server, error) { return nil, err } + // Create a new blocked eval tracker. + blockedEvals := NewBlockedEvals(evalBroker) + // Create a plan queue planQueue, err := NewPlanQueue() if err != nil { @@ -172,17 +179,18 @@ func NewServer(config *Config) (*Server, error) { // Create the server s := &Server{ - config: config, - connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, nil), - logger: logger, - rpcServer: rpc.NewServer(), - peers: make(map[string][]*serverParts), - localPeers: make(map[string]*serverParts), - reconcileCh: make(chan serf.Member, 32), - eventCh: make(chan serf.Event, 256), - evalBroker: evalBroker, - planQueue: planQueue, - shutdownCh: make(chan struct{}), + config: config, + connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, nil), + logger: logger, + rpcServer: rpc.NewServer(), + peers: make(map[string][]*serverParts), + localPeers: make(map[string]*serverParts), + reconcileCh: make(chan serf.Member, 32), + eventCh: make(chan serf.Event, 256), + evalBroker: evalBroker, + blockedEvals: blockedEvals, + planQueue: planQueue, + shutdownCh: make(chan struct{}), } // Create the periodic dispatcher for launching periodic jobs. @@ -233,6 +241,9 @@ func NewServer(config *Config) (*Server, error) { // Emit metrics for the plan queue go planQueue.EmitStats(time.Second, s.shutdownCh) + // Emit metrics for the blocked eval tracker. + go blockedEvals.EmitStats(time.Second, s.shutdownCh) + // Emit metrics go s.heartbeatStats() @@ -415,7 +426,7 @@ func (s *Server) setupRaft() error { // Create the FSM var err error - s.fsm, err = NewFSM(s.evalBroker, s.periodicDispatcher, s.config.LogOutput) + s.fsm, err = NewFSM(s.evalBroker, s.periodicDispatcher, s.blockedEvals, s.config.LogOutput) if err != nil { return err } diff --git a/nomad/structs/node_class.go b/nomad/structs/node_class.go index b37bf926a07..44f0375d5a4 100644 --- a/nomad/structs/node_class.go +++ b/nomad/structs/node_class.go @@ -34,7 +34,7 @@ func (n *Node) ComputeClass() error { return err } - n.ComputedClass = hash + n.ComputedClass = fmt.Sprintf("v1:%d", hash) return nil } diff --git a/nomad/structs/node_class_test.go b/nomad/structs/node_class_test.go index 27c72d96012..1759bece79f 100644 --- a/nomad/structs/node_class_test.go +++ b/nomad/structs/node_class_test.go @@ -46,7 +46,7 @@ func TestNode_ComputedClass(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } old := n.ComputedClass @@ -64,7 +64,7 @@ func TestNode_ComputedClass(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } @@ -79,7 +79,7 @@ func TestNode_ComputedClass_Ignore(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } old := n.ComputedClass @@ -89,7 +89,7 @@ func TestNode_ComputedClass_Ignore(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } @@ -104,7 +104,7 @@ func TestNode_ComputedClass_Attr(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } old := n.ComputedClass @@ -123,7 +123,7 @@ func TestNode_ComputedClass_Attr(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } if old == n.ComputedClass { @@ -138,7 +138,7 @@ func TestNode_ComputedClass_Meta(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } old := n.ComputedClass @@ -148,7 +148,7 @@ func TestNode_ComputedClass_Meta(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } if old == n.ComputedClass { @@ -161,7 +161,7 @@ func TestNode_ComputedClass_Meta(t *testing.T) { if err := n.ComputeClass(); err != nil { t.Fatalf("ComputeClass() failed: %v", err) } - if n.ComputedClass == 0 { + if n.ComputedClass == "" { t.Fatal("ComputeClass() didn't set computed class") } if old != n.ComputedClass { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index fa7ac8bfc84..6ba1d767d08 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -512,7 +512,7 @@ type Node struct { // ComputedClass is a unique id that identifies nodes with a common set of // attributes and capabilities. - ComputedClass uint64 + ComputedClass string // Drain is controlled by the servers, and not the client. // If true, no jobs will be scheduled to this node, and existing @@ -1824,9 +1824,11 @@ func (a *AllocMetric) ScoreNode(node *Node, name string, score float64) { } const ( - EvalStatusPending = "pending" - EvalStatusComplete = "complete" - EvalStatusFailed = "failed" + EvalStatusBlocked = "blocked" + EvalStatusPending = "pending" + EvalStatusComplete = "complete" + EvalStatusFailed = "failed" + EvalStatusCancelled = "canceled" ) const ( @@ -1912,6 +1914,14 @@ type Evaluation struct { // This is used to support rolling upgrades, where we need a chain of evaluations. PreviousEval string + // ClassEligibility tracks computed node classes that have been explicitely + // marked as eligible or ineligible. + ClassEligibility map[string]bool + + // EscapedComputedClass marks whether the job has constraints that are not + // captured by computed node classes. + EscapedComputedClass bool + // Raft Indexes CreateIndex uint64 ModifyIndex uint64 @@ -1921,7 +1931,7 @@ type Evaluation struct { // will no longer transition. func (e *Evaluation) TerminalStatus() bool { switch e.Status { - case EvalStatusComplete, EvalStatusFailed: + case EvalStatusComplete, EvalStatusFailed, EvalStatusCancelled: return true default: return false @@ -1938,12 +1948,26 @@ func (e *Evaluation) Copy() *Evaluation { return ne } -// ShouldEnqueue checks if a given evaluation should be enqueued +// ShouldEnqueue checks if a given evaluation should be enqueued into the +// eval_broker func (e *Evaluation) ShouldEnqueue() bool { switch e.Status { case EvalStatusPending: return true - case EvalStatusComplete, EvalStatusFailed: + case EvalStatusComplete, EvalStatusFailed, EvalStatusBlocked, EvalStatusCancelled: + return false + default: + panic(fmt.Sprintf("unhandled evaluation (%s) status %s", e.ID, e.Status)) + } +} + +// ShouldBlock checks if a given evaluation should be entered into the blocked +// eval tracker. +func (e *Evaluation) ShouldBlock() bool { + switch e.Status { + case EvalStatusBlocked: + return true + case EvalStatusComplete, EvalStatusFailed, EvalStatusPending, EvalStatusCancelled: return false default: panic(fmt.Sprintf("unhandled evaluation (%s) status %s", e.ID, e.Status)) @@ -1980,6 +2004,24 @@ func (e *Evaluation) NextRollingEval(wait time.Duration) *Evaluation { } } +// BlockedEval creates a blocked evaluation to followup this eval to place any +// failed allocations. It takes the classes marked explicitely eligible or +// ineligible and whether the job has escaped computed node classes. +func (e *Evaluation) BlockedEval(classEligibility map[string]bool, escaped bool) *Evaluation { + return &Evaluation{ + ID: GenerateUUID(), + Priority: e.Priority, + Type: e.Type, + TriggeredBy: e.TriggeredBy, + JobID: e.JobID, + JobModifyIndex: e.JobModifyIndex, + Status: EvalStatusBlocked, + PreviousEval: e.ID, + ClassEligibility: classEligibility, + EscapedComputedClass: escaped, + } +} + // Plan is used to submit a commit plan for task allocations. These // are submitted to the leader which verifies that resources have // not been overcommitted before admiting the plan. diff --git a/scheduler/context.go b/scheduler/context.go index f4b92a17014..0a765944fc7 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -164,14 +164,14 @@ const ( // course of an evaluation. type EvalEligibility struct { // job tracks the eligibility at the job level per computed node class. - job map[uint64]ComputedClassFeasibility + job map[string]ComputedClassFeasibility // jobEscaped marks whether constraints have escaped at the job level. jobEscaped bool // taskGroups tracks the eligibility at the task group level per computed // node class. - taskGroups map[string]map[uint64]ComputedClassFeasibility + taskGroups map[string]map[string]ComputedClassFeasibility // tgEscapedConstraints is a map of task groups to whether constraints have // escaped. @@ -181,8 +181,8 @@ type EvalEligibility struct { // NewEvalEligibility returns an eligibility tracker for the context of an evaluation. func NewEvalEligibility() *EvalEligibility { return &EvalEligibility{ - job: make(map[uint64]ComputedClassFeasibility), - taskGroups: make(map[string]map[uint64]ComputedClassFeasibility), + job: make(map[string]ComputedClassFeasibility), + taskGroups: make(map[string]map[string]ComputedClassFeasibility), tgEscapedConstraints: make(map[string]bool), } } @@ -220,12 +220,47 @@ func (e *EvalEligibility) HasEscaped() bool { return false } +// GetClasses returns the tracked classes to their eligibility, across the job +// and task groups. +func (e *EvalEligibility) GetClasses() map[string]bool { + elig := make(map[string]bool) + + // Go through the job. + for class, feas := range e.job { + switch feas { + case EvalComputedClassEligible: + elig[class] = true + case EvalComputedClassIneligible: + elig[class] = false + } + } + + // Go through the task groups. + for _, classes := range e.taskGroups { + for class, feas := range classes { + switch feas { + case EvalComputedClassEligible: + elig[class] = true + case EvalComputedClassIneligible: + // Only mark as ineligible if it hasn't been marked before. This + // prevents one task group marking a class as ineligible when it + // is eligible on another task group. + if _, ok := elig[class]; !ok { + elig[class] = false + } + } + } + } + + return elig +} + // JobStatus returns the eligibility status of the job. -func (e *EvalEligibility) JobStatus(class uint64) ComputedClassFeasibility { +func (e *EvalEligibility) JobStatus(class string) ComputedClassFeasibility { // COMPAT: Computed node class was introduced in 0.3. Clients running < 0.3 // will not have a computed class. The safest value to return is the escaped // case, since it disables any optimization. - if e.jobEscaped || class == 0 { + if e.jobEscaped || class == "" { fmt.Println(e.jobEscaped, class) return EvalComputedClassEscaped } @@ -238,7 +273,7 @@ func (e *EvalEligibility) JobStatus(class uint64) ComputedClassFeasibility { // SetJobEligibility sets the eligibility status of the job for the computed // node class. -func (e *EvalEligibility) SetJobEligibility(eligible bool, class uint64) { +func (e *EvalEligibility) SetJobEligibility(eligible bool, class string) { if eligible { e.job[class] = EvalComputedClassEligible } else { @@ -247,11 +282,11 @@ func (e *EvalEligibility) SetJobEligibility(eligible bool, class uint64) { } // TaskGroupStatus returns the eligibility status of the task group. -func (e *EvalEligibility) TaskGroupStatus(tg string, class uint64) ComputedClassFeasibility { +func (e *EvalEligibility) TaskGroupStatus(tg, class string) ComputedClassFeasibility { // COMPAT: Computed node class was introduced in 0.3. Clients running < 0.3 // will not have a computed class. The safest value to return is the escaped // case, since it disables any optimization. - if class == 0 { + if class == "" { return EvalComputedClassEscaped } @@ -271,7 +306,7 @@ func (e *EvalEligibility) TaskGroupStatus(tg string, class uint64) ComputedClass // SetTaskGroupEligibility sets the eligibility status of the task group for the // computed node class. -func (e *EvalEligibility) SetTaskGroupEligibility(eligible bool, tg string, class uint64) { +func (e *EvalEligibility) SetTaskGroupEligibility(eligible bool, tg, class string) { var eligibility ComputedClassFeasibility if eligible { eligibility = EvalComputedClassEligible @@ -282,6 +317,6 @@ func (e *EvalEligibility) SetTaskGroupEligibility(eligible bool, tg string, clas if classes, ok := e.taskGroups[tg]; ok { classes[class] = eligibility } else { - e.taskGroups[tg] = map[uint64]ComputedClassFeasibility{class: eligibility} + e.taskGroups[tg] = map[string]ComputedClassFeasibility{class: eligibility} } } diff --git a/scheduler/context_test.go b/scheduler/context_test.go index 27d906ddbf1..b8b3cbfa2df 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -3,6 +3,7 @@ package scheduler import ( "log" "os" + "reflect" "testing" "github.com/hashicorp/nomad/nomad/mock" @@ -111,7 +112,7 @@ func TestEvalContext_ProposedAlloc(t *testing.T) { func TestEvalEligibility_JobStatus(t *testing.T) { e := NewEvalEligibility() - cc := uint64(100) + cc := "v1:100" // Get the job before its been set. if status := e.JobStatus(cc); status != EvalComputedClassUnknown { @@ -129,15 +130,15 @@ func TestEvalEligibility_JobStatus(t *testing.T) { t.Fatalf("JobStatus() returned %v; want %v", status, EvalComputedClassEligible) } - // Check that if I pass class zero it returns escaped - if status := e.JobStatus(0); status != EvalComputedClassEscaped { + // Check that if I pass an empty class it returns escaped + if status := e.JobStatus(""); status != EvalComputedClassEscaped { t.Fatalf("JobStatus() returned %v; want %v", status, EvalComputedClassEscaped) } } func TestEvalEligibility_TaskGroupStatus(t *testing.T) { e := NewEvalEligibility() - cc := uint64(100) + cc := "v1:100" tg := "foo" // Get the tg before its been set. @@ -156,8 +157,8 @@ func TestEvalEligibility_TaskGroupStatus(t *testing.T) { t.Fatalf("TaskGroupStatus() returned %v; want %v", status, EvalComputedClassEligible) } - // Check that if I pass class zero it returns escaped - if status := e.TaskGroupStatus(tg, 0); status != EvalComputedClassEscaped { + // Check that if I pass an empty class it returns escaped + if status := e.TaskGroupStatus(tg, ""); status != EvalComputedClassEscaped { t.Fatalf("TaskGroupStatus() returned %v; want %v", status, EvalComputedClassEscaped) } } @@ -206,3 +207,29 @@ func TestEvalEligibility_SetJob(t *testing.T) { t.Fatalf("SetJob() should mark task group as escaped") } } + +func TestEvalEligibility_GetClasses(t *testing.T) { + e := NewEvalEligibility() + e.SetJobEligibility(true, "v1:1") + e.SetJobEligibility(false, "v1:2") + e.SetTaskGroupEligibility(true, "foo", "v1:3") + e.SetTaskGroupEligibility(false, "bar", "v1:4") + e.SetTaskGroupEligibility(true, "bar", "v1:5") + + // Mark an existing eligible class as ineligible in the TG. + e.SetTaskGroupEligibility(false, "fizz", "v1:1") + e.SetTaskGroupEligibility(false, "fizz", "v1:3") + + expClasses := map[string]bool{ + "v1:1": true, + "v1:2": false, + "v1:3": true, + "v1:4": false, + "v1:5": true, + } + + actClasses := e.GetClasses() + if !reflect.DeepEqual(actClasses, expClasses) { + t.Fatalf("GetClasses() returned %#v; want %#v", actClasses, expClasses) + } +} diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 229eff5946e..5a69761ad97 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -733,7 +733,7 @@ func TestFeasibilityWrapper_JobEligible_TgEscaped(t *testing.T) { cc := nodes[0].ComputedClass ctx.Eligibility().job[cc] = EvalComputedClassEligible ctx.Eligibility().taskGroups["foo"] = - map[uint64]ComputedClassFeasibility{cc: EvalComputedClassEscaped} + map[string]ComputedClassFeasibility{cc: EvalComputedClassEscaped} wrapper.SetTaskGroup("foo") // Run the wrapper. diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 0f1fb757ae6..36d459efa18 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -58,6 +58,8 @@ type GenericScheduler struct { limitReached bool nextEval *structs.Evaluation + + blocked *structs.Evaluation } // NewServiceScheduler is a factory function to instantiate a new service scheduler @@ -158,6 +160,19 @@ func (s *GenericScheduler) process() (bool, error) { s.logger.Printf("[DEBUG] sched: %#v: rolling update limit reached, next eval '%s' created", s.eval, s.nextEval.ID) } + // If there are failed allocations, we need to create a blocked evaluation + // to place the failed allocations when resources become available. + if len(s.plan.FailedAllocs) != 0 && s.blocked == nil { + e := s.ctx.Eligibility() + classes := e.GetClasses() + s.blocked = s.eval.BlockedEval(classes, e.HasEscaped()) + if err := s.planner.CreateEval(s.blocked); err != nil { + s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err) + return false, err + } + s.logger.Printf("[DEBUG] sched: %#v: failed to place all allocations, blocked eval '%s' created", s.eval, s.blocked.ID) + } + // Submit the plan result, newState, err := s.planner.SubmitPlan(s.plan) if err != nil { diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 81cc3fd5700..2c77eff29ce 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -104,6 +104,11 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) { } plan := h.Plans[0] + // Ensure the plan has created a follow up eval. + if len(h.CreateEvals) != 1 || h.CreateEvals[0].Status != structs.EvalStatusBlocked { + t.Fatalf("bad: %#v", h.CreateEvals) + } + // Ensure the plan failed to alloc if len(plan.FailedAllocs) != 1 { t.Fatalf("bad: %#v", plan) @@ -131,6 +136,94 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobRegister_BlockedEval(t *testing.T) { + h := NewHarness(t) + + // Create a full node + node := mock.Node() + node.Reserved = node.Resources + node.ComputeClass() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create an ineligible node + node2 := mock.Node() + node2.Attributes["kernel.name"] = "windows" + node2.ComputeClass() + noErr(t, h.State.UpsertNode(h.NextIndex(), node2)) + + // Create a jobs + job := mock.Job() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan has created a follow up eval. + if len(h.CreateEvals) != 1 { + t.Fatalf("bad: %#v", h.CreateEvals) + } + + created := h.CreateEvals[0] + if created.Status != structs.EvalStatusBlocked { + t.Fatalf("bad: %#v", created) + } + + classes := created.ClassEligibility + if len(classes) != 2 || !classes[node.ComputedClass] || classes[node2.ComputedClass] { + t.Fatalf("bad: %#v", classes) + } + + if created.EscapedComputedClass { + t.Fatalf("bad: %#v", created) + } + + // Ensure the plan failed to alloc + if len(plan.FailedAllocs) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 1 { + for _, a := range out { + t.Logf("%#v", a) + } + t.Fatalf("bad: %#v", out) + } + + // Check the coalesced failures + if out[0].Metrics.CoalescedFailures != 9 { + t.Fatalf("bad: %#v", out[0].Metrics) + } + + // Check the available nodes + if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 2 { + t.Fatalf("bad: %#v", out[0].Metrics) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobModify(t *testing.T) { h := NewHarness(t)