From 6f66fd326177ee505491661ea15cf20c7f9790cf Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Thu, 6 Jun 2019 15:44:47 -0700
Subject: [PATCH] nomad: include snapshot index when submitting plans

Plan application should use a state snapshot at or after the Raft index
at which the plan was created; otherwise it risks being rejected based
on stale data.

This commit adds Plan.SnapshotIndex, which is set by workers when
submitting plans. SnapshotIndex is set to the Raft index of the
snapshot the worker used to generate the plan.

Plan.SnapshotIndex plays a similar role to PlanResult.RefreshIndex.
While RefreshIndex informs workers that their StateStore is behind the
leader's, SnapshotIndex is a way to prevent the leader from using a
StateStore behind the worker's.

Plan.SnapshotIndex should be considered the *lower bound* index for
consistently handling plan application.

Plans must also be committed serially, so Plan N+1 should use a state
snapshot containing Plan N. This is guaranteed for all plans *after*
the first plan following a leader election: the Raft barrier on leader
election ensures the leader's StateStore has caught up to the log index
at which it was elected, which guarantees its StateStore is at an
index > lastPlanIndex.
---
 nomad/plan_apply.go      | 12 +++++++++++-
 nomad/structs/structs.go |  5 +++++
 nomad/worker.go          |  6 +++++-
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index f9b8d7c63a2..66c90fdba46 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -1,6 +1,7 @@
 package nomad
 
 import (
+	"context"
 	"fmt"
 	"runtime"
 	"time"
@@ -99,7 +100,16 @@ func (p *planner) planApply() {
 		// Snapshot the state so that we have a consistent view of the world
 		// if no snapshot is available
 		if waitCh == nil || snap == nil {
-			snap, err = p.fsm.State().Snapshot()
+			const timeout = 5 * time.Second
+			ctx, cancel := context.WithTimeout(context.Background(), timeout)
+			snap, err = p.fsm.State().SnapshotAfter(ctx, pending.plan.SnapshotIndex)
+			cancel()
+			if err == context.DeadlineExceeded {
+				p.logger.Error("timed out synchronizing to planner's index",
+					"timeout", timeout, "plan_index", pending.plan.SnapshotIndex)
+				err = fmt.Errorf("timed out after %s waiting for index=%d",
+					timeout, pending.plan.SnapshotIndex)
+			}
 			if err != nil {
 				p.logger.Error("failed to snapshot state", "error", err)
 				pending.respond(nil, err)
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index dee93eee733..a89f541cce6 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -8605,6 +8605,11 @@ type Plan struct {
 	// lower priority jobs that are preempted. Preempted allocations are marked
 	// as evicted.
 	NodePreemptions map[string][]*Allocation
+
+	// SnapshotIndex is the Raft index of the snapshot used to create the
+	// Plan. The leader will wait to evaluate the plan until its StateStore
+	// has reached at least this index.
+	SnapshotIndex uint64
 }
 
 // AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the
diff --git a/nomad/worker.go b/nomad/worker.go
index 192a078d5dc..b29fa49ed5a 100644
--- a/nomad/worker.go
+++ b/nomad/worker.go
@@ -284,6 +284,10 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.
 	// Add the evaluation token to the plan
 	plan.EvalToken = w.evalToken
 
+	// Add SnapshotIndex to ensure the leader's StateStore processes the
+	// Plan at or after the index at which it was created.
+	plan.SnapshotIndex = w.snapshotIndex
+
 	// Normalize stopped and preempted allocs before RPC
 	normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true)
 	if normalizePlan {
@@ -319,7 +323,7 @@ SUBMIT:
 	}
 
 	// Check if a state update is required. This could be required if we
-	// planning based on stale data, which is causing issues. For example, a
+	// planned based on stale data, which is causing issues. For example, a
 	// node failure since the time we've started planning or conflicting task
 	// allocations.
 	var state scheduler.State
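
A note on SnapshotAfter: its implementation is not part of this diff. The
sketch below only illustrates the contract planApply relies on; the
snapshotter interface, LatestIndex, and the polling loop are assumptions
for exposition, not Nomad's real API.

package state // illustrative placement only

import (
	"context"
	"time"
)

// snapshotter captures just the two operations this sketch needs; Nomad's
// real StateStore is much richer and these names are hypothetical.
type snapshotter interface {
	// LatestIndex returns the highest Raft index the store has applied.
	LatestIndex() (uint64, error)
	// Snapshot returns a point-in-time view of the store.
	Snapshot() (interface{}, error)
}

// snapshotAfter blocks until the store has applied at least index, then
// takes a snapshot. On timeout it returns context.DeadlineExceeded, the
// error the planApply hunk above checks for. A production implementation
// would likely block on a notification channel rather than polling.
func snapshotAfter(ctx context.Context, s snapshotter, index uint64) (interface{}, error) {
	for {
		latest, err := s.LatestIndex()
		if err != nil {
			return nil, err
		}
		if latest >= index {
			return s.Snapshot()
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(10 * time.Millisecond):
		}
	}
}

The call site pairs this wait with context.WithTimeout, so a plan whose
SnapshotIndex somehow outruns the leader's applied index fails after five
seconds instead of stalling plan application, which must remain serialized.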