Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix deadlock in plan_apply #13407

Merged
merged 6 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/13407.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
core: Fixed a bug where the plan applier could deadlock if leader's state lagged behind plan's creation index for more than 5 seconds.
```
13 changes: 8 additions & 5 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (p *planner) planApply() {
// This also limits how out of date our snapshot can be.
if planIndexCh != nil {
idx := <-planIndexCh
planIndexCh = nil
prevPlanResultIndex = max(prevPlanResultIndex, idx)
snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
if err != nil {
Expand All @@ -177,7 +178,7 @@ func (p *planner) planApply() {
}
}

// snapshotMinIndex wraps SnapshotAfter with a 5s timeout and converts timeout
// snapshotMinIndex wraps SnapshotAfter with a 10s timeout and converts timeout
// errors to a more descriptive error message. The snapshot is guaranteed to
// include both the previous plan and all objects referenced by the plan or
// return an error.
Expand All @@ -188,7 +189,11 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64
// plan result's and current plan's snapshot index.
minIndex := max(prevPlanResultIndex, planSnapshotIndex)

const timeout = 5 * time.Second
// This timeout creates backpressure where any concurrent
// Plan.Submit RPCs will block waiting for results. This sheds
// load across all servers and gives raft some CPU to catch up,
// because schedulers won't dequeue more work while waiting.
const timeout = 10 * time.Second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - The following are observations/thoughts I just wanted to record somewhere in case they're useful (or in case my "LGTM" is based on flawed assumptions).

This does make this code differ from the worker's use of SnapshotMinIndex via worker.snapshotMinIndex. I think this is probably ideal:

The plan applier blocking here puts a lot of backpressure on all schedulers and should dramatically help in cases where Raft/FSM is really struggling.

The shorter 5s min index timeout still on the worker seems fine because it fails up to the main eval retry loop which introduces its own additional retries.

...although looking at the code around wait-for-index failures in the worker it appears we lose/drop the RefreshIndex sent by the server if we hit that timeout-and-retry case. This would cause the retry to use its existing old snapshot. Probably not intentional but probably optimal as it lets workers keep creating plans and the plan applier will toss out the invalid bits with its stricter view of state.

ctx, cancel := context.WithTimeout(context.Background(), timeout)
snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex)
cancel()
Expand Down Expand Up @@ -368,14 +373,12 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
defer close(indexCh)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this alone have fixed the deadlock? I think so and since we always use this index inside a max(...) it seems like an always safe operation.

Oof, good lesson about writing defensive code the first time round.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You never know if you will have time to come back and make it better in a second pass 🤣

Copy link
Member Author

@tgross tgross Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this alone have fixed the deadlock?

Yes! That's how we did the initial test and fix. Nil'ing the channel gets us a few additional things:

  • It's belt-and-suspenders so that we know there's no possible way we're still polling on the channel.
  • It makes the usage consistent so that the reader doesn't wonder why the channel is being nil'd in one spot but not the other.
  • It locally communicates to the reader that the channel is being discarded, as opposed to having to go read the asyncPlanWait code further down (and doing so is super cheap).


// Wait for the plan to apply
if err := future.Error(); err != nil {
p.logger.Error("failed to apply plan", "error", err)
pending.respond(nil, err)

// Close indexCh on error
close(indexCh)
return
}

Expand Down
68 changes: 68 additions & 0 deletions nomad/plan_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"sync"
"testing"
"time"

Expand All @@ -9,6 +10,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -128,3 +130,69 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) {
// Ensure no plans were enqueued
require.Zero(t, s1.planner.planQueue.Stats().Depth)
}

func TestPlanEndpoint_ApplyConcurrent(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

plans := []*structs.Plan{}

for i := 0; i < 5; i++ {

// Create a node to place on
node := mock.Node()
store := s1.fsm.State()
require.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node))

// Create the eval
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
require.NoError(t, store.UpsertEvals(
structs.MsgTypeTestSetup, 150, []*structs.Evaluation{eval1}))

evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
require.NoError(t, err)
require.Equal(t, eval1, evalOut)

// Submit a plan
plan := mock.Plan()
plan.EvalID = eval1.ID
plan.EvalToken = token
plan.Job = mock.Job()

alloc := mock.Alloc()
alloc.JobID = plan.Job.ID
alloc.Job = plan.Job

plan.NodeAllocation = map[string][]*structs.Allocation{
node.ID: []*structs.Allocation{alloc}}

plans = append(plans, plan)
}

var wg sync.WaitGroup

for _, plan := range plans {
plan := plan
wg.Add(1)
go func() {

req := &structs.PlanRequest{
Plan: plan,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.PlanResponse
err := s1.RPC("Plan.Submit", req, &resp)
assert.NoError(t, err)
assert.NotNil(t, resp.Result, "missing result")
wg.Done()
}()
}

wg.Wait()
}