From 0213a36e457c7bdfa4366dc992322fed305bc7b5 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 23 Jun 2022 12:06:27 -0400 Subject: [PATCH] fix deadlock in plan_apply (#13407) The plan applier has to get a snapshot with a minimum index for the plan it's working on in order to ensure consistency. Under heavy raft loads, we can exceed the timeout. When this happens, we hit a bug where the plan applier blocks waiting on the `indexCh` forever, and all schedulers will block in `Plan.Submit`. Closing the `indexCh` when the `asyncPlanWait` is done with it will prevent the deadlock without impacting correctness of the previous snapshot index. This changeset includes the a PoC failing test that works by injecting a large timeout into the state store. We need to turn this into a test we can run normally without breaking the state store before we can merge this PR. Increase `snapshotMinIndex` timeout to 10s. This timeout creates backpressure where any concurrent `Plan.Submit` RPCs will block waiting for results. This sheds load across all servers and gives raft some CPU to catch up, because schedulers won't dequeue more work while waiting. Increase it to 10s based on observations of large production clusters. --- .changelog/13407.txt | 3 ++ nomad/plan_apply.go | 13 ++++--- nomad/plan_endpoint_test.go | 68 +++++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 5 deletions(-) create mode 100644 .changelog/13407.txt diff --git a/.changelog/13407.txt b/.changelog/13407.txt new file mode 100644 index 00000000000..7a2fb340bac --- /dev/null +++ b/.changelog/13407.txt @@ -0,0 +1,3 @@ +```release-note:bug +core: Fixed a bug where the plan applier could deadlock if leader's state lagged behind plan's creation index for more than 5 seconds. +``` diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 7f1fffc54f8..58002b2258a 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -154,6 +154,7 @@ func (p *planner) planApply() { // This also limits how out of date our snapshot can be. if planIndexCh != nil { idx := <-planIndexCh + planIndexCh = nil prevPlanResultIndex = max(prevPlanResultIndex, idx) snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex) if err != nil { @@ -177,7 +178,7 @@ func (p *planner) planApply() { } } -// snapshotMinIndex wraps SnapshotAfter with a 5s timeout and converts timeout +// snapshotMinIndex wraps SnapshotAfter with a 10s timeout and converts timeout // errors to a more descriptive error message. The snapshot is guaranteed to // include both the previous plan and all objects referenced by the plan or // return an error. @@ -188,7 +189,11 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64 // plan result's and current plan's snapshot index. minIndex := max(prevPlanResultIndex, planSnapshotIndex) - const timeout = 5 * time.Second + // This timeout creates backpressure where any concurrent + // Plan.Submit RPCs will block waiting for results. This sheds + // load across all servers and gives raft some CPU to catch up, + // because schedulers won't dequeue more work while waiting. + const timeout = 10 * time.Second ctx, cancel := context.WithTimeout(context.Background(), timeout) snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex) cancel() @@ -368,14 +373,12 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) { func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture, result *structs.PlanResult, pending *pendingPlan) { defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now()) + defer close(indexCh) // Wait for the plan to apply if err := future.Error(); err != nil { p.logger.Error("failed to apply plan", "error", err) pending.respond(nil, err) - - // Close indexCh on error - close(indexCh) return } diff --git a/nomad/plan_endpoint_test.go b/nomad/plan_endpoint_test.go index 8c02c2ba9a9..c36cb4f1989 100644 --- a/nomad/plan_endpoint_test.go +++ b/nomad/plan_endpoint_test.go @@ -1,6 +1,7 @@ package nomad import ( + "sync" "testing" "time" @@ -9,6 +10,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -128,3 +130,69 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) { // Ensure no plans were enqueued require.Zero(t, s1.planner.planQueue.Stats().Depth) } + +func TestPlanEndpoint_ApplyConcurrent(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + plans := []*structs.Plan{} + + for i := 0; i < 5; i++ { + + // Create a node to place on + node := mock.Node() + store := s1.fsm.State() + require.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node)) + + // Create the eval + eval1 := mock.Eval() + s1.evalBroker.Enqueue(eval1) + require.NoError(t, store.UpsertEvals( + structs.MsgTypeTestSetup, 150, []*structs.Evaluation{eval1})) + + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) + require.NoError(t, err) + require.Equal(t, eval1, evalOut) + + // Submit a plan + plan := mock.Plan() + plan.EvalID = eval1.ID + plan.EvalToken = token + plan.Job = mock.Job() + + alloc := mock.Alloc() + alloc.JobID = plan.Job.ID + alloc.Job = plan.Job + + plan.NodeAllocation = map[string][]*structs.Allocation{ + node.ID: []*structs.Allocation{alloc}} + + plans = append(plans, plan) + } + + var wg sync.WaitGroup + + for _, plan := range plans { + plan := plan + wg.Add(1) + go func() { + + req := &structs.PlanRequest{ + Plan: plan, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.PlanResponse + err := s1.RPC("Plan.Submit", req, &resp) + assert.NoError(t, err) + assert.NotNil(t, resp.Result, "missing result") + wg.Done() + }() + } + + wg.Wait() +}