Skip to content

Commit

Permalink
fix deadlock in plan_apply
Browse files Browse the repository at this point in the history
The plan applier has to get a snapshot with a minimum index for the
plan it's working on in order to ensure consistency. Under heavy raft
loads, we can exceed the timeout. When this happens, we hit a bug
where the plan applier blocks waiting on the `indexCh` forever, and
all schedulers will block in `Plan.Submit`.

Closing the `indexCh` when the `asyncPlanWait` is done with it will
prevent the deadlock without impacting correctness of the previous
snapshot index.

This changeset includes the a PoC failing test that works by injecting
a large timeout into the state store. We need to turn this into a test
we can run normally without breaking the state store before we can
merge this PR.
  • Loading branch information
tgross committed Jun 17, 2022
1 parent 37ee500 commit 2f7f862
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 3 deletions.
7 changes: 4 additions & 3 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,15 @@ func (p *planner) planApply() {
// Ensure any parallel apply is complete before starting the next one.
// This also limits how out of date our snapshot can be.
if planIndexCh != nil {
fmt.Println("waiting for idx...") // DEBUG
idx := <-planIndexCh
fmt.Println("got index", idx) // DEBUG
prevPlanResultIndex = max(prevPlanResultIndex, idx)
snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
if err != nil {
p.logger.Error("failed to update snapshot state", "error", err)
pending.respond(nil, err)
planIndexCh = nil
continue
}
}
Expand Down Expand Up @@ -368,14 +371,12 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
defer close(indexCh)

// Wait for the plan to apply
if err := future.Error(); err != nil {
p.logger.Error("failed to apply plan", "error", err)
pending.respond(nil, err)

// Close indexCh on error
close(indexCh)
return
}

Expand Down
68 changes: 68 additions & 0 deletions nomad/plan_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"sync"
"testing"
"time"

Expand All @@ -9,6 +10,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -128,3 +130,69 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) {
// Ensure no plans were enqueued
require.Zero(t, s1.planner.planQueue.Stats().Depth)
}

func TestPlanEndpoint_ApplyDeadlock(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

plans := []*structs.Plan{}

for i := 0; i < 5; i++ {

// Create a node to place on
node := mock.Node()
store := s1.fsm.State()
require.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node))

// Create the eval
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
require.NoError(t, store.UpsertEvals(
structs.MsgTypeTestSetup, 150, []*structs.Evaluation{eval1}))

evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
require.NoError(t, err)
require.Equal(t, eval1, evalOut)

// Submit a plan
plan := mock.Plan()
plan.EvalID = eval1.ID
plan.EvalToken = token
plan.Job = mock.Job()

alloc := mock.Alloc()
alloc.JobID = plan.Job.ID
alloc.Job = plan.Job

plan.NodeAllocation = map[string][]*structs.Allocation{
node.ID: []*structs.Allocation{alloc}}

plans = append(plans, plan)
}

var wg sync.WaitGroup

for _, plan := range plans {
plan := plan
wg.Add(1)
go func() {

req := &structs.PlanRequest{
Plan: plan,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.PlanResponse
err := s1.RPC("Plan.Submit", req, &resp)
assert.NoError(t, err)
assert.NotNil(t, resp.Result, "missing result")
wg.Done()
}()
}

wg.Wait()
}
10 changes: 10 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
return snap, nil
}

// DEBUG: this is to introduce a one-time timeout
var stop = true

// SnapshotMinIndex is used to create a state snapshot where the index is
// guaranteed to be greater than or equal to the index parameter.
//
Expand All @@ -222,6 +225,13 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State
var retries uint
var retryTimer *time.Timer

// DEBUG: this is to introduce a one-time timeout
if index == 7 && stop {
stop = false
time.Sleep(6000 * time.Millisecond)
return nil, ctx.Err()
}

// XXX: Potential optimization is to set up a watch on the state
// store's index table and only unblock via a trigger rather than
// polling.
Expand Down

0 comments on commit 2f7f862

Please sign in to comment.