Skip to content

Commit

Permalink
Merge pull request #272 from hashicorp/f-apply-overlap
Browse files Browse the repository at this point in the history
Plan queue apply overlaps plan verification with plan application to increase throughput
  • Loading branch information
armon committed Oct 12, 2015
2 parents 01f5bde + ad76822 commit d69ce40
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 23 deletions.
117 changes: 101 additions & 16 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,41 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
)

// planApply is a long lived goroutine that reads plan allocations from
// the plan queue, determines if they can be applied safely and applies
// them via Raft.
//
// Naively, we could simply dequeue a plan, verify, apply and then respond.
// However, the plan application is bounded by the Raft apply time and
// subject to some latency. This creates a stall condition, where we are
// not evaluating, but simply waiting for a transaction to apply.
//
// To avoid this, we overlap verification with apply. This means once
// we've verified plan N we attempt to apply it. However, while waiting
// for apply, we begin to verify plan N+1 under the assumption that plan
// N has succeeded.
//
// In this sense, we track two parallel versions of the world. One is
// the pessimistic one driven by the Raft log which is replicated. The
// other is optimistic and assumes our transactions will succeed. In the
// happy path, this lets us do productive work during the latency of
// apply.
//
// In the unhappy path (Raft transaction fails), effectively we only
// wasted work during a time we would have been waiting anyways. However,
// in anticipation of this case we cannot respond to the plan until
// the Raft log is updated. This means our schedulers will stall,
// but there are many of those and only a single plan verifier.
//
func (s *Server) planApply() {
// waitCh is used to track an outstanding application while snap
// holds an optimistic state which includes that plan application.
var waitCh chan struct{}
var snap *state.StateSnapshot

for {
// Pull the next pending plan, exit if we are no longer leader
pending, err := s.planQueue.Dequeue(0)
Expand All @@ -35,12 +64,23 @@ func (s *Server) planApply() {
continue
}

// Check if out last plan has completed
select {
case <-waitCh:
waitCh = nil
snap = nil
default:
}

// Snapshot the state so that we have a consistent view of the world
snap, err := s.fsm.State().Snapshot()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
pending.respond(nil, err)
continue
// if no snapshot is available
if waitCh == nil || snap == nil {
snap, err = s.fsm.State().Snapshot()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
pending.respond(nil, err)
continue
}
}

// Evaluate the plan
Expand All @@ -51,25 +91,40 @@ func (s *Server) planApply() {
continue
}

// Apply the plan if there is anything to do
if !result.IsNoOp() {
allocIndex, err := s.applyPlan(result)
// Fast-path the response if there is nothing to do
if result.IsNoOp() {
pending.respond(result, nil)
continue
}

// Ensure any parallel apply is complete before starting the next one.
// This also limits how out of date our snapshot can be.
if waitCh != nil {
<-waitCh
snap, err = s.fsm.State().Snapshot()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to apply plan: %v", err)
s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
pending.respond(nil, err)
continue
}
result.AllocIndex = allocIndex
}

// Respond to the plan
pending.respond(result, nil)
// Dispatch the Raft transaction for the plan
future, err := s.applyPlan(result, snap)
if err != nil {
s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
pending.respond(nil, err)
continue
}

// Respond to the plan in async
waitCh = make(chan struct{})
go s.asyncPlanWait(waitCh, future, result, pending)
}
}

// applyPlan is used to apply the plan result and to return the alloc index
func (s *Server) applyPlan(result *structs.PlanResult) (uint64, error) {
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
func (s *Server) applyPlan(result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
req := structs.AllocUpdateRequest{}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
Expand All @@ -79,8 +134,38 @@ func (s *Server) applyPlan(result *structs.PlanResult) (uint64, error) {
}
req.Alloc = append(req.Alloc, result.FailedAllocs...)

_, index, err := s.raftApply(structs.AllocUpdateRequestType, &req)
return index, err
// Dispatch the Raft transaction
future, err := s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
if err != nil {
return nil, err
}

// Optimistically apply to our state view
if snap != nil {
nextIdx := s.raft.AppliedIndex() + 1
if err := snap.UpsertAllocs(nextIdx, req.Alloc); err != nil {
return future, err
}
}
return future, nil
}

// asyncPlanWait is used to apply and respond to a plan async
func (s *Server) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
defer close(waitCh)

// Wait for the plan to apply
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to apply plan: %v", err)
pending.respond(nil, err)
return
}

// Respond to the plan
result.AllocIndex = future.Index()
pending.respond(result, nil)
}

// evaluatePlan is used to determine what portions of a plan
Expand Down
47 changes: 45 additions & 2 deletions nomad/plan_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,17 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
)

// planWaitFuture is used to wait for the Raft future to complete
func planWaitFuture(future raft.ApplyFuture) (uint64, error) {
if err := future.Error(); err != nil {
return 0, err
}
return future.Index(), nil
}

func testRegisterNode(t *testing.T, s *Server, n *structs.Node) {
// Create the register request
req := &structs.NodeRegisterRequest{
Expand Down Expand Up @@ -45,8 +54,25 @@ func TestPlanApply_applyPlan(t *testing.T) {
FailedAllocs: []*structs.Allocation{allocFail},
}

// Snapshot the state
snap, err := s1.State().Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}

// Apply the plan
index, err := s1.applyPlan(plan)
future, err := s1.applyPlan(plan, snap)
if err != nil {
t.Fatalf("err: %v", err)
}

// Verify our optimistic snapshot is updated
if out, err := snap.AllocByID(alloc.ID); err != nil || out == nil {
t.Fatalf("bad: %v %v", out, err)
}

// Check plan does apply cleanly
index, err := planWaitFuture(future)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -86,8 +112,25 @@ func TestPlanApply_applyPlan(t *testing.T) {
},
}

// Snapshot the state
snap, err = s1.State().Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}

// Apply the plan
index, err = s1.applyPlan(plan)
future, err = s1.applyPlan(plan, snap)
if err != nil {
t.Fatalf("err: %v", err)
}

// Check that our optimistic view is updated
if out, _ := snap.AllocByID(allocEvict.ID); out.DesiredStatus != structs.AllocDesiredStatusEvict {
t.Fatalf("bad: %#v", out)
}

// Verify plan applies cleanly
index, err = planWaitFuture(future)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
19 changes: 14 additions & 5 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
"github.com/hashicorp/yamux"
)

Expand Down Expand Up @@ -225,12 +226,11 @@ func (s *Server) forwardRegion(region, method string, args interface{}, reply in
return s.connPool.RPC(region, server.Addr, server.Version, method, args, reply)
}

// raftApply is used to encode a message, run it through raft, and return
// the FSM response along with any errors
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, uint64, error) {
// raftApplyFuture is used to encode a message, run it through raft, and return the Raft future.
func (s *Server) raftApplyFuture(t structs.MessageType, msg interface{}) (raft.ApplyFuture, error) {
buf, err := structs.Encode(t, msg)
if err != nil {
return nil, 0, fmt.Errorf("Failed to encode request: %v", err)
return nil, fmt.Errorf("Failed to encode request: %v", err)
}

// Warn if the command is very large
Expand All @@ -239,10 +239,19 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
}

future := s.raft.Apply(buf, enqueueLimit)
return future, nil
}

// raftApply is used to encode a message, run it through raft, and return
// the FSM response along with any errors
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, uint64, error) {
future, err := s.raftApplyFuture(t, msg)
if err != nil {
return nil, 0, err
}
if err := future.Error(); err != nil {
return nil, 0, err
}

return future.Response(), future.Index(), nil
}

Expand Down

0 comments on commit d69ce40

Please sign in to comment.