Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plan queue apply overlaps plan verification with plan application to increase throughput #272

Merged
merged 11 commits into from
Oct 12, 2015
1 change: 1 addition & 0 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func TestLeader_EvalBroker_Reset(t *testing.T) {
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
testJoin(t, s1, s2, s3)
testutil.WaitForLeader(t, s1.RPC)

for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
Expand Down
119 changes: 103 additions & 16 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,42 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
)

// planApply is a long lived goroutine that reads plan allocations from
// the plan queue, determines if they can be applied safely and applies
// them via Raft.
//
// Naively, we could simply dequeue a plan, verify, apply and then respond.
// However, the plan application is bounded by the Raft apply time and
// subject to some latency. This creates a stall condition, where we are
// not evaluating, but simply waiting for a transaction to complete.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complete -> apply.

//
// To avoid this, we overlap verification with apply. This means once
// we've verified plan N we attempt to apply it. However, while waiting
// for apply, we begin to verify plan N+1 under the assumption that plan
// N has succeeded.
//
// In this sense, we track two parallel versions of the world. One is
// the pessimistic one driven by the Raft log which is replicated. The
// other is optimistic and assumes our transactions will succeed. In the
// happy path, this lets us do productive work during the latency of
// apply.
//
// In the unhappy path (Raft transaction fails), effectively we only
// wasted work during a time we would have been waiting anyways. However,
// in anticipation of this case we cannot respond to the plan until
// the Raft log is updated. This means our schedulers will stall,
// but there are many of those and only a single plan verifier.
//
func (s *Server) planApply() {
// waitCh is used to track an outstanding application
// while snap holds an optimistic state which includes
// that plan application.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the oddly short comment length?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Armon doesn't run a 80 character viewer in his vim setup, so he just guesses and does things like this.

We just try to limit to less than 80 chars.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😆

var waitCh chan struct{}
var snap *state.StateSnapshot

for {
// Pull the next pending plan, exit if we are no longer leader
pending, err := s.planQueue.Dequeue(0)
Expand All @@ -35,12 +65,23 @@ func (s *Server) planApply() {
continue
}

// Check if out last plan has completed
select {
case <-waitCh:
waitCh = nil
snap = nil
default:
}

// Snapshot the state so that we have a consistent view of the world
snap, err := s.fsm.State().Snapshot()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
pending.respond(nil, err)
continue
// if no snapshot is available
if waitCh == nil || snap == nil {
snap, err = s.fsm.State().Snapshot()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
pending.respond(nil, err)
continue
}
}

// Evaluate the plan
Expand All @@ -51,25 +92,41 @@ func (s *Server) planApply() {
continue
}

// Apply the plan if there is anything to do
if !result.IsNoOp() {
allocIndex, err := s.applyPlan(result)
// Fast-path the response if there is nothing to do
if result.IsNoOp() {
pending.respond(result, nil)
continue
}

// Ensure any parallel apply is complete before
// starting the next one. This also limits how out
// of date our snapshot can be.
if waitCh != nil {
<-waitCh
snap, err = s.fsm.State().Snapshot()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to apply plan: %v", err)
s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
pending.respond(nil, err)
continue
}
result.AllocIndex = allocIndex
}

// Respond to the plan
pending.respond(result, nil)
// Dispatch the Raft transaction for the plan
future, err := s.applyPlan(result, snap)
if err != nil {
s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
pending.respond(nil, err)
continue
}

// Respond to the plan in async
waitCh = make(chan struct{})
go s.asyncPlanWait(waitCh, future, result, pending)
}
}

// applyPlan is used to apply the plan result and to return the alloc index
func (s *Server) applyPlan(result *structs.PlanResult) (uint64, error) {
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
func (s *Server) applyPlan(result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
req := structs.AllocUpdateRequest{}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
Expand All @@ -79,8 +136,38 @@ func (s *Server) applyPlan(result *structs.PlanResult) (uint64, error) {
}
req.Alloc = append(req.Alloc, result.FailedAllocs...)

_, index, err := s.raftApply(structs.AllocUpdateRequestType, &req)
return index, err
// Dispatch the Raft transaction
future, err := s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
if err != nil {
return nil, err
}

// Optimistically apply to our state view
if snap != nil {
nextIdx := s.raft.AppliedIndex() + 1
if err := snap.UpsertAllocs(nextIdx, req.Alloc); err != nil {
return future, err
}
}
return future, nil
}

// asyncPlanWait is used to apply and respond to a plan async
func (s *Server) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
defer close(waitCh)

// Wait for the plan to apply
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to apply plan: %v", err)
pending.respond(nil, err)
return
}

// Respond to the plan
result.AllocIndex = future.Index()
pending.respond(result, nil)
}

// evaluatePlan is used to determine what portions of a plan
Expand Down
47 changes: 45 additions & 2 deletions nomad/plan_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,17 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
)

// planWaitFuture is used to wait for the Raft future to complete
func planWaitFuture(future raft.ApplyFuture) (uint64, error) {
if err := future.Error(); err != nil {
return 0, err
}
return future.Index(), nil
}

func testRegisterNode(t *testing.T, s *Server, n *structs.Node) {
// Create the register request
req := &structs.NodeRegisterRequest{
Expand Down Expand Up @@ -45,8 +54,25 @@ func TestPlanApply_applyPlan(t *testing.T) {
FailedAllocs: []*structs.Allocation{allocFail},
}

// Snapshot the state
snap, err := s1.State().Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}

// Apply the plan
index, err := s1.applyPlan(plan)
future, err := s1.applyPlan(plan, snap)
if err != nil {
t.Fatalf("err: %v", err)
}

// Verify our optimistic snapshot is updated
if out, err := snap.AllocByID(alloc.ID); err != nil || out == nil {
t.Fatalf("bad: %v %v", out, err)
}

// Check plan does apply cleanly
index, err := planWaitFuture(future)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -86,8 +112,25 @@ func TestPlanApply_applyPlan(t *testing.T) {
},
}

// Snapshot the state
snap, err = s1.State().Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}

// Apply the plan
index, err = s1.applyPlan(plan)
future, err = s1.applyPlan(plan, snap)
if err != nil {
t.Fatalf("err: %v", err)
}

// Check that our optimistic view is updated
if out, _ := snap.AllocByID(allocEvict.ID); out.DesiredStatus != structs.AllocDesiredStatusEvict {
t.Fatalf("bad: %#v", out)
}

// Verify plan applies cleanly
index, err = planWaitFuture(future)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
19 changes: 14 additions & 5 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
"github.com/hashicorp/yamux"
)

Expand Down Expand Up @@ -225,12 +226,11 @@ func (s *Server) forwardRegion(region, method string, args interface{}, reply in
return s.connPool.RPC(region, server.Addr, server.Version, method, args, reply)
}

// raftApply is used to encode a message, run it through raft, and return
// the FSM response along with any errors
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, uint64, error) {
// raftApplyFuture is used to encode a message, run it through raft, and return the Raft future.
func (s *Server) raftApplyFuture(t structs.MessageType, msg interface{}) (raft.ApplyFuture, error) {
buf, err := structs.Encode(t, msg)
if err != nil {
return nil, 0, fmt.Errorf("Failed to encode request: %v", err)
return nil, fmt.Errorf("Failed to encode request: %v", err)
}

// Warn if the command is very large
Expand All @@ -239,10 +239,19 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
}

future := s.raft.Apply(buf, enqueueLimit)
return future, nil
}

// raftApply is used to encode a message, run it through raft, and return
// the FSM response along with any errors
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, uint64, error) {
future, err := s.raftApplyFuture(t, msg)
if err != nil {
return nil, 0, err
}
if err := future.Error(); err != nil {
return nil, 0, err
}

return future.Response(), future.Index(), nil
}

Expand Down
21 changes: 18 additions & 3 deletions nomad/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ func TestWorker_dequeueEvaluation(t *testing.T) {

// Create the evaluation
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Create a worker
w := &Worker{srv: s1, logger: s1.logger}
Expand Down Expand Up @@ -82,7 +87,12 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) {

// Create the evaluation
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Create a worker
w := &Worker{srv: s1, logger: s1.logger}
Expand Down Expand Up @@ -153,7 +163,12 @@ func TestWorker_sendAck(t *testing.T) {

// Create the evaluation
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Create a worker
w := &Worker{srv: s1, logger: s1.logger}
Expand Down