diff --git a/.changelog/13407.txt b/.changelog/13407.txt
new file mode 100644
index 00000000000..7a2fb340bac
--- /dev/null
+++ b/.changelog/13407.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+core: Fixed a bug where the plan applier could deadlock if the leader's state lagged behind the plan's creation index for more than 5 seconds.
+```
diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 7f1fffc54f8..58002b2258a 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -154,6 +154,7 @@ func (p *planner) planApply() {
 		// This also limits how out of date our snapshot can be.
 		if planIndexCh != nil {
 			idx := <-planIndexCh
+			planIndexCh = nil
 			prevPlanResultIndex = max(prevPlanResultIndex, idx)
 			snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
 			if err != nil {
@@ -177,7 +178,7 @@ func (p *planner) planApply() {
 	}
 }
 
-// snapshotMinIndex wraps SnapshotAfter with a 5s timeout and converts timeout
+// snapshotMinIndex wraps SnapshotAfter with a 10s timeout and converts timeout
 // errors to a more descriptive error message. The snapshot is guaranteed to
 // include both the previous plan and all objects referenced by the plan or
 // return an error.
@@ -188,7 +189,11 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64
 	// plan result's and current plan's snapshot index.
 	minIndex := max(prevPlanResultIndex, planSnapshotIndex)
 
-	const timeout = 5 * time.Second
+	// This timeout creates backpressure where any concurrent
+	// Plan.Submit RPCs will block waiting for results. This sheds
+	// load across all servers and gives raft some CPU to catch up,
+	// because schedulers won't dequeue more work while waiting.
+	const timeout = 10 * time.Second
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex)
 	cancel()
@@ -368,14 +373,12 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
 func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture,
 	result *structs.PlanResult, pending *pendingPlan) {
 	defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
+	defer close(indexCh)
 
 	// Wait for the plan to apply
 	if err := future.Error(); err != nil {
 		p.logger.Error("failed to apply plan", "error", err)
 		pending.respond(nil, err)
-
-		// Close indexCh on error
-		close(indexCh)
 		return
 	}
 
diff --git a/nomad/plan_endpoint_test.go b/nomad/plan_endpoint_test.go
index 8c02c2ba9a9..c36cb4f1989 100644
--- a/nomad/plan_endpoint_test.go
+++ b/nomad/plan_endpoint_test.go
@@ -1,6 +1,7 @@
 package nomad
 
 import (
+	"sync"
 	"testing"
 	"time"
 
@@ -9,6 +10,7 @@ import (
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -128,3 +130,69 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) {
 	// Ensure no plans were enqueued
 	require.Zero(t, s1.planner.planQueue.Stats().Depth)
 }
+
+func TestPlanEndpoint_ApplyConcurrent(t *testing.T) {
+	t.Parallel()
+
+	s1, cleanupS1 := TestServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+	})
+	defer cleanupS1()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	plans := []*structs.Plan{}
+
+	for i := 0; i < 5; i++ {
+
+		// Create a node to place on
+		node := mock.Node()
+		store := s1.fsm.State()
+		require.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node))
+
+		// Create the eval
+		eval1 := mock.Eval()
+		s1.evalBroker.Enqueue(eval1)
+		require.NoError(t, store.UpsertEvals(
+			structs.MsgTypeTestSetup, 150, []*structs.Evaluation{eval1}))
+
+		evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
+		require.NoError(t, err)
+		require.Equal(t, eval1, evalOut)
+
+		// Submit a plan
+		plan := mock.Plan()
+		plan.EvalID = eval1.ID
+		plan.EvalToken = token
+		plan.Job = mock.Job()
+
+		alloc := mock.Alloc()
+		alloc.JobID = plan.Job.ID
+		alloc.Job = plan.Job
+
+		plan.NodeAllocation = map[string][]*structs.Allocation{
+			node.ID: []*structs.Allocation{alloc}}
+
+		plans = append(plans, plan)
+	}
+
+	var wg sync.WaitGroup
+
+	for _, plan := range plans {
+		plan := plan
+		wg.Add(1)
+		go func() {
+
+			req := &structs.PlanRequest{
+				Plan:         plan,
+				WriteRequest: structs.WriteRequest{Region: "global"},
+			}
+			var resp structs.PlanResponse
+			err := s1.RPC("Plan.Submit", req, &resp)
+			assert.NoError(t, err)
+			assert.NotNil(t, resp.Result, "missing result")
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+}
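For readers following the fix, here is a minimal, self-contained Go sketch of the two-part pattern the diff applies. Names and types here are illustrative only, not Nomad's actual code: the producer always closes the result channel via `defer close`, and the consumer sets its reference to `nil` after the first receive, so the retry loop around the snapshot refresh can never block on a second receive from a channel that will not be written to again.

```go
package main

import (
	"errors"
	"fmt"
)

// apply stands in for asyncPlanWait: deliver one index, then always close.
func apply(indexCh chan<- uint64) {
	defer close(indexCh) // closed on success and on error alike
	indexCh <- 42
}

// run stands in for the planApply loop: refresh may fail (like a snapshot
// timeout) and be retried, but the index is received at most once.
func run(refresh func(uint64) error) error {
	indexCh := make(chan uint64, 1)
	go apply(indexCh)

	var idx uint64
	for attempt := 0; attempt < 3; attempt++ {
		if indexCh != nil {
			idx = <-indexCh
			indexCh = nil // never receive again on later attempts
		}
		if err := refresh(idx); err != nil {
			continue // retry without touching indexCh, so no deadlock
		}
		return nil
	}
	return errors.New("gave up after retries")
}

func main() {
	calls := 0
	// First refresh fails (think: SnapshotMinIndex timing out), second succeeds.
	err := run(func(minIndex uint64) error {
		calls++
		if calls == 1 {
			return errors.New("timed out waiting for index")
		}
		return nil
	})
	fmt.Println("err:", err, "refresh calls:", calls)
}
```

Because the receive is guarded by the `!= nil` check, a retry after a refresh failure skips the receive entirely instead of blocking on a channel that will never produce another value, which mirrors why `planIndexCh = nil` in the hunk above removes the deadlock.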