Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix deadlock in plan_apply #13407

Merged
merged 6 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/13407.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
core: Fixed a bug where the plan applier could deadlock if raft load prevented a state store snapshot from completing within 5 seconds
tgross marked this conversation as resolved.
Show resolved Hide resolved
```
13 changes: 8 additions & 5 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (p *planner) planApply() {
if err != nil {
p.logger.Error("failed to update snapshot state", "error", err)
pending.respond(nil, err)
planIndexCh = nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be on L157 right after we receive from the chan?

AFAICT the only case where this move would make a difference is if applyPlan on L168 returns a non-nil error. It would then continue with a non-nil planIndexCh that will never receive another result.

I think this change (immediately nil'ing the chan) would match the behavior of the only other place we receiving on planIndexCh: L105 in the select. The case where we receive on the chan immediately and unconditionally sets it nil there as well.

To put another way: I think planIndexCh is always a one-shot chan. It will always receive exactly one index value and never get reused.

If you think that's accurate I wonder if we should wrap planIndexCh in a little struct with a method like:

func (t *T) Recv() int {
  v := <-t.ch
  t.ch = nil
  return v
}

We also use planIndexCh == nil to signal "no outstanding plan application", so our struct would probably need a helper method for that too. Unsure if the extra struct would simplify or complicate reading this code... just an idea.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be on L157 right after we receive from the chan?

Yup, good catch. I was focused on the error branch because there's where the bug showed up, but there's no reason to keep it once we've received a value.

To put another way: I think planIndexCh is always a one-shot chan. It will always receive exactly one index value and never get reused.

Yes, agreed!

If you think that's accurate I wonder if we should wrap planIndexCh in a little struct with a method like:
...
We also use planIndexCh == nil to signal "no outstanding plan application", so our struct would probably need a helper method for that too. Unsure if the extra struct would simplify or complicate reading this code... just an idea.

Hm... I like this idea in principle. But the logic here leans heavily on the specific semantics of channels (ex. a closed channel isn't nil) and wrapping it in a struct just seems like it'll obscure that by moving the logic away from where we're using it. The fallthrough select on line 104-113 makes this especially gross because it relies on the fact that we get 0 on error, so we can't even call recv() and check for 0 there so it needs it's own index, ok := recvOrDefault() function.

some example code
type asyncPlanIndex struct {
	planIndexCh chan uint64
}

func (a *asyncPlanIndex) hasOutstandingPlan() bool {
	return a.planIndexCh == nil
}

func (a *asyncPlanIndex) recv() uint64 {
	v := <-a.planIndexCh
	a.planIndexCh = nil
	return v
}

func (a *asyncPlanIndex) recvOrDefault() (uint64, bool) {
	select {
	case v := <-a.planIndexCh:
		return v, true
	default:
		return 0, false
	}
}

func (a *asyncPlanIndex) send(v uint64) {
	a.planIndexCh <- v
	close(a.planIndexCh)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the logic here leans heavily on the specific semantics of channels

Oof yeah, this is a pattern to watch out for in the future. Not always a problem but clearly made this code more fragile than if a little effort had been put into encapsulating the logic in a little purpose-built struct.

continue
}
}
Expand All @@ -177,7 +178,7 @@ func (p *planner) planApply() {
}
}

// snapshotMinIndex wraps SnapshotAfter with a 5s timeout and converts timeout
// snapshotMinIndex wraps SnapshotAfter with a 10s timeout and converts timeout
// errors to a more descriptive error message. The snapshot is guaranteed to
// include both the previous plan and all objects referenced by the plan or
// return an error.
Expand All @@ -188,7 +189,11 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64
// plan result's and current plan's snapshot index.
minIndex := max(prevPlanResultIndex, planSnapshotIndex)

const timeout = 5 * time.Second
// This timeout creates backpressure where any concurrent
// Plan.Submit RPCs will block waiting for results. This sheds
// load across all servers and gives raft some CPU to catch up,
// because schedulers won't dequeue more work while waiting.
const timeout = 10 * time.Second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - The following are observations/thoughts I just wanted to record somewhere in case they're useful (or in case my "LGTM" is based on flawed assumptions).

This does make this code differ from the worker's use of SnapshotMinIndex via worker.snapshotMinIndex. I think this is probably ideal:

The plan applier blocking here puts a lot of backpressure on all schedulers and should dramatically help in cases where Raft/FSM is really struggling.

The shorter 5s min index timeout still on the worker seems fine because it fails up to the main eval retry loop which introduces its own additional retries.

...although looking at the code around wait-for-index failures in the worker it appears we lose/drop the RefreshIndex sent by the server if we hit that timeout-and-retry case. This would cause the retry to use its existing old snapshot. Probably not intentional but probably optimal as it lets workers keep creating plans and the plan applier will toss out the invalid bits with its stricter view of state.

ctx, cancel := context.WithTimeout(context.Background(), timeout)
snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex)
cancel()
Expand Down Expand Up @@ -368,14 +373,12 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
defer close(indexCh)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this alone have fixed the deadlock? I think so and since we always use this index inside a max(...) it seems like an always safe operation.

Oof, good lesson about writing defensive code the first time round.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You never know if you will have time to come back and make it better in a second pass 🤣

Copy link
Member Author

@tgross tgross Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this alone have fixed the deadlock?

Yes! That's how we did the initial test and fix. Nil'ing the channel gets us a few additional things:

  • It's belt-and-suspenders so that we know there's no possible way we're still polling on the channel.
  • It makes the usage consistent so that the reader doesn't wonder why the channel is being nil'd in one spot but not the other.
  • It locally communicates to the reader that the channel is being discarded, as opposed to having to go read the asyncPlanWait code further down (and doing so is super cheap).


// Wait for the plan to apply
if err := future.Error(); err != nil {
p.logger.Error("failed to apply plan", "error", err)
pending.respond(nil, err)

// Close indexCh on error
close(indexCh)
return
}

Expand Down
68 changes: 68 additions & 0 deletions nomad/plan_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"sync"
"testing"
"time"

Expand All @@ -9,6 +10,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -128,3 +130,69 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) {
// Ensure no plans were enqueued
require.Zero(t, s1.planner.planQueue.Stats().Depth)
}

func TestPlanEndpoint_ApplyConcurrent(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

plans := []*structs.Plan{}

for i := 0; i < 5; i++ {

// Create a node to place on
node := mock.Node()
store := s1.fsm.State()
require.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node))

// Create the eval
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
require.NoError(t, store.UpsertEvals(
structs.MsgTypeTestSetup, 150, []*structs.Evaluation{eval1}))

evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
require.NoError(t, err)
require.Equal(t, eval1, evalOut)

// Submit a plan
plan := mock.Plan()
plan.EvalID = eval1.ID
plan.EvalToken = token
plan.Job = mock.Job()

alloc := mock.Alloc()
alloc.JobID = plan.Job.ID
alloc.Job = plan.Job

plan.NodeAllocation = map[string][]*structs.Allocation{
node.ID: []*structs.Allocation{alloc}}

plans = append(plans, plan)
}

var wg sync.WaitGroup

for _, plan := range plans {
plan := plan
wg.Add(1)
go func() {

req := &structs.PlanRequest{
Plan: plan,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.PlanResponse
err := s1.RPC("Plan.Submit", req, &resp)
assert.NoError(t, err)
assert.NotNil(t, resp.Result, "missing result")
wg.Done()
}()
}

wg.Wait()
}