diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index f0d6c4165ec..ae9aebc8fbc 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -44,8 +44,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -112,8 +111,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -173,8 +171,7 @@ func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -235,8 +232,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_WithJob(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -296,8 +292,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_NoJob(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -344,7 +339,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobForceGC) + gc := s1.coreJobEval(structs.CoreJobForceGC, 1001) err = core.Process(gc) 
if err != nil { t.Fatalf("err: %v", err) @@ -394,8 +389,7 @@ func TestCoreScheduler_NodeGC(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobNodeGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -444,8 +438,7 @@ func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobNodeGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -496,8 +489,7 @@ func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobNodeGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -535,7 +527,7 @@ func TestCoreScheduler_NodeGC_Force(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobForceGC) + gc := s1.coreJobEval(structs.CoreJobForceGC, 1000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -621,8 +613,7 @@ func TestCoreScheduler_JobGC(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobJobGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobJobGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("test(%s) err: %v", test.test, err) @@ -721,7 +712,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobForceGC) + gc := s1.coreJobEval(structs.CoreJobForceGC, 1002) err = core.Process(gc) if err != nil { t.Fatalf("test(%s) err: %v", test.test, err) diff --git a/nomad/leader.go b/nomad/leader.go index f1e10b71d40..b3964cf8ffc 100644 --- 
a/nomad/leader.go +++ b/nomad/leader.go @@ -251,14 +251,33 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) { jobGC := time.NewTicker(s.config.JobGCInterval) defer jobGC.Stop() + // getLatest grabs the latest index from the state store. It returns true if + // the index was retrieved successfully. + getLatest := func() (uint64, bool) { + snapshotIndex, err := s.fsm.State().LatestIndex() + if err != nil { + s.logger.Printf("[ERR] nomad: failed to determine state store's index: %v", err) + return 0, false + } + + return snapshotIndex, true + } + for { + select { case <-evalGC.C: - s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC)) + if index, ok := getLatest(); ok { + s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC, index)) + } case <-nodeGC.C: - s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC)) + if index, ok := getLatest(); ok { + s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC, index)) + } case <-jobGC.C: - s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC)) + if index, ok := getLatest(); ok { + s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC, index)) + } case <-stopCh: return } @@ -266,7 +285,7 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) { } // coreJobEval returns an evaluation for a core job -func (s *Server) coreJobEval(job string) *structs.Evaluation { +func (s *Server) coreJobEval(job string, modifyIndex uint64) *structs.Evaluation { return &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: structs.CoreJobPriority, @@ -274,7 +293,7 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation { TriggeredBy: structs.EvalTriggerScheduled, JobID: job, Status: structs.EvalStatusPending, - ModifyIndex: s.raft.AppliedIndex(), + ModifyIndex: modifyIndex, } } diff --git a/nomad/system_endpoint.go b/nomad/system_endpoint.go index 510dfef1ae9..3c3e4529d52 100644 --- a/nomad/system_endpoint.go +++ b/nomad/system_endpoint.go @@ -1,6 +1,8 @@ package nomad import ( + "fmt" + 
"github.com/hashicorp/nomad/nomad/structs" ) @@ -16,6 +18,12 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.Gen return err } - s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC)) + // Get the state's current index + snapshotIndex, err := s.srv.fsm.State().LatestIndex() + if err != nil { + return fmt.Errorf("failed to determine state store's index: %v", err) + } + + s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC, snapshotIndex)) return nil } diff --git a/nomad/system_endpoint_test.go b/nomad/system_endpoint_test.go index 4302d9435dc..9f4cc1f9d6b 100644 --- a/nomad/system_endpoint_test.go +++ b/nomad/system_endpoint_test.go @@ -11,9 +11,6 @@ import ( ) func TestSystemEndpoint_GarbageCollect(t *testing.T) { - //s1 := testServer(t, func(c *Config) { - //c.NumSchedulers = 0 // Prevent automatic dequeue - //}) s1 := testServer(t, nil) defer s1.Shutdown() codec := rpcClient(t, s1) @@ -23,7 +20,7 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) { state := s1.fsm.State() job := mock.Job() job.Type = structs.JobTypeBatch - if err := state.UpsertJob(0, job); err != nil { + if err := state.UpsertJob(1000, job); err != nil { t.Fatalf("UpsertAllocs() failed: %v", err) } diff --git a/nomad/worker.go b/nomad/worker.go index 9326f112e44..0c414fc900e 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -212,12 +212,21 @@ func (w *Worker) sendAck(evalID, token string, ack bool) { // state (attempt to allocate to a failed/dead node), we may need // to sync our state again and do the planning with more recent data. func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error { + // XXX: Potential optimization is to set up a watch on the state store's + // index table and only unblock via a trigger rather than timing out and + // checking. 
+ start := time.Now() defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start) CHECK: + // Get the state's current index + snapshotIndex, err := w.srv.fsm.State().LatestIndex() + if err != nil { + return fmt.Errorf("failed to determine state store's index: %v", err) + } + // We only need the FSM state to be as recent as the given index - appliedIndex := w.srv.raft.AppliedIndex() - if index <= appliedIndex { + if index <= snapshotIndex { w.backoffReset() return nil } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 52b557bba6e..0a04a92cbf5 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -203,7 +203,10 @@ func TestWorker_waitForIndex(t *testing.T) { // Cause an increment go func() { time.Sleep(10 * time.Millisecond) - s1.raft.Barrier(0) + n := mock.Node() + if err := s1.fsm.state.UpsertNode(index+1, n); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } }() // Wait for a future index