Skip to content

Commit

Permalink
nomad: include snapshot index when submitting plans
Browse files Browse the repository at this point in the history
Plan application should use a state snapshot at or after the Raft index
at which the plan was created otherwise it risks being rejected based on
stale data.

This commit adds a Plan.SnapshotIndex which is set by workers when
submitting plan. SnapshotIndex is set to the Raft index of the snapshot
the worker used to generate the plan.

Plan.SnapshotIndex plays a similar role to PlanResult.RefreshIndex.
While RefreshIndex informs workers their StateStore is behind the
leader's, SnapshotIndex is a way to prevent the leader from using a
StateStore behind the worker's.

Plan.SnapshotIndex should be considered the *lower bound* index for
consistently handling plan application.

Plans must also be committed serially, so Plan N+1 should use a state
snapshot containing Plan N. This is guaranteed for plans *after* the
first plan after a leader election.

The Raft barrier on leader election ensures the leader's statestore has
caught up to the log index at which it was elected. This guarantees its
StateStore is at an index > lastPlanIndex.
  • Loading branch information
schmichael committed Jun 24, 2019
1 parent 72b9b87 commit 1d670f2
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 2 deletions.
12 changes: 11 additions & 1 deletion nomad/plan_apply.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"fmt"
"runtime"
"time"
Expand Down Expand Up @@ -99,7 +100,16 @@ func (p *planner) planApply() {
// Snapshot the state so that we have a consistent view of the world
// if no snapshot is available
if waitCh == nil || snap == nil {
snap, err = p.fsm.State().Snapshot()
const timeout = 5 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
snap, err = p.fsm.State().SnapshotAfter(ctx, pending.plan.SnapshotIndex)
cancel()
if err == context.DeadlineExceeded {
p.logger.Error("timed out synchronizing to planner's index",
"timeout", timeout, "plan_index", pending.plan.SnapshotIndex)
err = fmt.Errorf("timed out after %s waiting for index=%d",
timeout, pending.plan.SnapshotIndex)
}
if err != nil {
p.logger.Error("failed to snapshot state", "error", err)
pending.respond(nil, err)
Expand Down
5 changes: 5 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8605,6 +8605,11 @@ type Plan struct {
// lower priority jobs that are preempted. Preempted allocations are marked
// as evicted.
NodePreemptions map[string][]*Allocation

// SnapshotIndex is the Raft index of the snapshot used to create the
// Plan. The leader will wait to evaluate the plan until its StateStore
// has reached at least this index.
SnapshotIndex uint64
}

// AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the
Expand Down
6 changes: 5 additions & 1 deletion nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.
// Add the evaluation token to the plan
plan.EvalToken = w.evalToken

// Add SnapshotIndex to ensure leader's StateStore processes the Plan
// at or after the index it was created.
plan.SnapshotIndex = w.snapshotIndex

// Normalize stopped and preempted allocs before RPC
normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true)
if normalizePlan {
Expand Down Expand Up @@ -319,7 +323,7 @@ SUBMIT:
}

// Check if a state update is required. This could be required if we
// planning based on stale data, which is causing issues. For example, a
// planned based on stale data, which is causing issues. For example, a
// node failure since the time we've started planning or conflicting task
// allocations.
var state scheduler.State
Expand Down

0 comments on commit 1d670f2

Please sign in to comment.