Skip to content

Commit

Permalink
Merge pull request #1339 from hashicorp/b-wait-index-statestore
Browse files Browse the repository at this point in the history
Worker waitForIndex uses StateStore index, not Raft Applied Index
  • Loading branch information
dadgar authored Jun 22, 2016
2 parents 3bbf3a5 + e6abae7 commit 36c6a50
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 34 deletions.
33 changes: 12 additions & 21 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -112,8 +111,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -173,8 +171,7 @@ func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -235,8 +232,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_WithJob(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -296,8 +292,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_NoJob(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -344,7 +339,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC)
gc := s1.coreJobEval(structs.CoreJobForceGC, 1001)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -394,8 +389,7 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -444,8 +438,7 @@ func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -496,8 +489,7 @@ func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -535,7 +527,7 @@ func TestCoreScheduler_NodeGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC)
gc := s1.coreJobEval(structs.CoreJobForceGC, 1000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -621,8 +613,7 @@ func TestCoreScheduler_JobGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
Expand Down Expand Up @@ -721,7 +712,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC)
gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
Expand Down
29 changes: 24 additions & 5 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,30 +251,49 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
jobGC := time.NewTicker(s.config.JobGCInterval)
defer jobGC.Stop()

// getLatest grabs the latest index from the state store. It returns true if
// the index was retrieved successfully.
getLatest := func() (uint64, bool) {
snapshotIndex, err := s.fsm.State().LatestIndex()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to determine state store's index: %v", err)
return 0, false
}

return snapshotIndex, true
}

for {

select {
case <-evalGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC))
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC, index))
}
case <-nodeGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC))
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC, index))
}
case <-jobGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC))
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC, index))
}
case <-stopCh:
return
}
}
}

// coreJobEval returns an evaluation for a core job
func (s *Server) coreJobEval(job string) *structs.Evaluation {
func (s *Server) coreJobEval(job string, modifyIndex uint64) *structs.Evaluation {
return &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerScheduled,
JobID: job,
Status: structs.EvalStatusPending,
ModifyIndex: s.raft.AppliedIndex(),
ModifyIndex: modifyIndex,
}
}

Expand Down
10 changes: 9 additions & 1 deletion nomad/system_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package nomad

import (
"fmt"

"github.com/hashicorp/nomad/nomad/structs"
)

Expand All @@ -16,6 +18,12 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.Gen
return err
}

s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC))
// Get the states current index
snapshotIndex, err := s.srv.fsm.State().LatestIndex()
if err != nil {
return fmt.Errorf("failed to determine state store's index: %v", err)
}

s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC, snapshotIndex))
return nil
}
5 changes: 1 addition & 4 deletions nomad/system_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
)

func TestSystemEndpoint_GarbageCollect(t *testing.T) {
//s1 := testServer(t, func(c *Config) {
//c.NumSchedulers = 0 // Prevent automatic dequeue
//})
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
Expand All @@ -23,7 +20,7 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
state := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
if err := state.UpsertJob(0, job); err != nil {
if err := state.UpsertJob(1000, job); err != nil {
t.Fatalf("UpsertAllocs() failed: %v", err)
}

Expand Down
13 changes: 11 additions & 2 deletions nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,21 @@ func (w *Worker) sendAck(evalID, token string, ack bool) {
// state (attempt to allocate to a failed/dead node), we may need
// to sync our state again and do the planning with more recent data.
func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error {
// XXX: Potential optimization is to set up a watch on the state stores
// index table and only unblock via a trigger rather than timing out and
// checking.

start := time.Now()
defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)
CHECK:
// Get the states current index
snapshotIndex, err := w.srv.fsm.State().LatestIndex()
if err != nil {
return fmt.Errorf("failed to determine state store's index: %v", err)
}

// We only need the FSM state to be as recent as the given index
appliedIndex := w.srv.raft.AppliedIndex()
if index <= appliedIndex {
if index <= snapshotIndex {
w.backoffReset()
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion nomad/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ func TestWorker_waitForIndex(t *testing.T) {
// Cause an increment
go func() {
time.Sleep(10 * time.Millisecond)
s1.raft.Barrier(0)
n := mock.Node()
if err := s1.fsm.state.UpsertNode(index+1, n); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
}()

// Wait for a future index
Expand Down

0 comments on commit 36c6a50

Please sign in to comment.