From 985ef83ae7872a07cb03df555646469411db7972 Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Sun, 11 Oct 2015 18:35:09 -0400
Subject: [PATCH 01/11] nomad: make tests more robust

---
 nomad/worker_test.go | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/nomad/worker_test.go b/nomad/worker_test.go
index 81837d99e0f..a56dfefe181 100644
--- a/nomad/worker_test.go
+++ b/nomad/worker_test.go
@@ -52,7 +52,12 @@ func TestWorker_dequeueEvaluation(t *testing.T) {
 
 	// Create the evaluation
 	eval1 := mock.Eval()
-	s1.evalBroker.Enqueue(eval1)
+	testutil.WaitForResult(func() (bool, error) {
+		err := s1.evalBroker.Enqueue(eval1)
+		return err == nil, err
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
 
 	// Create a worker
 	w := &Worker{srv: s1, logger: s1.logger}
@@ -82,7 +87,12 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) {
 
 	// Create the evaluation
 	eval1 := mock.Eval()
-	s1.evalBroker.Enqueue(eval1)
+	testutil.WaitForResult(func() (bool, error) {
+		err := s1.evalBroker.Enqueue(eval1)
+		return err == nil, err
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
 
 	// Create a worker
 	w := &Worker{srv: s1, logger: s1.logger}
@@ -153,7 +163,12 @@ func TestWorker_sendAck(t *testing.T) {
 
 	// Create the evaluation
 	eval1 := mock.Eval()
-	s1.evalBroker.Enqueue(eval1)
+	testutil.WaitForResult(func() (bool, error) {
+		err := s1.evalBroker.Enqueue(eval1)
+		return err == nil, err
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
 
 	// Create a worker
 	w := &Worker{srv: s1, logger: s1.logger}

From 5a6687bdb4bd519d9bc3999b386a68e0be94c87c Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Sun, 11 Oct 2015 18:47:00 -0400
Subject: [PATCH 02/11] nomad: make test more robust

---
 nomad/leader_test.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/nomad/leader_test.go b/nomad/leader_test.go
index 475da47b094..b753b41f43a 100644
--- a/nomad/leader_test.go
+++ b/nomad/leader_test.go
@@ -228,6 +228,7 @@ func TestLeader_EvalBroker_Reset(t *testing.T) {
 	defer s3.Shutdown()
 	servers := []*Server{s1, s2, s3}
 	testJoin(t, s1, s2, s3)
+	testutil.WaitForLeader(t, s1.RPC)
 
 	for _, s := range servers {
 		testutil.WaitForResult(func() (bool, error) {
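Both test fixes follow the same pattern: the eval broker only starts accepting enqueues once leader election has enabled it, so the tests retry Enqueue rather than assuming the first call succeeds. Below is a minimal, self-contained sketch of that retry shape — the helper mirrors the signature of testutil.WaitForResult but is a toy, and the probe merely stands in for evalBroker.Enqueue:

    package main

    import (
        "fmt"
        "time"
    )

    // waitForResult polls test until it succeeds or the retry budget is
    // spent, then hands the last error to onFail. Toy version of the
    // testutil.WaitForResult helper used in the patches above.
    func waitForResult(test func() (bool, error), onFail func(error)) {
        var err error
        for i := 0; i < 50; i++ {
            var ok bool
            if ok, err = test(); ok {
                return
            }
            time.Sleep(10 * time.Millisecond)
        }
        onFail(err)
    }

    func main() {
        start := time.Now()
        // Stand-in for evalBroker.Enqueue, which can fail until leader
        // election enables the broker.
        waitForResult(func() (bool, error) {
            if time.Since(start) < 30*time.Millisecond {
                return false, fmt.Errorf("eval broker not enabled")
            }
            return true, nil
        }, func(err error) {
            fmt.Println("err:", err)
        })
        fmt.Println("enqueued")
    }

The WaitForLeader call added in patch 02 is the same polling idea applied to leadership status before the test proceeds.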
From 1d2e8135d2c333cc08ce7df67e82da56d19c2c94 Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Sun, 11 Oct 2015 17:39:34 -0400
Subject: [PATCH 03/11] nomad: raftApplyFuture exposes underlying Future

---
 nomad/rpc.go | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/nomad/rpc.go b/nomad/rpc.go
index dee45e47fab..3d81277d5d8 100644
--- a/nomad/rpc.go
+++ b/nomad/rpc.go
@@ -13,6 +13,7 @@ import (
 	"github.com/hashicorp/net-rpc-msgpackrpc"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/raft"
 	"github.com/hashicorp/yamux"
 )
 
@@ -225,12 +226,11 @@ func (s *Server) forwardRegion(region, method string, args interface{}, reply in
 	return s.connPool.RPC(region, server.Addr, server.Version, method, args, reply)
 }
 
-// raftApply is used to encode a message, run it through raft, and return
-// the FSM response along with any errors
-func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, uint64, error) {
+// raftApplyFuture is used to encode a message, run it through raft, and return the Raft future.
+func (s *Server) raftApplyFuture(t structs.MessageType, msg interface{}) (raft.ApplyFuture, error) {
 	buf, err := structs.Encode(t, msg)
 	if err != nil {
-		return nil, 0, fmt.Errorf("Failed to encode request: %v", err)
+		return nil, fmt.Errorf("Failed to encode request: %v", err)
 	}
 
 	// Warn if the command is very large
@@ -240,9 +240,18 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
 	}
 
 	future := s.raft.Apply(buf, enqueueLimit)
 	if err := future.Error(); err != nil {
-		return nil, 0, err
+		return nil, err
 	}
+	return future, nil
+}
 
+// raftApply is used to encode a message, run it through raft, and return
+// the FSM response along with any errors
+func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, uint64, error) {
+	future, err := s.raftApplyFuture(t, msg)
+	if err != nil {
+		return nil, 0, err
+	}
 	return future.Response(), future.Index(), nil
 }

From 28428e9f8613920b3a86fe33f000a7dcefb2fe54 Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Sun, 11 Oct 2015 17:42:21 -0400
Subject: [PATCH 04/11] nomad: raftApplyFuture does not block for error

---
 nomad/rpc.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/nomad/rpc.go b/nomad/rpc.go
index 3d81277d5d8..074dec0d61e 100644
--- a/nomad/rpc.go
+++ b/nomad/rpc.go
@@ -239,9 +239,6 @@ func (s *Server) raftApplyFuture(t structs.MessageType, msg interface{}) (raft.A
 	}
 
 	future := s.raft.Apply(buf, enqueueLimit)
-	if err := future.Error(); err != nil {
-		return nil, err
-	}
 	return future, nil
 }
 
@@ -252,6 +249,9 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
 	future, err := s.raftApplyFuture(t, msg)
 	if err != nil {
 		return nil, 0, err
 	}
+	if err := future.Error(); err != nil {
+		return nil, 0, err
+	}
 	return future.Response(), future.Index(), nil
 }
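The net effect of patches 03 and 04: raftApplyFuture now only encodes and dispatches, returning the raft.ApplyFuture without blocking, while raftApply keeps the old blocking contract by waiting on future.Error() itself. A self-contained sketch of that layering, with a toy future in place of raft.ApplyFuture (names and timings are illustrative):

    package main

    import (
        "fmt"
        "time"
    )

    // applyFuture mirrors the subset of raft.ApplyFuture used here.
    type applyFuture struct {
        done  chan struct{}
        index uint64
    }

    // Error blocks until the simulated entry has committed.
    func (f *applyFuture) Error() error  { <-f.done; return nil }
    func (f *applyFuture) Index() uint64 { return f.index }

    // submit is the raftApplyFuture analogue: dispatch and return at once.
    func submit(msg string, index uint64) *applyFuture {
        f := &applyFuture{done: make(chan struct{}), index: index}
        go func() {
            time.Sleep(20 * time.Millisecond) // stand-in for replication latency
            close(f.done)
        }()
        return f
    }

    // apply is the raftApply analogue: submit, then block for the result.
    func apply(msg string, index uint64) (uint64, error) {
        f := submit(msg, index)
        if err := f.Error(); err != nil {
            return 0, err
        }
        return f.Index(), nil
    }

    func main() {
        idx, err := apply("alloc-update", 42)
        fmt.Println(idx, err) // 42 <nil>
    }

Callers that can tolerate the latency keep using the blocking wrapper; the plan applier in the next patches holds the future and chooses when to pay the wait.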
From bb3e9406860c3f61b39e2bebbe5e02947a0a9c18 Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Sun, 11 Oct 2015 17:48:18 -0400
Subject: [PATCH 05/11] nomad: plan apply uses raw Raft future

---
 nomad/plan_apply.go      | 47 +++++++++++++++++++++++++++++++++++-----
 nomad/plan_apply_test.go | 12 ++++++++--
 2 files changed, 52 insertions(+), 7 deletions(-)

diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 4b760cdafb0..fc142cdfa74 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -7,11 +7,35 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/raft"
 )
 
 // planApply is a long lived goroutine that reads plan allocations from
 // the plan queue, determines if they can be applied safely and applies
 // them via Raft.
+//
+// Naively, we could simply dequeue a plan, verify, apply and then respond.
+// However, the plan application is bounded by the Raft apply time and
+// subject to some latency. This creates a stall condition, where we are
+// not evaluating, but simply waiting for a transaction to complete.
+//
+// To avoid this, we overlap verification with apply. This means once
+// we've verified plan N we attempt to apply it. However, while waiting
+// for apply, we begin to verify plan N+1 under the assumption that plan
+// N has succeeded.
+//
+// In this sense, we track two parallel versions of the world. One is
+// the pessimistic one driven by the Raft log which is replicated. The
+// other is optimistic and assumes our transactions will succeed. In the
+// happy path, this lets us do productive work during the latency of
+// apply.
+//
+// In the unhappy path (Raft transaction fails), effectively we only
+// wasted work during a time we would have been waiting anyway. However,
+// in anticipation of this case we cannot respond to the plan until
+// the Raft log is updated. This means our schedulers will stall,
+// but there are many of those and only a single plan verifier.
+//
 func (s *Server) planApply() {
 	for {
 		// Pull the next pending plan, exit if we are no longer leader
@@ -53,7 +77,13 @@ func (s *Server) planApply() {
 
 		// Apply the plan if there is anything to do
 		if !result.IsNoOp() {
-			allocIndex, err := s.applyPlan(result)
+			future, err := s.applyPlan(result)
+			if err != nil {
+				s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
+				pending.respond(nil, err)
+				continue
+			}
+			allocIndex, err := s.planWaitFuture(future)
 			if err != nil {
 				s.logger.Printf("[ERR] nomad: failed to apply plan: %v", err)
 				pending.respond(nil, err)
@@ -68,8 +98,7 @@ func (s *Server) planApply() {
 }
 
 // applyPlan is used to apply the plan result and to return the alloc index
-func (s *Server) applyPlan(result *structs.PlanResult) (uint64, error) {
-	defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
+func (s *Server) applyPlan(result *structs.PlanResult) (raft.ApplyFuture, error) {
 	req := structs.AllocUpdateRequest{}
 	for _, updateList := range result.NodeUpdate {
 		req.Alloc = append(req.Alloc, updateList...)
@@ -79,8 +108,16 @@ func (s *Server) applyPlan(result *structs.PlanResult) (uint64, error) {
 	for _, allocList := range result.NodeAllocation {
 		req.Alloc = append(req.Alloc, allocList...)
 	}
 	req.Alloc = append(req.Alloc, result.FailedAllocs...)
 
-	_, index, err := s.raftApply(structs.AllocUpdateRequestType, &req)
-	return index, err
+	return s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
+}
+
+// planWaitFuture is used to wait for the Raft future to complete
+func (s *Server) planWaitFuture(future raft.ApplyFuture) (uint64, error) {
+	defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
+	if err := future.Error(); err != nil {
+		return 0, err
+	}
+	return future.Index(), nil
 }
 
 // evaluatePlan is used to determine what portions of a plan

diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go
index 72f37c41699..c74b6353be2 100644
--- a/nomad/plan_apply_test.go
+++ b/nomad/plan_apply_test.go
@@ -46,7 +46,11 @@ func TestPlanApply_applyPlan(t *testing.T) {
 	}
 
 	// Apply the plan
-	index, err := s1.applyPlan(plan)
+	future, err := s1.applyPlan(plan)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	index, err := s1.planWaitFuture(future)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -87,7 +91,11 @@ func TestPlanApply_applyPlan(t *testing.T) {
 	}
 
 	// Apply the plan
-	index, err = s1.applyPlan(plan)
+	future, err = s1.applyPlan(plan)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	index, err = s1.planWaitFuture(future)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
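The doc comment is the heart of the series: verify plan N+1 while the Raft apply of plan N is still in flight, and only answer a plan once its log commit is known. This patch itself still waits synchronously (planWaitFuture is called inline); the actual overlap lands in patches 06 through 08. As a standalone sketch of the intended pipeline, with toy stand-ins for the verifier and for Raft:

    package main

    import (
        "fmt"
        "time"
    )

    // commitAsync stands in for applyPlan: it returns a channel that is
    // closed once the simulated Raft transaction has committed.
    func commitAsync(plan int) chan struct{} {
        done := make(chan struct{})
        go func() {
            time.Sleep(20 * time.Millisecond) // replication latency
            close(done)
        }()
        return done
    }

    func main() {
        var inFlight chan struct{}
        for plan := 1; plan <= 3; plan++ {
            // Verify plan N+1 while plan N is still committing.
            fmt.Println("verified plan", plan)

            // Before dispatching, wait out the previous transaction.
            if inFlight != nil {
                <-inFlight
            }
            inFlight = commitAsync(plan)
        }
        <-inFlight
        fmt.Println("all plans committed")
    }

In the unhappy path the verification of plan N+1 is simply thrown away, which costs nothing beyond time that would otherwise have been spent blocked on the commit.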
From 712393f4bf936514360f6a1a0b6669223bc50152 Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Sun, 11 Oct 2015 17:57:36 -0400
Subject: [PATCH 06/11] nomad: wait for plan to apply async

---
 nomad/plan_apply.go      | 50 +++++++++++++++++++++++-----------------
 nomad/plan_apply_test.go | 13 +++++++++--
 2 files changed, 40 insertions(+), 23 deletions(-)

diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index fc142cdfa74..b58f9e1cee5 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -75,25 +75,24 @@ func (s *Server) planApply() {
 			continue
 		}
 
-		// Apply the plan if there is anything to do
-		if !result.IsNoOp() {
-			future, err := s.applyPlan(result)
-			if err != nil {
-				s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
-				pending.respond(nil, err)
-				continue
-			}
-			allocIndex, err := s.planWaitFuture(future)
-			if err != nil {
-				s.logger.Printf("[ERR] nomad: failed to apply plan: %v", err)
-				pending.respond(nil, err)
-				continue
-			}
-			result.AllocIndex = allocIndex
+		// Fast-path the response if there is nothing to do
+		if result.IsNoOp() {
+			pending.respond(result, nil)
+			continue
+		}
+
+		// Dispatch the Raft transaction for the plan
+		future, err := s.applyPlan(result)
+		if err != nil {
+			s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
+			pending.respond(nil, err)
+			continue
 		}
 
-		// Respond to the plan
-		pending.respond(result, nil)
+		// Respond to the plan asynchronously
+		waitCh := make(chan struct{})
+		go s.asyncPlanWait(waitCh, future, result, pending)
+		<-waitCh
 	}
 }
 
@@ -111,13 +110,22 @@ func (s *Server) applyPlan(result *structs.PlanResult) (raft.ApplyFuture, error)
 	return s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
 }
 
-// planWaitFuture is used to wait for the Raft future to complete
-func (s *Server) planWaitFuture(future raft.ApplyFuture) (uint64, error) {
+// asyncPlanWait is used to apply and respond to a plan asynchronously
+func (s *Server) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
+	result *structs.PlanResult, pending *pendingPlan) {
 	defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
+	defer close(waitCh)
+
+	// Wait for the plan to apply
 	if err := future.Error(); err != nil {
-		return 0, err
+		s.logger.Printf("[ERR] nomad: failed to apply plan: %v", err)
+		pending.respond(nil, err)
+		return
 	}
-	return future.Index(), nil
+
+	// Respond to the plan
+	result.AllocIndex = future.Index()
+	pending.respond(result, nil)
 }

diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go
index c74b6353be2..654e22f6da6 100644
--- a/nomad/plan_apply_test.go
+++ b/nomad/plan_apply_test.go
@@ -7,8 +7,17 @@ import (
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
+	"github.com/hashicorp/raft"
 )
 
+// planWaitFuture is used to wait for the Raft future to complete
+func planWaitFuture(future raft.ApplyFuture) (uint64, error) {
+	if err := future.Error(); err != nil {
+		return 0, err
+	}
+	return future.Index(), nil
+}
+
 func testRegisterNode(t *testing.T, s *Server, n *structs.Node) {
 	// Create the register request
 	req := &structs.NodeRegisterRequest{
@@ -50,7 +59,7 @@ func TestPlanApply_applyPlan(t *testing.T) {
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	index, err := s1.planWaitFuture(future)
+	index, err := planWaitFuture(future)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -95,7 +104,7 @@ func TestPlanApply_applyPlan(t *testing.T) {
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	index, err = s1.planWaitFuture(future)
+	index, err = planWaitFuture(future)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
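Note that behavior is unchanged at this point: planApply spawns asyncPlanWait and then immediately blocks on <-waitCh, so plans are still applied strictly one at a time. What the patch buys is the hand-off point — the response now happens on the far side of a channel, which patch 08 exploits. A toy of that close(waitCh) hand-off, with assumed names rather than Nomad's types:

    package main

    import (
        "fmt"
        "time"
    )

    // asyncWait mirrors asyncPlanWait: block on the commit in a goroutine,
    // respond from there, and close waitCh so the main loop can rejoin.
    func asyncWait(waitCh chan struct{}, commit <-chan error, respond func(error)) {
        defer close(waitCh)
        respond(<-commit)
    }

    func main() {
        commit := make(chan error, 1)
        go func() {
            time.Sleep(10 * time.Millisecond)
            commit <- nil
        }()

        waitCh := make(chan struct{})
        go asyncWait(waitCh, commit, func(err error) {
            fmt.Println("responded, err =", err)
        })

        <-waitCh // patch 06 still joins here; patch 08 removes this receive
        fmt.Println("loop continues")
    }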
From 5348bd056370b2d88357a5118498df5d62b30bae Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Sun, 11 Oct 2015 18:19:01 -0400
Subject: [PATCH 07/11] nomad: optimistically apply plan to state snapshot

---
 nomad/plan_apply.go      | 19 ++++++++++++++++---
 nomad/plan_apply_test.go |  4 ++--
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index b58f9e1cee5..ae9f85b50e7 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -82,7 +82,7 @@ func (s *Server) planApply() {
 		}
 
 		// Dispatch the Raft transaction for the plan
-		future, err := s.applyPlan(result)
+		future, err := s.applyPlan(result, snap)
 		if err != nil {
 			s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
 			pending.respond(nil, err)
@@ -97,7 +97,7 @@ func (s *Server) planApply() {
 }
 
 // applyPlan is used to apply the plan result and to return the alloc index
-func (s *Server) applyPlan(result *structs.PlanResult) (raft.ApplyFuture, error) {
+func (s *Server) applyPlan(result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
 	req := structs.AllocUpdateRequest{}
 	for _, updateList := range result.NodeUpdate {
 		req.Alloc = append(req.Alloc, updateList...)
@@ -107,7 +107,20 @@ func (s *Server) applyPlan(result *structs.PlanResult) (raft.ApplyFuture, error)
 	for _, allocList := range result.NodeAllocation {
 		req.Alloc = append(req.Alloc, allocList...)
 	}
 	req.Alloc = append(req.Alloc, result.FailedAllocs...)
 
-	return s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
+	// Dispatch the Raft transaction
+	future, err := s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
+	if err != nil {
+		return nil, err
+	}
+
+	// Optimistically apply to our state view
+	if snap != nil {
+		nextIdx := s.raft.AppliedIndex() + 1
+		if err := snap.UpsertAllocs(nextIdx, req.Alloc); err != nil {
+			return future, err
+		}
+	}
+	return future, nil
 }

diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go
index 654e22f6da6..d4c5dde0f34 100644
--- a/nomad/plan_apply_test.go
+++ b/nomad/plan_apply_test.go
@@ -55,7 +55,7 @@ func TestPlanApply_applyPlan(t *testing.T) {
 	}
 
 	// Apply the plan
-	future, err := s1.applyPlan(plan)
+	future, err := s1.applyPlan(plan, nil)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -100,7 +100,7 @@ func TestPlanApply_applyPlan(t *testing.T) {
 	}
 
 	// Apply the plan
-	future, err = s1.applyPlan(plan)
+	future, err = s1.applyPlan(plan, nil)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
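applyPlan now writes the pending allocations into the caller's snapshot at a guess of the index the entry will commit at — s.raft.AppliedIndex() + 1 on the leader — while the authoritative store stays untouched until the FSM actually applies the log entry. A toy illustration of why that is safe; the snapshot type and index handling are stand-ins for state.StateSnapshot and Raft, not their real implementations:

    package main

    import "fmt"

    // snapshot is a toy stand-in for state.StateSnapshot: an isolated copy
    // that can absorb writes without touching the real state store.
    type snapshot struct{ allocs map[string]uint64 }

    func (s *snapshot) upsertAllocs(index uint64, ids ...string) {
        for _, id := range ids {
            s.allocs[id] = index
        }
    }

    func main() {
        store := map[string]uint64{"web.1": 7} // authoritative state

        // Take a snapshot of the store.
        snap := &snapshot{allocs: map[string]uint64{}}
        for id, idx := range store {
            snap.allocs[id] = idx
        }

        // Optimistically apply at the index the pending Raft entry is
        // expected to land on, i.e. AppliedIndex()+1 in the patch.
        appliedIndex := uint64(9)
        snap.upsertAllocs(appliedIndex+1, "web.2")

        fmt.Println(snap.allocs["web.2"], store["web.2"]) // 10 0
    }

The predicted index only needs to be plausible: if the Raft transaction fails, the snapshot is simply discarded and re-taken from the authoritative state, as the next two patches arrange.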
From 7464bcb9bb704c0846b3abb0dabcba8514018359 Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Sun, 11 Oct 2015 18:34:52 -0400
Subject: [PATCH 08/11] nomad: overlap plan evaluation with apply

---
 nomad/plan_apply.go | 43 ++++++++++++++++++++++++++++++++++++-------
 1 file changed, 36 insertions(+), 7 deletions(-)

diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index ae9f85b50e7..75fff4f59ed 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -37,6 +37,12 @@ import (
 // but there are many of those and only a single plan verifier.
 //
 func (s *Server) planApply() {
+	// waitCh is used to track an outstanding application
+	// while snap holds an optimistic state which includes
+	// that plan application.
+	var waitCh chan struct{}
+	var snap *state.StateSnapshot
+
 	for {
 		// Pull the next pending plan, exit if we are no longer leader
 		pending, err := s.planQueue.Dequeue(0)
@@ -59,12 +65,23 @@ func (s *Server) planApply() {
 			continue
 		}
 
+		// Check if our last plan has completed
+		select {
+		case <-waitCh:
+			waitCh = nil
+			snap = nil
+		default:
+		}
+
 		// Snapshot the state so that we have a consistent view of the world
-		snap, err := s.fsm.State().Snapshot()
-		if err != nil {
-			s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
-			pending.respond(nil, err)
-			continue
+		// if no snapshot is available
+		if snap == nil {
+			snap, err = s.fsm.State().Snapshot()
+			if err != nil {
+				s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
+				pending.respond(nil, err)
+				continue
+			}
 		}
 
 		// Evaluate the plan
@@ -81,6 +98,19 @@ func (s *Server) planApply() {
 			continue
 		}
 
+		// Ensure any parallel apply is complete before
+		// starting the next one. This also limits how out
+		// of date our snapshot can be.
+		if waitCh != nil {
+			<-waitCh
+			snap, err = s.fsm.State().Snapshot()
+			if err != nil {
+				s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
+				pending.respond(nil, err)
+				continue
+			}
+		}
+
 		// Dispatch the Raft transaction for the plan
 		future, err := s.applyPlan(result, snap)
 		if err != nil {
@@ -90,9 +120,8 @@ func (s *Server) planApply() {
 		}
 
 		// Respond to the plan asynchronously
-		waitCh := make(chan struct{})
+		waitCh = make(chan struct{})
 		go s.asyncPlanWait(waitCh, future, result, pending)
-		<-waitCh
 	}
 }

From 9c14964bae0538eaccd468f6d8eb913b00aec12b Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Sun, 11 Oct 2015 18:38:07 -0400
Subject: [PATCH 09/11] nomad: refresh snapshot under error return

---
 nomad/plan_apply.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 75fff4f59ed..94c00347705 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -75,7 +75,7 @@ func (s *Server) planApply() {
 
 		// Snapshot the state so that we have a consistent view of the world
 		// if no snapshot is available
-		if snap == nil {
+		if waitCh == nil || snap == nil {
 			snap, err = s.fsm.State().Snapshot()
 			if err != nil {
 				s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
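Two details here are easy to miss. First, the completion check is a non-blocking poll: waitCh is nil when nothing is outstanding, and a receive from a nil channel blocks forever, so the select falls straight through to default. Second, patch 09's guard appears to exist because an iteration can fail after the optimistic UpsertAllocs without ever handing off to asyncPlanWait — waitCh is then nil while snap still holds writes that will never commit — so the snapshot is refreshed whenever no apply is in flight. A toy of both behaviors, with illustrative names only:

    package main

    import "fmt"

    func main() {
        var waitCh chan struct{} // nil: no apply outstanding
        snap := "stale-optimistic-snapshot"

        // "Check if our last plan has completed": with nothing
        // outstanding the nil-channel case can never fire, so the
        // select takes the default branch immediately.
        select {
        case <-waitCh:
            waitCh, snap = nil, ""
        default:
        }

        // Patch 09's guard: only trust the retained snapshot while an
        // apply is actually in flight; otherwise re-take it.
        if waitCh == nil || snap == "" {
            snap = "fresh-snapshot"
        }
        fmt.Println(snap) // fresh-snapshot
    }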
From c4cc21dab4a4f74017ed731a02830718d9e29d10 Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Sun, 11 Oct 2015 18:46:46 -0400
Subject: [PATCH 10/11] nomad: test optimistic state update

---
 nomad/plan_apply_test.go | 30 ++++++++++++++++++++++++++++--
 1 file changed, 28 insertions(+), 2 deletions(-)

diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go
index d4c5dde0f34..4fd0627c5d7 100644
--- a/nomad/plan_apply_test.go
+++ b/nomad/plan_apply_test.go
@@ -54,11 +54,24 @@ func TestPlanApply_applyPlan(t *testing.T) {
 		FailedAllocs: []*structs.Allocation{allocFail},
 	}
 
+	// Snapshot the state
+	snap, err := s1.State().Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
 	// Apply the plan
-	future, err := s1.applyPlan(plan, nil)
+	future, err := s1.applyPlan(plan, snap)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
+
+	// Verify our optimistic snapshot is updated
+	if out, err := snap.AllocByID(alloc.ID); err != nil || out == nil {
+		t.Fatalf("bad: %v %v", out, err)
+	}
+
+	// Check plan does apply cleanly
 	index, err := planWaitFuture(future)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -112,11 +125,24 @@ func TestPlanApply_applyPlan(t *testing.T) {
 		},
 	}
 
+	// Snapshot the state
+	snap, err = s1.State().Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
 	// Apply the plan
-	future, err = s1.applyPlan(plan, nil)
+	future, err = s1.applyPlan(plan, snap)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
+
+	// Check that our optimistic view is updated
+	if out, _ := snap.AllocByID(allocEvict.ID); out.DesiredStatus != structs.AllocDesiredStatusEvict {
+		t.Fatalf("bad: %#v", out)
+	}
+
+	// Verify plan applies cleanly
 	index, err = planWaitFuture(future)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}

From ad76822c8d1723e0440db0574e4e1fe678258ab6 Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Mon, 12 Oct 2015 14:35:17 -0700
Subject: [PATCH 11/11] nomad: comment cleanups

---
 nomad/plan_apply.go | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 94c00347705..fe7bb84a7a1 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -17,7 +17,7 @@ import (
 // Naively, we could simply dequeue a plan, verify, apply and then respond.
 // However, the plan application is bounded by the Raft apply time and
 // subject to some latency. This creates a stall condition, where we are
-// not evaluating, but simply waiting for a transaction to complete.
+// not evaluating, but simply waiting for a transaction to apply.
 //
 // To avoid this, we overlap verification with apply. This means once
 // we've verified plan N we attempt to apply it. However, while waiting
@@ -37,9 +37,8 @@ import (
 // but there are many of those and only a single plan verifier.
 //
 func (s *Server) planApply() {
-	// waitCh is used to track an outstanding application
-	// while snap holds an optimistic state which includes
-	// that plan application.
+	// waitCh is used to track an outstanding application while snap
+	// holds an optimistic state which includes that plan application.
 	var waitCh chan struct{}
 	var snap *state.StateSnapshot
 
@@ -98,9 +97,8 @@ func (s *Server) planApply() {
 			continue
 		}
 
-		// Ensure any parallel apply is complete before
-		// starting the next one. This also limits how out
-		// of date our snapshot can be.
+		// Ensure any parallel apply is complete before starting the next one.
+		// This also limits how out of date our snapshot can be.
 		if waitCh != nil {
 			<-waitCh
 			snap, err = s.fsm.State().Snapshot()