Skip to content

Commit

Permalink
fix deadlock in plan_apply (#13407)
Browse files Browse the repository at this point in the history
The plan applier has to get a snapshot with a minimum index for the
plan it's working on in order to ensure consistency. Under heavy raft
loads, we can exceed the timeout. When this happens, we hit a bug
where the plan applier blocks waiting on the `indexCh` forever, and
all schedulers will block in `Plan.Submit`.

Closing the `indexCh` when the `asyncPlanWait` is done with it will
prevent the deadlock without impacting correctness of the previous
snapshot index.

This changeset includes the a PoC failing test that works by injecting
a large timeout into the state store. We need to turn this into a test
we can run normally without breaking the state store before we can
merge this PR.

Increase `snapshotMinIndex` timeout to 10s.
This timeout creates backpressure where any concurrent `Plan.Submit`
RPCs will block waiting for results. This sheds load across all
servers and gives raft some CPU to catch up, because schedulers won't
dequeue more work while waiting. Increase it to 10s based on
observations of large production clusters.
  • Loading branch information
tgross authored Jun 23, 2022
1 parent c52741a commit 0213a36
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .changelog/13407.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
core: Fixed a bug where the plan applier could deadlock if leader's state lagged behind plan's creation index for more than 5 seconds.
```
13 changes: 8 additions & 5 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (p *planner) planApply() {
// This also limits how out of date our snapshot can be.
if planIndexCh != nil {
idx := <-planIndexCh
planIndexCh = nil
prevPlanResultIndex = max(prevPlanResultIndex, idx)
snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
if err != nil {
Expand All @@ -177,7 +178,7 @@ func (p *planner) planApply() {
}
}

// snapshotMinIndex wraps SnapshotAfter with a 5s timeout and converts timeout
// snapshotMinIndex wraps SnapshotAfter with a 10s timeout and converts timeout
// errors to a more descriptive error message. The snapshot is guaranteed to
// include both the previous plan and all objects referenced by the plan or
// return an error.
Expand All @@ -188,7 +189,11 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64
// plan result's and current plan's snapshot index.
minIndex := max(prevPlanResultIndex, planSnapshotIndex)

const timeout = 5 * time.Second
// This timeout creates backpressure where any concurrent
// Plan.Submit RPCs will block waiting for results. This sheds
// load across all servers and gives raft some CPU to catch up,
// because schedulers won't dequeue more work while waiting.
const timeout = 10 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex)
cancel()
Expand Down Expand Up @@ -368,14 +373,12 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
defer close(indexCh)

// Wait for the plan to apply
if err := future.Error(); err != nil {
p.logger.Error("failed to apply plan", "error", err)
pending.respond(nil, err)

// Close indexCh on error
close(indexCh)
return
}

Expand Down
68 changes: 68 additions & 0 deletions nomad/plan_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"sync"
"testing"
"time"

Expand All @@ -9,6 +10,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -128,3 +130,69 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) {
// Ensure no plans were enqueued
require.Zero(t, s1.planner.planQueue.Stats().Depth)
}

func TestPlanEndpoint_ApplyConcurrent(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

plans := []*structs.Plan{}

for i := 0; i < 5; i++ {

// Create a node to place on
node := mock.Node()
store := s1.fsm.State()
require.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node))

// Create the eval
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
require.NoError(t, store.UpsertEvals(
structs.MsgTypeTestSetup, 150, []*structs.Evaluation{eval1}))

evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
require.NoError(t, err)
require.Equal(t, eval1, evalOut)

// Submit a plan
plan := mock.Plan()
plan.EvalID = eval1.ID
plan.EvalToken = token
plan.Job = mock.Job()

alloc := mock.Alloc()
alloc.JobID = plan.Job.ID
alloc.Job = plan.Job

plan.NodeAllocation = map[string][]*structs.Allocation{
node.ID: []*structs.Allocation{alloc}}

plans = append(plans, plan)
}

var wg sync.WaitGroup

for _, plan := range plans {
plan := plan
wg.Add(1)
go func() {

req := &structs.PlanRequest{
Plan: plan,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.PlanResponse
err := s1.RPC("Plan.Submit", req, &resp)
assert.NoError(t, err)
assert.NotNil(t, resp.Result, "missing result")
wg.Done()
}()
}

wg.Wait()
}

0 comments on commit 0213a36

Please sign in to comment.