Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker waitForIndex uses StateStore index, not Raft Applied Index #1339

Merged
merged 3 commits into from
Jun 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 12 additions & 21 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -112,8 +111,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -173,8 +171,7 @@ func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -235,8 +232,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_WithJob(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -296,8 +292,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_NoJob(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -344,7 +339,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC)
gc := s1.coreJobEval(structs.CoreJobForceGC, 1001)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -394,8 +389,7 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -444,8 +438,7 @@ func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -496,8 +489,7 @@ func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -535,7 +527,7 @@ func TestCoreScheduler_NodeGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC)
gc := s1.coreJobEval(structs.CoreJobForceGC, 1000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -621,8 +613,7 @@ func TestCoreScheduler_JobGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
Expand Down Expand Up @@ -721,7 +712,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC)
gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
Expand Down
29 changes: 24 additions & 5 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,30 +251,49 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
jobGC := time.NewTicker(s.config.JobGCInterval)
defer jobGC.Stop()

// getLatest grabs the latest index from the state store. It returns true if
// the index was retrieved successfully.
getLatest := func() (uint64, bool) {
snapshotIndex, err := s.fsm.State().LatestIndex()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to determine state store's index: %v", err)
return 0, false
}

return snapshotIndex, true
}

for {

select {
case <-evalGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC))
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC, index))
}
case <-nodeGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC))
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC, index))
}
case <-jobGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC))
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC, index))
}
case <-stopCh:
return
}
}
}

// coreJobEval returns an evaluation for a core job
func (s *Server) coreJobEval(job string) *structs.Evaluation {
func (s *Server) coreJobEval(job string, modifyIndex uint64) *structs.Evaluation {
return &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerScheduled,
JobID: job,
Status: structs.EvalStatusPending,
ModifyIndex: s.raft.AppliedIndex(),
ModifyIndex: modifyIndex,
}
}

Expand Down
10 changes: 9 additions & 1 deletion nomad/system_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package nomad

import (
"fmt"

"github.com/hashicorp/nomad/nomad/structs"
)

Expand All @@ -16,6 +18,12 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.Gen
return err
}

s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC))
// Get the states current index
snapshotIndex, err := s.srv.fsm.State().LatestIndex()
if err != nil {
return fmt.Errorf("failed to determine state store's index: %v", err)
}

s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC, snapshotIndex))
return nil
}
5 changes: 1 addition & 4 deletions nomad/system_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
)

func TestSystemEndpoint_GarbageCollect(t *testing.T) {
//s1 := testServer(t, func(c *Config) {
//c.NumSchedulers = 0 // Prevent automatic dequeue
//})
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
Expand All @@ -23,7 +20,7 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
state := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
if err := state.UpsertJob(0, job); err != nil {
if err := state.UpsertJob(1000, job); err != nil {
t.Fatalf("UpsertAllocs() failed: %v", err)
}

Expand Down
13 changes: 11 additions & 2 deletions nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,21 @@ func (w *Worker) sendAck(evalID, token string, ack bool) {
// state (attempt to allocate to a failed/dead node), we may need
// to sync our state again and do the planning with more recent data.
func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error {
// XXX: Potential optimization is to set up a watch on the state stores
// index table and only unblock via a trigger rather than timing out and
// checking.

start := time.Now()
defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)
CHECK:
// Get the states current index
snapshotIndex, err := w.srv.fsm.State().LatestIndex()
if err != nil {
return fmt.Errorf("failed to determine state store's index: %v", err)
}

// We only need the FSM state to be as recent as the given index
appliedIndex := w.srv.raft.AppliedIndex()
if index <= appliedIndex {
if index <= snapshotIndex {
w.backoffReset()
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion nomad/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ func TestWorker_waitForIndex(t *testing.T) {
// Cause an increment
go func() {
time.Sleep(10 * time.Millisecond)
s1.raft.Barrier(0)
n := mock.Node()
if err := s1.fsm.state.UpsertNode(index+1, n); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
}()

// Wait for a future index
Expand Down