From 6df0da58f8bb89e4acdf63acddd4a43ccfbf75ea Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 22 Jun 2016 09:04:22 -0700 Subject: [PATCH 1/3] Worker waitForIndex uses StateStore index, not Raft Applied Index --- nomad/core_sched_test.go | 33 ++++++++++++--------------------- nomad/leader.go | 24 +++++++++++++++++++----- nomad/system_endpoint.go | 16 +++++++++++++++- nomad/system_endpoint_test.go | 5 +---- nomad/worker.go | 15 +++++++++++++-- nomad/worker_test.go | 5 ++++- 6 files changed, 64 insertions(+), 34 deletions(-) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index f0d6c4165ec..ae9aebc8fbc 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -44,8 +44,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -112,8 +111,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -173,8 +171,7 @@ func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -235,8 +232,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_WithJob(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -296,8 +292,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_NoJob(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -344,7 +339,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobForceGC) + gc := s1.coreJobEval(structs.CoreJobForceGC, 1001) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -394,8 +389,7 @@ func TestCoreScheduler_NodeGC(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobNodeGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -444,8 +438,7 @@ func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobNodeGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -496,8 +489,7 @@ func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobNodeGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -535,7 +527,7 @@ func TestCoreScheduler_NodeGC_Force(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobForceGC) + gc := s1.coreJobEval(structs.CoreJobForceGC, 1000) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -621,8 +613,7 @@ func TestCoreScheduler_JobGC(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobJobGC) - gc.ModifyIndex = 2000 + gc := s1.coreJobEval(structs.CoreJobJobGC, 2000) err = core.Process(gc) if err != nil { t.Fatalf("test(%s) err: %v", test.test, err) @@ -721,7 +712,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobForceGC) + gc := s1.coreJobEval(structs.CoreJobForceGC, 1002) err = core.Process(gc) if err != nil { t.Fatalf("test(%s) err: %v", test.test, err) diff --git a/nomad/leader.go b/nomad/leader.go index f1e10b71d40..bc5cc23ff99 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -252,13 +252,27 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) { defer jobGC.Stop() for { + // Snapshot the current state + snap, err := s.fsm.State().Snapshot() + if err != nil { + s.logger.Printf("[ERR] nomad: failed to snapshot state for periodic GC: %v", err) + continue + } + + // Store the snapshot's index + snapshotIndex, err := snap.LatestIndex() + if err != nil { + s.logger.Printf("[ERR] nomad: failed to determine snapshot's index for periodic GC: %v", err) + continue + } + select { case <-evalGC.C: - s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC)) + s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC, snapshotIndex)) case <-nodeGC.C: - s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC)) + s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC, snapshotIndex)) case <-jobGC.C: - s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC)) + s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC, snapshotIndex)) case <-stopCh: return } @@ -266,7 +280,7 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) { } // coreJobEval returns an evaluation for a core job -func (s *Server) coreJobEval(job string) *structs.Evaluation { +func (s *Server) coreJobEval(job string, modifyIndex uint64) *structs.Evaluation { return &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: structs.CoreJobPriority, @@ -274,7 +288,7 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation { TriggeredBy: structs.EvalTriggerScheduled, JobID: job, Status: structs.EvalStatusPending, - ModifyIndex: s.raft.AppliedIndex(), + ModifyIndex: modifyIndex, } } diff --git a/nomad/system_endpoint.go b/nomad/system_endpoint.go index 510dfef1ae9..46ca1505333 100644 --- a/nomad/system_endpoint.go +++ b/nomad/system_endpoint.go @@ -1,6 +1,8 @@ package nomad import ( + "fmt" + "github.com/hashicorp/nomad/nomad/structs" ) @@ -16,6 +18,18 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.Gen return err } - s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC)) + // Snapshot the current state + snap, err := s.srv.fsm.State().Snapshot() + if err != nil { + return fmt.Errorf("failed to snapshot state: %v", err) + } + + // Store the snapshot's index + snapshotIndex, err := snap.LatestIndex() + if err != nil { + return fmt.Errorf("failed to determine snapshot's index: %v", err) + } + + s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC, snapshotIndex)) return nil } diff --git a/nomad/system_endpoint_test.go b/nomad/system_endpoint_test.go index 4302d9435dc..9f4cc1f9d6b 100644 --- a/nomad/system_endpoint_test.go +++ b/nomad/system_endpoint_test.go @@ -11,9 +11,6 @@ import ( ) func TestSystemEndpoint_GarbageCollect(t *testing.T) { - //s1 := testServer(t, func(c *Config) { - //c.NumSchedulers = 0 // Prevent automatic dequeue - //}) s1 := testServer(t, nil) defer s1.Shutdown() codec := rpcClient(t, s1) @@ -23,7 +20,7 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) { state := s1.fsm.State() job := mock.Job() job.Type = structs.JobTypeBatch - if err := state.UpsertJob(0, job); err != nil { + if err := state.UpsertJob(1000, job); err != nil { t.Fatalf("UpsertAllocs() failed: %v", err) } diff --git a/nomad/worker.go b/nomad/worker.go index 9326f112e44..7717600ec76 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -215,9 +215,20 @@ func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error { start := time.Now() defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start) CHECK: + // Snapshot the current state + snap, err := w.srv.fsm.State().Snapshot() + if err != nil { + return fmt.Errorf("failed to snapshot state: %v", err) + } + + // Store the snapshot's index + snapshotIndex, err := snap.LatestIndex() + if err != nil { + return fmt.Errorf("failed to determine snapshot's index: %v", err) + } + // We only need the FSM state to be as recent as the given index - appliedIndex := w.srv.raft.AppliedIndex() - if index <= appliedIndex { + if index <= snapshotIndex { w.backoffReset() return nil } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 52b557bba6e..0a04a92cbf5 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -203,7 +203,10 @@ func TestWorker_waitForIndex(t *testing.T) { // Cause an increment go func() { time.Sleep(10 * time.Millisecond) - s1.raft.Barrier(0) + n := mock.Node() + if err := s1.fsm.state.UpsertNode(index+1, n); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } }() // Wait for a future index From 1f169fb83f2d60c37166294269816d404f1e5fcf Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 22 Jun 2016 09:11:25 -0700 Subject: [PATCH 2/3] tighter index bound when creating GC evals --- nomad/leader.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index bc5cc23ff99..d69478a80f2 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -251,28 +251,41 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) { jobGC := time.NewTicker(s.config.JobGCInterval) defer jobGC.Stop() - for { + // getLatest grabs the latest index from the state store. It returns true if + // the index was retrieved successfully. + getLatest := func() (uint64, bool) { // Snapshot the current state snap, err := s.fsm.State().Snapshot() if err != nil { s.logger.Printf("[ERR] nomad: failed to snapshot state for periodic GC: %v", err) - continue + return 0, false } // Store the snapshot's index snapshotIndex, err := snap.LatestIndex() if err != nil { s.logger.Printf("[ERR] nomad: failed to determine snapshot's index for periodic GC: %v", err) - continue + return 0, false } + return snapshotIndex, true + } + + for { + select { case <-evalGC.C: - s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC, snapshotIndex)) + if index, ok := getLatest(); ok { + s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC, index)) + } case <-nodeGC.C: - s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC, snapshotIndex)) + if index, ok := getLatest(); ok { + s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC, index)) + } case <-jobGC.C: - s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC, snapshotIndex)) + if index, ok := getLatest(); ok { + s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC, index)) + } case <-stopCh: return } From e6abae77ff7bed0aa0c29b192beb7ef3332ed1f3 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 22 Jun 2016 09:33:15 -0700 Subject: [PATCH 3/3] Do not use snapshot --- nomad/leader.go | 12 ++---------- nomad/system_endpoint.go | 12 +++--------- nomad/worker.go | 16 +++++++--------- 3 files changed, 12 insertions(+), 28 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index d69478a80f2..b3964cf8ffc 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -254,17 +254,9 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) { // getLatest grabs the latest index from the state store. It returns true if // the index was retrieved successfully. getLatest := func() (uint64, bool) { - // Snapshot the current state - snap, err := s.fsm.State().Snapshot() + snapshotIndex, err := s.fsm.State().LatestIndex() if err != nil { - s.logger.Printf("[ERR] nomad: failed to snapshot state for periodic GC: %v", err) - return 0, false - } - - // Store the snapshot's index - snapshotIndex, err := snap.LatestIndex() - if err != nil { - s.logger.Printf("[ERR] nomad: failed to determine snapshot's index for periodic GC: %v", err) + s.logger.Printf("[ERR] nomad: failed to determine state store's index: %v", err) return 0, false } diff --git a/nomad/system_endpoint.go b/nomad/system_endpoint.go index 46ca1505333..3c3e4529d52 100644 --- a/nomad/system_endpoint.go +++ b/nomad/system_endpoint.go @@ -18,16 +18,10 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.Gen return err } - // Snapshot the current state - snap, err := s.srv.fsm.State().Snapshot() + // Get the states current index + snapshotIndex, err := s.srv.fsm.State().LatestIndex() if err != nil { - return fmt.Errorf("failed to snapshot state: %v", err) - } - - // Store the snapshot's index - snapshotIndex, err := snap.LatestIndex() - if err != nil { - return fmt.Errorf("failed to determine snapshot's index: %v", err) + return fmt.Errorf("failed to determine state store's index: %v", err) } s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC, snapshotIndex)) diff --git a/nomad/worker.go b/nomad/worker.go index 7717600ec76..0c414fc900e 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -212,19 +212,17 @@ func (w *Worker) sendAck(evalID, token string, ack bool) { // state (attempt to allocate to a failed/dead node), we may need // to sync our state again and do the planning with more recent data. func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error { + // XXX: Potential optimization is to set up a watch on the state stores + // index table and only unblock via a trigger rather than timing out and + // checking. + start := time.Now() defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start) CHECK: - // Snapshot the current state - snap, err := w.srv.fsm.State().Snapshot() + // Get the states current index + snapshotIndex, err := w.srv.fsm.State().LatestIndex() if err != nil { - return fmt.Errorf("failed to snapshot state: %v", err) - } - - // Store the snapshot's index - snapshotIndex, err := snap.LatestIndex() - if err != nil { - return fmt.Errorf("failed to determine snapshot's index: %v", err) + return fmt.Errorf("failed to determine state store's index: %v", err) } // We only need the FSM state to be as recent as the given index