scheduler: RescheduleTracker dropped if follow-up fails placements (#12319)

When an allocation fails, it triggers an evaluation. The evaluation is processed
and the scheduler sees it needs to reschedule, which triggers a follow-up
eval. The follow-up eval creates a plan to `(stop 1) (place 1)`. The replacement
alloc has a `RescheduleTracker` (or gets its `RescheduleTracker` updated).

But in the case where the follow-up eval can't place all allocs (there aren't
enough resources), it can create a partial plan to `(stop 1) (place 0)`. It then
creates a blocked eval. The plan applier stops the failed alloc. Then when the
blocked eval is processed, the job is missing an allocation, so the scheduler
creates a new allocation. This allocation is _not_ a replacement from the
perspective of the scheduler, so it's not handed off a `RescheduleTracker`.

This changeset fixes this by annotating the reschedule tracker whenever the
scheduler can't place a replacement allocation. We check this annotation for
allocations that have the `stop` desired status when filtering out allocations
to pass to the reschedule tracker. I've also included tests that cover this case
and expand coverage of the relevant area of the code.

Fixes: #12147
Fixes: #17072
tgross authored Jun 10, 2024
1 parent ffcb72b commit fa70267
Showing 8 changed files with 390 additions and 25 deletions.
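
The crux of the change is easiest to see as a simplified sketch of the service-job branch of `shouldFilter` (in `scheduler/reconcile_util.go` below). `shouldFilterService` here is an illustrative stand-in, not the verbatim diff:

```go
package sketch

import "github.com/hashicorp/nomad/nomad/structs"

// shouldFilterService paraphrases the service-job branch of shouldFilter
// after this change. It returns the same (untainted, ignore) pair the
// scheduler uses when deciding which allocs are reschedule candidates.
func shouldFilterService(alloc *structs.Allocation) (untainted, ignore bool) {
	switch alloc.DesiredStatus {
	case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
		if alloc.LastRescheduleFailed() {
			// The scheduler stopped this alloc but never placed its
			// replacement: keep it as a reschedule candidate so its
			// RescheduleTracker carries over into the blocked eval.
			return false, false
		}
		return false, true // stopped for good, as before this change
	}
	return false, false // fall through to the normal reschedule checks
}
```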
3 changes: 3 additions & 0 deletions .changelog/12319.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a bug where rescheduled allocations that could not be placed would later ignore their reschedule policy limits
```
3 changes: 2 additions & 1 deletion api/allocations.go
@@ -558,7 +558,8 @@ type GenericResponse struct {

// RescheduleTracker encapsulates previous reschedule events
type RescheduleTracker struct {
-Events []*RescheduleEvent
+Events         []*RescheduleEvent
+LastReschedule string
}

// RescheduleEvent is used to keep track of previous attempts at rescheduling an allocation
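The new field is visible to API consumers once servers are upgraded. A minimal read-side sketch, assuming the standard `github.com/hashicorp/nomad/api` client (the allocation ID is a placeholder):

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	// "alloc-id" is a placeholder for a real allocation UUID.
	alloc, _, err := client.Allocations().Info("alloc-id", nil)
	if err != nil {
		log.Fatal(err)
	}
	if alloc.RescheduleTracker != nil {
		// "" (never annotated), "ok", or "no placement", per the
		// constants added in nomad/structs/structs.go below.
		fmt.Println("last reschedule:", alloc.RescheduleTracker.LastReschedule)
	}
}
```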
25 changes: 24 additions & 1 deletion nomad/structs/structs.go
@@ -10612,8 +10612,19 @@ type DeploymentStatusUpdate struct {
// RescheduleTracker encapsulates previous reschedule events
type RescheduleTracker struct {
Events []*RescheduleEvent

// LastReschedule represents whether the most recent attempt to reschedule
// the allocation (if any) was successful
LastReschedule RescheduleTrackerAnnotation
}

type RescheduleTrackerAnnotation string

const (
LastRescheduleSuccess RescheduleTrackerAnnotation = "ok"
LastRescheduleFailedToPlace RescheduleTrackerAnnotation = "no placement"
)

func (rt *RescheduleTracker) Copy() *RescheduleTracker {
if rt == nil {
return nil
@@ -11189,7 +11200,9 @@ func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
return time.Time{}, false
}

-if a.DesiredStatus == AllocDesiredStatusStop || a.ClientStatus != AllocClientStatusFailed || failTime.IsZero() || reschedulePolicy == nil {
+if (a.DesiredStatus == AllocDesiredStatusStop && !a.LastRescheduleFailed()) ||
+	(a.ClientStatus != AllocClientStatusFailed && a.ClientStatus != AllocClientStatusLost) ||
+	failTime.IsZero() || reschedulePolicy == nil {
return time.Time{}, false
}

@@ -11617,6 +11630,16 @@ func (a *Allocation) HasAnyPausedTasks() bool {
return false
}

// LastRescheduleFailed returns whether the scheduler previously attempted to
// reschedule this allocation but failed to find a placement
func (a *Allocation) LastRescheduleFailed() bool {
if a.RescheduleTracker == nil {
return false
}
return a.RescheduleTracker.LastReschedule != "" &&
a.RescheduleTracker.LastReschedule != LastRescheduleSuccess
}

// IdentityClaims are the input to a JWT identifying a workload. It
// should never be serialized to msgpack unsigned.
type IdentityClaims struct {
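Note the zero value: an empty `LastReschedule` (trackers written before this change, or never annotated) must not count as a failed placement. A small sketch of the accessor's behavior (illustrative, not from the diff):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// Only an explicit non-success annotation counts as a failed reschedule.
	a := &structs.Allocation{RescheduleTracker: &structs.RescheduleTracker{
		LastReschedule: structs.LastRescheduleFailedToPlace,
	}}
	fmt.Println(a.LastRescheduleFailed()) // true

	// The zero value stays false, so trackers written before this change
	// (or never annotated) keep their old behavior.
	b := &structs.Allocation{RescheduleTracker: &structs.RescheduleTracker{}}
	fmt.Println(b.LastRescheduleFailed()) // false

	var c structs.Allocation // no tracker at all
	fmt.Println(c.LastRescheduleFailed()) // false
}
```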
5 changes: 4 additions & 1 deletion nomad/structs/structs_test.go
@@ -5370,7 +5370,10 @@ func TestAllocation_ShouldReschedule(t *testing.T) {
alloc := Allocation{}
alloc.DesiredStatus = state.DesiredStatus
alloc.ClientStatus = state.ClientStatus
-alloc.RescheduleTracker = &RescheduleTracker{state.RescheduleTrackers}
+alloc.RescheduleTracker = &RescheduleTracker{
+	Events:         state.RescheduleTrackers,
+	LastReschedule: "",
+}

t.Run(state.Desc, func(t *testing.T) {
if got := alloc.ShouldReschedule(state.ReschedulePolicy, state.FailTime); got != state.ShouldReschedule {
22 changes: 21 additions & 1 deletion scheduler/generic_sched.go
@@ -724,6 +724,16 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
if stopPrevAlloc {
s.plan.PopUpdate(prevAllocation)
}

// If we were trying to replace a rescheduling alloc, mark the
// reschedule as failed so that we can retry it in the following
// blocked eval without dropping the reschedule tracker
if prevAllocation != nil {
if missing.IsRescheduling() {
annotateRescheduleTracker(prevAllocation, structs.LastRescheduleFailedToPlace)
}
}

}

}
@@ -835,6 +845,13 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs.Node) *SelectOptions {
return selectOptions
}

func annotateRescheduleTracker(prev *structs.Allocation, note structs.RescheduleTrackerAnnotation) {
if prev.RescheduleTracker == nil {
prev.RescheduleTracker = &structs.RescheduleTracker{}
}
prev.RescheduleTracker.LastReschedule = note
}

// updateRescheduleTracker carries over previous restart attempts and adds the most recent restart
func updateRescheduleTracker(alloc *structs.Allocation, prev *structs.Allocation, now time.Time) {
reschedPolicy := prev.ReschedulePolicy()
@@ -869,7 +886,10 @@ func updateRescheduleTracker(alloc *structs.Allocation, prev *structs.Allocation, now time.Time) {
nextDelay := prev.NextDelay()
rescheduleEvent := structs.NewRescheduleEvent(now.UnixNano(), prev.ID, prev.NodeID, nextDelay)
rescheduleEvents = append(rescheduleEvents, rescheduleEvent)
-alloc.RescheduleTracker = &structs.RescheduleTracker{Events: rescheduleEvents}
+alloc.RescheduleTracker = &structs.RescheduleTracker{
+	Events:         rescheduleEvents,
+	LastReschedule: structs.LastRescheduleSuccess}
+annotateRescheduleTracker(prev, structs.LastRescheduleSuccess)
}

// findPreferredNode finds the preferred node for an allocation
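Taken together, the two call sites give the annotation a simple lifecycle. A hedged walk-through as it would read inside the `scheduler` package (these helpers are unexported, so this is illustrative only):

```go
// 1. The follow-up eval stops the failed alloc but cannot place a
//    replacement: computePlacements annotates the alloc being stopped.
annotateRescheduleTracker(prevAlloc, structs.LastRescheduleFailedToPlace)
// prevAlloc.RescheduleTracker.LastReschedule == "no placement", so
// shouldFilter later keeps prevAlloc as a reschedule candidate even
// though its desired status is "stop".

// 2. A later eval (e.g. the blocked eval) succeeds in placing a
//    replacement: updateRescheduleTracker stamps success on both sides
//    and carries the full event history over to the new alloc.
updateRescheduleTracker(newAlloc, prevAlloc, time.Now())
// newAlloc.RescheduleTracker.LastReschedule == "ok"
```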
196 changes: 196 additions & 0 deletions scheduler/generic_sched_test.go
@@ -4641,6 +4641,202 @@ func TestServiceSched_Reschedule_MultipleNow(t *testing.T) {
assert.Equal(5, len(out)) // 2 original, plus 3 reschedule attempts
}

func TestServiceSched_BlockedReschedule(t *testing.T) {
ci.Parallel(t)

h := NewHarness(t)
node := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

// Generate a fake job with an allocation and an update policy.
job := mock.Job()
job.TaskGroups[0].Count = 1
delayDuration := 15 * time.Second
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 3,
Interval: 15 * time.Minute,
Delay: delayDuration,
MaxDelay: 1 * time.Minute,
DelayFunction: "constant",
}
tgName := job.TaskGroups[0].Name
now := time.Now()

must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))

alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now}}
failedAllocID := alloc.ID

must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Allocation{alloc}))

// Create a mock evaluation for the allocation failure
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval}))

// -----------------------------------
// first reschedule which works with delay as expected

// Process the evaluation and assert we have a plan
must.NoError(t, h.Process(NewServiceScheduler, eval))
must.Len(t, 1, h.Plans)
must.MapLen(t, 0, h.Plans[0].NodeUpdate) // stop
must.MapLen(t, 1, h.Plans[0].NodeAllocation) // place

// Lookup the allocations by JobID and verify no new allocs created
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
must.NoError(t, err)
must.Len(t, 1, out)

// Verify follow-up eval was created for the failed alloc
// and write the eval to the state store
alloc, err = h.State.AllocByID(ws, failedAllocID)
must.NoError(t, err)
must.NotEq(t, "", alloc.FollowupEvalID)
must.Len(t, 1, h.CreateEvals)
followupEval := h.CreateEvals[0]
must.Eq(t, structs.EvalStatusPending, followupEval.Status)
must.Eq(t, now.Add(delayDuration), followupEval.WaitUntil)
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{followupEval}))

// Follow-up delay "expires", so process the follow-up eval, which results
// in a replacement and stop
must.NoError(t, h.Process(NewServiceScheduler, followupEval))
must.Len(t, 2, h.Plans)
must.MapLen(t, 1, h.Plans[1].NodeUpdate) // stop
must.MapLen(t, 1, h.Plans[1].NodeAllocation) // place

out, err = h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
must.NoError(t, err)
must.Len(t, 2, out)

var replacementAllocID string
for _, alloc := range out {
if alloc.ID != failedAllocID {
must.NotNil(t, alloc.RescheduleTracker,
must.Sprint("replacement alloc should have reschedule tracker"))
must.Len(t, 1, alloc.RescheduleTracker.Events)
replacementAllocID = alloc.ID
break
}
}

// -----------------------------------
// Replacement alloc fails, second reschedule but it blocks because of delay

alloc, err = h.State.AllocByID(ws, replacementAllocID)
must.NoError(t, err)
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now}}
must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Allocation{alloc}))

// Create a mock evaluation for the allocation failure
eval.ID = uuid.Generate()
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation and assert we have a plan
must.NoError(t, h.Process(NewServiceScheduler, eval))
must.Len(t, 3, h.Plans)
must.MapLen(t, 0, h.Plans[2].NodeUpdate) // stop
must.MapLen(t, 1, h.Plans[2].NodeAllocation) // place

// Lookup the allocations by JobID and verify no new allocs created
out, err = h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
must.NoError(t, err)
must.Len(t, 2, out)

// Verify follow-up eval was created for the failed alloc
// and write the eval to the state store
alloc, err = h.State.AllocByID(ws, replacementAllocID)
must.NoError(t, err)
must.NotEq(t, "", alloc.FollowupEvalID)
must.Len(t, 2, h.CreateEvals)
followupEval = h.CreateEvals[1]
must.Eq(t, structs.EvalStatusPending, followupEval.Status)
must.Eq(t, now.Add(delayDuration), followupEval.WaitUntil)
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{followupEval}))

// "use up" resources on the node so the follow-up will block
node.NodeResources.Memory.MemoryMB = 200
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

// Process the follow-up eval, which results in a stop but not a replacement
must.NoError(t, h.Process(NewServiceScheduler, followupEval))
must.Len(t, 4, h.Plans)
must.MapLen(t, 1, h.Plans[3].NodeUpdate) // stop
must.MapLen(t, 0, h.Plans[3].NodeAllocation) // place

out, err = h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
must.NoError(t, err)
must.Len(t, 2, out)

// Verify blocked eval was created and write it to state
must.Len(t, 3, h.CreateEvals)
blockedEval := h.CreateEvals[2]
must.Eq(t, structs.EvalTriggerQueuedAllocs, blockedEval.TriggeredBy)
must.Eq(t, structs.EvalStatusBlocked, blockedEval.Status)
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{blockedEval}))

// "free up" resources on the node so the blocked eval will succeed
node.NodeResources.Memory.MemoryMB = 8000
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

// if we process the blocked eval, the task state of the replacement alloc
// will not be old enough to be rescheduled yet and we'll get a no-op
must.NoError(t, h.Process(NewServiceScheduler, blockedEval))
must.Len(t, 4, h.Plans, must.Sprint("expected no new plan"))

// bypass the timer check by setting the alloc's follow-up eval ID to be the
// blocked eval
alloc, err = h.State.AllocByID(ws, replacementAllocID)
must.NoError(t, err)
alloc = alloc.Copy()
alloc.FollowupEvalID = blockedEval.ID
must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Allocation{alloc}))

must.NoError(t, h.Process(NewServiceScheduler, blockedEval))
must.Len(t, 5, h.Plans)
must.MapLen(t, 1, h.Plans[4].NodeUpdate) // stop
must.MapLen(t, 1, h.Plans[4].NodeAllocation) // place

out, err = h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
must.NoError(t, err)
must.Len(t, 3, out)

for _, alloc := range out {
if alloc.ID != failedAllocID && alloc.ID != replacementAllocID {
must.NotNil(t, alloc.RescheduleTracker,
must.Sprint("replacement alloc should have reschedule tracker"))
must.Len(t, 2, alloc.RescheduleTracker.Events)
}
}
}

// Tests that old reschedule attempts are pruned
func TestServiceSched_Reschedule_PruneEvents(t *testing.T) {
ci.Parallel(t)
9 changes: 9 additions & 0 deletions scheduler/reconcile_util.go
@@ -414,6 +414,7 @@ func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time.Time
isUntainted, ignore := shouldFilter(alloc, isBatch)
if isUntainted && !isDisconnecting {
untainted[alloc.ID] = alloc
continue // these allocs can never be rescheduled, so skip checking
}

if ignore {
@@ -447,6 +448,7 @@ func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time.Time
// If desired state is stop - ignore
//
// Filtering logic for service jobs:
// Never untainted
// If desired state is stop/evict - ignore
// If client status is complete/lost - ignore
func shouldFilter(alloc *structs.Allocation, isBatch bool) (untainted, ignore bool) {
@@ -460,6 +462,9 @@ func shouldFilter(alloc *structs.Allocation, isBatch bool) (untainted, ignore bool) {
if alloc.RanSuccessfully() {
return true, false
}
if alloc.LastRescheduleFailed() {
return false, false
}
return false, true
case structs.AllocDesiredStatusEvict:
return false, true
@@ -476,6 +481,10 @@ func shouldFilter(alloc *structs.Allocation, isBatch bool) (untainted, ignore bool) {
// Handle service jobs
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
if alloc.LastRescheduleFailed() {
return false, false
}

return false, true
}

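For reference, a summary of what the `(untainted, ignore)` pair now yields for service allocs with desired status stop/evict (derived from the hunks above; the table is a reading aid, not part of the diff):

```go
// DesiredStatus   LastRescheduleFailed()   return           effect
// stop / evict    false                    (false, true)    ignored: the alloc is done
// stop / evict    true                     (false, false)   falls through to the
//                                                           rescheduleable checks, so the
//                                                           RescheduleTracker carries over
// run             (any)                    other branches   unchanged behavior
```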
