From 21f9ce9dfa1d14746eb61671c8cdab854c44e61e Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 16 Jun 2022 20:55:56 +0000 Subject: [PATCH 1/6] backport of commit 2f7f862077f9a27980e05f2cfbd51cfd85eec96a --- nomad/plan_apply.go | 7 ++-- nomad/plan_endpoint_test.go | 68 +++++++++++++++++++++++++++++++++++++ nomad/state/state_store.go | 10 ++++++ 3 files changed, 82 insertions(+), 3 deletions(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 8624631ff36..cc5a3c7d6c3 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -153,12 +153,15 @@ func (p *planner) planApply() { // Ensure any parallel apply is complete before starting the next one. // This also limits how out of date our snapshot can be. if planIndexCh != nil { + fmt.Println("waiting for idx...") // DEBUG idx := <-planIndexCh + fmt.Println("got index", idx) // DEBUG prevPlanResultIndex = max(prevPlanResultIndex, idx) snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex) if err != nil { p.logger.Error("failed to update snapshot state", "error", err) pending.respond(nil, err) + planIndexCh = nil continue } } @@ -368,14 +371,12 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) { func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture, result *structs.PlanResult, pending *pendingPlan) { defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now()) + defer close(indexCh) // Wait for the plan to apply if err := future.Error(); err != nil { p.logger.Error("failed to apply plan", "error", err) pending.respond(nil, err) - - // Close indexCh on error - close(indexCh) return } diff --git a/nomad/plan_endpoint_test.go b/nomad/plan_endpoint_test.go index 8c02c2ba9a9..963be8972ca 100644 --- a/nomad/plan_endpoint_test.go +++ b/nomad/plan_endpoint_test.go @@ -1,6 +1,7 @@ package nomad import ( + "sync" "testing" "time" @@ -9,6 +10,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" 
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -128,3 +130,69 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) { // Ensure no plans were enqueued require.Zero(t, s1.planner.planQueue.Stats().Depth) } + +func TestPlanEndpoint_ApplyDeadlock(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + plans := []*structs.Plan{} + + for i := 0; i < 5; i++ { + + // Create a node to place on + node := mock.Node() + store := s1.fsm.State() + require.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node)) + + // Create the eval + eval1 := mock.Eval() + s1.evalBroker.Enqueue(eval1) + require.NoError(t, store.UpsertEvals( + structs.MsgTypeTestSetup, 150, []*structs.Evaluation{eval1})) + + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) + require.NoError(t, err) + require.Equal(t, eval1, evalOut) + + // Submit a plan + plan := mock.Plan() + plan.EvalID = eval1.ID + plan.EvalToken = token + plan.Job = mock.Job() + + alloc := mock.Alloc() + alloc.JobID = plan.Job.ID + alloc.Job = plan.Job + + plan.NodeAllocation = map[string][]*structs.Allocation{ + node.ID: []*structs.Allocation{alloc}} + + plans = append(plans, plan) + } + + var wg sync.WaitGroup + + for _, plan := range plans { + plan := plan + wg.Add(1) + go func() { + + req := &structs.PlanRequest{ + Plan: plan, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.PlanResponse + err := s1.RPC("Plan.Submit", req, &resp) + assert.NoError(t, err) + assert.NotNil(t, resp.Result, "missing result") + wg.Done() + }() + } + + wg.Wait() +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 16ea849c176..e018c083fd7 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -185,6 +185,9 @@ func (s 
*StateStore) Snapshot() (*StateSnapshot, error) { return snap, nil } +// DEBUG: this is to introduce a one-time timeout +var stop = true + // SnapshotMinIndex is used to create a state snapshot where the index is // guaranteed to be greater than or equal to the index parameter. // @@ -203,6 +206,13 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State var retries uint var retryTimer *time.Timer + // DEBUG: this is to introduce a one-time timeout + if index == 7 && stop { + stop = false + time.Sleep(6000 * time.Millisecond) + return nil, ctx.Err() + } + // XXX: Potential optimization is to set up a watch on the state // store's index table and only unblock via a trigger rather than // polling. From cf9eb46e194f21a9ae0889bdaa11f76cfcb65394 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 17 Jun 2022 18:08:45 +0000 Subject: [PATCH 2/6] backport of commit 6fab93703fc70a7440a6e4fbd7dd5d7e5bcb81db --- nomad/plan_apply.go | 2 -- nomad/plan_endpoint_test.go | 2 +- nomad/state/state_store.go | 10 ---------- 3 files changed, 1 insertion(+), 13 deletions(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index cc5a3c7d6c3..e78c606cb34 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -153,9 +153,7 @@ func (p *planner) planApply() { // Ensure any parallel apply is complete before starting the next one. // This also limits how out of date our snapshot can be. 
if planIndexCh != nil { - fmt.Println("waiting for idx...") // DEBUG idx := <-planIndexCh - fmt.Println("got index", idx) // DEBUG prevPlanResultIndex = max(prevPlanResultIndex, idx) snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex) if err != nil { diff --git a/nomad/plan_endpoint_test.go b/nomad/plan_endpoint_test.go index 963be8972ca..c36cb4f1989 100644 --- a/nomad/plan_endpoint_test.go +++ b/nomad/plan_endpoint_test.go @@ -131,7 +131,7 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) { require.Zero(t, s1.planner.planQueue.Stats().Depth) } -func TestPlanEndpoint_ApplyDeadlock(t *testing.T) { +func TestPlanEndpoint_ApplyConcurrent(t *testing.T) { t.Parallel() s1, cleanupS1 := TestServer(t, func(c *Config) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index e018c083fd7..16ea849c176 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -185,9 +185,6 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { return snap, nil } -// DEBUG: this is to introduce a one-time timeout -var stop = true - // SnapshotMinIndex is used to create a state snapshot where the index is // guaranteed to be greater than or equal to the index parameter. // @@ -206,13 +203,6 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State var retries uint var retryTimer *time.Timer - // DEBUG: this is to introduce a one-time timeout - if index == 7 && stop { - stop = false - time.Sleep(6000 * time.Millisecond) - return nil, ctx.Err() - } - // XXX: Potential optimization is to set up a watch on the state // store's index table and only unblock via a trigger rather than // polling. 
From ec3831cfd8a3a38c24f28afdcc990197c8646fa0 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 17 Jun 2022 18:28:46 +0000 Subject: [PATCH 3/6] backport of commit 5e0964ef1768aa4bc8d2b1f71989da040a69b25a --- .changelog/13407.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/13407.txt diff --git a/.changelog/13407.txt b/.changelog/13407.txt new file mode 100644 index 00000000000..8376f89eac4 --- /dev/null +++ b/.changelog/13407.txt @@ -0,0 +1,3 @@ +```release-note:bug +core: Fixed a bug where the plan applier could deadlock if raft load prevented a state store snapshot from completing within 5 seconds +``` From ba0fd4b7656b1e7d7ff215dfb1cd0ec1c41dbe2c Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 21 Jun 2022 18:31:22 +0000 Subject: [PATCH 4/6] backport of commit 247a03f5e9c666fd64a66e6a8aacd61de73adfad --- nomad/plan_apply.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index e78c606cb34..f40942e25e0 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -178,7 +178,7 @@ func (p *planner) planApply() { } } -// snapshotMinIndex wraps SnapshotAfter with a 5s timeout and converts timeout +// snapshotMinIndex wraps SnapshotMinIndex with a 10s timeout and converts timeout // errors to a more descriptive error message. The snapshot is guaranteed to // include both the previous plan and all objects referenced by the plan or // return an error. @@ -189,7 +189,11 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64 // plan result's and current plan's snapshot index. minIndex := max(prevPlanResultIndex, planSnapshotIndex) - const timeout = 5 * time.Second + // This timeout creates backpressure where any concurrent + // Plan.Submit RPCs will block waiting for results. This sheds + // load across all servers and gives raft some CPU to catch up, + // because schedulers won't dequeue more work while waiting.
+ const timeout = 10 * time.Second ctx, cancel := context.WithTimeout(context.Background(), timeout) snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex) cancel() From 0ded14054c4e6eaec81dc1465b2234a51fe66cc7 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 22 Jun 2022 13:45:59 +0000 Subject: [PATCH 5/6] backport of commit 6fbd2a86b1ee80fbd4112a3432efe7b4e558bda3 --- .changelog/13407.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changelog/13407.txt b/.changelog/13407.txt index 8376f89eac4..7a2fb340bac 100644 --- a/.changelog/13407.txt +++ b/.changelog/13407.txt @@ -1,3 +1,3 @@ ```release-note:bug -core: Fixed a bug where the plan applier could deadlock if raft load prevented a state store snapshot from completing within 5 seconds +core: Fixed a bug where the plan applier could deadlock if the leader's state lagged behind the plan's creation index for more than 5 seconds. ``` From ade8b9a8867f1a8a7be5e8862400775056a032ae Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 22 Jun 2022 14:11:29 +0000 Subject: [PATCH 6/6] backport of commit 2a358ace8be4aaf02b789f5687de0da653f531bf --- nomad/plan_apply.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f40942e25e0..f9b189b2697 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -154,12 +154,12 @@ func (p *planner) planApply() { // This also limits how out of date our snapshot can be. if planIndexCh != nil { idx := <-planIndexCh + planIndexCh = nil prevPlanResultIndex = max(prevPlanResultIndex, idx) snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex) if err != nil { p.logger.Error("failed to update snapshot state", "error", err) pending.respond(nil, err) - planIndexCh = nil continue } }