diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 4b760cdafb0..fe7bb84a7a1 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -7,12 +7,41 @@ import (
     "github.com/armon/go-metrics"
     "github.com/hashicorp/nomad/nomad/state"
     "github.com/hashicorp/nomad/nomad/structs"
+    "github.com/hashicorp/raft"
 )

 // planApply is a long lived goroutine that reads plan allocations from
 // the plan queue, determines if they can be applied safely and applies
 // them via Raft.
+//
+// Naively, we could simply dequeue a plan, verify, apply and then respond.
+// However, the plan application is bounded by the Raft apply time and
+// subject to some latency. This creates a stall condition, where we are
+// not evaluating, but simply waiting for a transaction to apply.
+//
+// To avoid this, we overlap verification with apply. This means once
+// we've verified plan N we attempt to apply it. However, while waiting
+// for apply, we begin to verify plan N+1 under the assumption that plan
+// N has succeeded.
+//
+// In this sense, we track two parallel versions of the world. One is
+// the pessimistic one driven by the Raft log, which is replicated. The
+// other is optimistic and assumes our transactions will succeed. In the
+// happy path, this lets us do productive work during the latency of
+// apply.
+//
+// In the unhappy path (Raft transaction fails), we effectively only
+// wasted work during a time we would have been waiting anyway. However,
+// in anticipation of this case we cannot respond to the plan until
+// the Raft log is updated. This means our schedulers will stall,
+// but there are many of those and only a single plan verifier.
+//
 func (s *Server) planApply() {
+    // waitCh is used to track an outstanding application while snap
+    // holds an optimistic state which includes that plan application.
+    var waitCh chan struct{}
+    var snap *state.StateSnapshot
+
     for {
         // Pull the next pending plan, exit if we are no longer leader
         pending, err := s.planQueue.Dequeue(0)
@@ -35,12 +64,23 @@ func (s *Server) planApply() {
             continue
         }

+        // Check if our last plan has completed
+        select {
+        case <-waitCh:
+            waitCh = nil
+            snap = nil
+        default:
+        }
+
         // Snapshot the state so that we have a consistent view of the world
-        snap, err := s.fsm.State().Snapshot()
-        if err != nil {
-            s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
-            pending.respond(nil, err)
-            continue
+        // if no snapshot is available
+        if waitCh == nil || snap == nil {
+            snap, err = s.fsm.State().Snapshot()
+            if err != nil {
+                s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
+                pending.respond(nil, err)
+                continue
+            }
         }

         // Evaluate the plan
@@ -51,25 +91,40 @@ func (s *Server) planApply() {
             continue
         }

-        // Apply the plan if there is anything to do
-        if !result.IsNoOp() {
-            allocIndex, err := s.applyPlan(result)
+        // Fast-path the response if there is nothing to do
+        if result.IsNoOp() {
+            pending.respond(result, nil)
+            continue
+        }
+
+        // Ensure any parallel apply is complete before starting the next one.
+        // This also limits how out of date our snapshot can be.
+        if waitCh != nil {
+            <-waitCh
+            snap, err = s.fsm.State().Snapshot()
             if err != nil {
-                s.logger.Printf("[ERR] nomad: failed to apply plan: %v", err)
+                s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
                 pending.respond(nil, err)
                 continue
             }
-            result.AllocIndex = allocIndex
         }

-        // Respond to the plan
-        pending.respond(result, nil)
+        // Dispatch the Raft transaction for the plan
+        future, err := s.applyPlan(result, snap)
+        if err != nil {
+            s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
+            pending.respond(nil, err)
+            continue
+        }
+
+        // Respond to the plan asynchronously
+        waitCh = make(chan struct{})
+        go s.asyncPlanWait(waitCh, future, result, pending)
     }
 }

 // applyPlan is used to apply the plan result and to return the alloc index
-func (s *Server) applyPlan(result *structs.PlanResult) (uint64, error) {
-    defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
+func (s *Server) applyPlan(result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
     req := structs.AllocUpdateRequest{}
     for _, updateList := range result.NodeUpdate {
         req.Alloc = append(req.Alloc, updateList...)
@@ -79,8 +134,38 @@ func (s *Server) applyPlan(result *structs.PlanResult) (uint64, error) {
     }
     req.Alloc = append(req.Alloc, result.FailedAllocs...)

-    _, index, err := s.raftApply(structs.AllocUpdateRequestType, &req)
-    return index, err
+    // Dispatch the Raft transaction
+    future, err := s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
+    if err != nil {
+        return nil, err
+    }
+
+    // Optimistically apply to our state view
+    if snap != nil {
+        nextIdx := s.raft.AppliedIndex() + 1
+        if err := snap.UpsertAllocs(nextIdx, req.Alloc); err != nil {
+            return future, err
+        }
+    }
+    return future, nil
+}
+
+// asyncPlanWait is used to wait for a plan to apply and then respond asynchronously
+func (s *Server) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
+    result *structs.PlanResult, pending *pendingPlan) {
+    defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
+    defer close(waitCh)
+
+    // Wait for the plan to apply
+    if err := future.Error(); err != nil {
+        s.logger.Printf("[ERR] nomad: failed to apply plan: %v", err)
+        pending.respond(nil, err)
+        return
+    }
+
+    // Respond to the plan
+    result.AllocIndex = future.Index()
+    pending.respond(result, nil)
 }

 // evaluatePlan is used to determine what portions of a plan
diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go
index 72f37c41699..4fd0627c5d7 100644
--- a/nomad/plan_apply_test.go
+++ b/nomad/plan_apply_test.go
@@ -7,8 +7,17 @@ import (
     "github.com/hashicorp/nomad/nomad/mock"
     "github.com/hashicorp/nomad/nomad/structs"
     "github.com/hashicorp/nomad/testutil"
+    "github.com/hashicorp/raft"
 )

+// planWaitFuture is used to wait for the Raft future to complete
+func planWaitFuture(future raft.ApplyFuture) (uint64, error) {
+    if err := future.Error(); err != nil {
+        return 0, err
+    }
+    return future.Index(), nil
+}
+
 func testRegisterNode(t *testing.T, s *Server, n *structs.Node) {
     // Create the register request
     req := &structs.NodeRegisterRequest{
@@ -45,8 +54,25 @@ func TestPlanApply_applyPlan(t *testing.T) {
         FailedAllocs: []*structs.Allocation{allocFail},
     }

+    // Snapshot the state
+    snap, err := s1.State().Snapshot()
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+
     // Apply the plan
-    index, err := s1.applyPlan(plan)
+    future, err := s1.applyPlan(plan, snap)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+
+    // Verify our optimistic snapshot is updated
+    if out, err := snap.AllocByID(alloc.ID); err != nil || out == nil {
+        t.Fatalf("bad: %v %v", out, err)
+    }
+
+    // Check that the plan applies cleanly
+    index, err := planWaitFuture(future)
     if err != nil {
         t.Fatalf("err: %v", err)
     }
@@ -86,8 +112,25 @@ func TestPlanApply_applyPlan(t *testing.T) {
         },
     }

+    // Snapshot the state
+    snap, err = s1.State().Snapshot()
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+
     // Apply the plan
-    index, err = s1.applyPlan(plan)
+    future, err = s1.applyPlan(plan, snap)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+
+    // Check that our optimistic view is updated
+    if out, _ := snap.AllocByID(allocEvict.ID); out.DesiredStatus != structs.AllocDesiredStatusEvict {
+        t.Fatalf("bad: %#v", out)
+    }
+
+    // Verify the plan applies cleanly
+    index, err = planWaitFuture(future)
     if err != nil {
         t.Fatalf("err: %v", err)
     }
diff --git a/nomad/rpc.go b/nomad/rpc.go
index dee45e47fab..074dec0d61e 100644
--- a/nomad/rpc.go
+++ b/nomad/rpc.go
@@ -13,6 +13,7 @@ import (
     "github.com/hashicorp/net-rpc-msgpackrpc"
     "github.com/hashicorp/nomad/nomad/state"
     "github.com/hashicorp/nomad/nomad/structs"
+    "github.com/hashicorp/raft"
     "github.com/hashicorp/yamux"
 )

@@ -225,12 +226,11 @@ func (s *Server) forwardRegion(region, method string, args interface{}, reply in
     return s.connPool.RPC(region, server.Addr, server.Version, method, args, reply)
 }

-// raftApply is used to encode a message, run it through raft, and return
-// the FSM response along with any errors
-func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, uint64, error) {
+// raftApplyFuture is used to encode a message, run it through raft, and return the Raft future.
+func (s *Server) raftApplyFuture(t structs.MessageType, msg interface{}) (raft.ApplyFuture, error) {
     buf, err := structs.Encode(t, msg)
     if err != nil {
-        return nil, 0, fmt.Errorf("Failed to encode request: %v", err)
+        return nil, fmt.Errorf("Failed to encode request: %v", err)
     }

     // Warn if the command is very large
@@ -239,10 +239,19 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
     }

     future := s.raft.Apply(buf, enqueueLimit)
+    return future, nil
+}
+
+// raftApply is used to encode a message, run it through raft, and return
+// the FSM response along with any errors
+func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, uint64, error) {
+    future, err := s.raftApplyFuture(t, msg)
+    if err != nil {
+        return nil, 0, err
+    }
     if err := future.Error(); err != nil {
         return nil, 0, err
     }
-
     return future.Response(), future.Index(), nil
 }
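Taken together, the three files implement the pipeline described in the planApply doc comment: dispatch plan N's Raft transaction, then verify plan N+1 against an optimistic snapshot while that commit is still in flight, with asyncPlanWait responding once the log catches up. Below is a minimal, standalone Go sketch of that single-outstanding-apply pattern. It is an illustration only: plan, applyResult, and raftApply are hypothetical stand-ins invented for the example rather than Nomad APIs, and the sleep merely simulates replication latency.

package main

import (
    "fmt"
    "time"
)

// plan and applyResult are hypothetical stand-ins for Nomad's
// structs.Plan and structs.PlanResult; they exist only for this sketch.
type plan struct{ id int }

type applyResult struct{ index uint64 }

// raftApply simulates dispatching a replicated commit and returns a
// future-like channel. The sleep stands in for the Raft round-trip
// latency that the real planApply overlaps with verification.
func raftApply(p plan) <-chan applyResult {
    done := make(chan applyResult, 1)
    go func() {
        time.Sleep(50 * time.Millisecond) // simulated replication latency
        done <- applyResult{index: uint64(p.id)}
    }()
    return done
}

func main() {
    plans := []plan{{1}, {2}, {3}}

    // waitCh tracks the single outstanding apply, mirroring waitCh in
    // planApply: at most one transaction is in flight, so the optimistic
    // view is never more than one plan ahead of the committed log.
    var waitCh <-chan applyResult

    for _, p := range plans {
        // "Verify" plan N+1 while plan N is still committing. In the real
        // code this is evaluatePlan running against an optimistic snapshot.
        fmt.Printf("verified plan %d\n", p.id)

        // Before dispatching the next apply, drain the previous one. This
        // is the pipeline barrier that bounds snapshot staleness.
        if waitCh != nil {
            res := <-waitCh
            fmt.Printf("committed at index %d\n", res.index)
        }
        waitCh = raftApply(p)
    }

    // Drain the final outstanding apply before exiting.
    res := <-waitCh
    fmt.Printf("committed at index %d\n", res.index)
}

Limiting the pipeline to one in-flight transaction is what bounds how stale the optimistic view can get; it is also what makes s.raft.AppliedIndex() + 1 in applyPlan a reasonable guess for the index at which the pending transaction will commit.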