Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler/reconcile: set FollowupEvalID on lost stop_after_client_disconnect #8105

Merged
merged 6 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func normalizeStoppedAlloc(stoppedAlloc *structs.Allocation, now int64) *structs
DesiredDescription: stoppedAlloc.DesiredDescription,
ClientStatus: stoppedAlloc.ClientStatus,
ModifyTime: now,
FollowupEvalID: stoppedAlloc.FollowupEvalID,
}
}

Expand Down
3 changes: 3 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5379,6 +5379,9 @@ func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.All
if allocDiff.ClientStatus != "" {
allocCopy.ClientStatus = allocDiff.ClientStatus
}
if allocDiff.FollowupEvalID != "" {
allocCopy.FollowupEvalID = allocDiff.FollowupEvalID
}
}
if allocDiff.ModifyTime != 0 {
allocCopy.ModifyTime = allocDiff.ModifyTime
Expand Down
7 changes: 6 additions & 1 deletion nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9460,7 +9460,7 @@ type Plan struct {

// AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the
// allocation may be optionally set by passing in a non-empty value.
func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus string) {
func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus, followupEvalID string) {
newAlloc := new(Allocation)
*newAlloc = *alloc

Expand All @@ -9485,6 +9485,10 @@ func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus s

newAlloc.AppendState(AllocStateFieldClientStatus, clientStatus)

if followupEvalID != "" {
newAlloc.FollowupEvalID = followupEvalID
}

node := alloc.NodeID
existing := p.NodeUpdate[node]
p.NodeUpdate[node] = append(existing, newAlloc)
Expand Down Expand Up @@ -9559,6 +9563,7 @@ func (p *Plan) NormalizeAllocations() {
ID: alloc.ID,
DesiredDescription: alloc.DesiredDescription,
ClientStatus: alloc.ClientStatus,
FollowupEvalID: alloc.FollowupEvalID,
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3563,7 +3563,7 @@ func TestPlan_NormalizeAllocations(t *testing.T) {
}
stoppedAlloc := MockAlloc()
desiredDesc := "Desired desc"
plan.AppendStoppedAlloc(stoppedAlloc, desiredDesc, AllocClientStatusLost)
plan.AppendStoppedAlloc(stoppedAlloc, desiredDesc, AllocClientStatusLost, "followup-eval-id")
preemptedAlloc := MockAlloc()
preemptingAllocID := uuid.Generate()
plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID)
Expand All @@ -3575,6 +3575,7 @@ func TestPlan_NormalizeAllocations(t *testing.T) {
ID: stoppedAlloc.ID,
DesiredDescription: desiredDesc,
ClientStatus: AllocClientStatusLost,
FollowupEvalID: "followup-eval-id",
}
assert.Equal(t, expectedStoppedAlloc, actualStoppedAlloc)
actualPreemptedAlloc := plan.NodePreemptions[preemptedAlloc.NodeID][0]
Expand All @@ -3593,7 +3594,7 @@ func TestPlan_AppendStoppedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) {
alloc := MockAlloc()
desiredDesc := "Desired desc"

plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost)
plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost, "")

expectedAlloc := new(Allocation)
*expectedAlloc = *alloc
Expand Down
2 changes: 1 addition & 1 deletion nomad/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func TestWorker_SubmitPlanNormalizedAllocations(t *testing.T) {
NodePreemptions: make(map[string][]*structs.Allocation),
}
desiredDescription := "desired desc"
plan.AppendStoppedAlloc(stoppedAlloc, desiredDescription, structs.AllocClientStatusLost)
plan.AppendStoppedAlloc(stoppedAlloc, desiredDescription, structs.AllocClientStatusLost, "")
preemptingAllocID := uuid.Generate()
plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID)

Expand Down
4 changes: 2 additions & 2 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (s *GenericScheduler) computeJobAllocs() error {

// Handle the stop
for _, stop := range results.stop {
s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus)
s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus, stop.followupEvalID)
}

// Handle the in-place updates
Expand Down Expand Up @@ -476,7 +476,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc()
prevAllocation := missing.PreviousAllocation()
if stopPrevAlloc {
s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "")
s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "", "")
}

// Compute penalty nodes for rescheduled allocs
Expand Down
69 changes: 40 additions & 29 deletions scheduler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,19 @@ func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescript
}
}

// markDelayed does markStop, but optionally includes a FollowupEvalID so that we can update
// the stopped alloc with its delayed rescheduling evalID
func (a *allocReconciler) markDelayed(allocs allocSet, clientStatus, statusDescription string, followupEvals map[string]string) {
for _, alloc := range allocs {
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
clientStatus: clientStatus,
statusDescription: statusDescription,
followupEvalID: followupEvals[alloc.ID],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noticed that handleDelayedReschedulesImpl may return nil. Does function need to handle nil followupEvals? Also, maybe markDelayedStop is a better descriptive name for this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I do need a nil check there. I re(de?)factored handleDelayedReschedulesImpl, leaving just the two functions designed to be called: handleDelayedReschedules and handleDelayedLost in 314fffd.

})
}
}

// computeGroup reconciles state for a particular task group. It returns whether
// the deployment it is for is complete with regards to the task group.
func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
Expand Down Expand Up @@ -355,7 +368,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {

// Find delays for any lost allocs that have stop_after_client_disconnect
lostLater := lost.delayByStopAfterClientDisconnect()
a.handleDelayedLost(lostLater, all, tg.Name)
lostLaterEvals := a.handleDelayedLost(lostLater, all, tg.Name)

// Create batched follow up evaluations for allocations that are
// reschedulable later and mark the allocations for in place updating
Expand All @@ -368,7 +381,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Stop any unneeded allocations and update the untainted set to not
// included stopped allocations.
canaryState := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, canaryState)
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, canaryState, lostLaterEvals)
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)

Expand Down Expand Up @@ -705,13 +718,13 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
// the group definition, the set of allocations in various states and whether we
// are canarying.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
untainted, migrate, lost, canaries allocSet, canaryState bool) allocSet {
untainted, migrate, lost, canaries allocSet, canaryState bool, followupEvals map[string]string) allocSet {

// Mark all lost allocations for stop. Previous allocation doesn't matter
// here since it is on a lost node
var stop allocSet
stop = stop.union(lost)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)

// If we are still deploying or creating canaries, don't stop them
if canaryState {
Expand Down Expand Up @@ -836,22 +849,33 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
return
}

// handleDelayedReschedules creates batched followup evaluations with the WaitUntil field set
// for allocations that are eligible to be rescheduled later
// handleDelayedReschedules creates batched followup evaluations with the WaitUntil field
// set for allocations that are eligible to be rescheduled later, and marks the alloc with
// the followupEvalID
func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
a.handleDelayedReschedulesImpl(rescheduleLater, all, tgName, true)
}
// followupEvals are created in the same way as for delayed lost allocs
allocIDToFollowupEvalID := a.handleDelayedLost(rescheduleLater, all, tgName)

// Initialize the annotations
if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil {
a.result.attributeUpdates = make(map[string]*structs.Allocation)
}

// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for lost allocations
func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
a.handleDelayedReschedulesImpl(rescheduleLater, all, tgName, false)
// Create updates that will be applied to the allocs to mark the FollowupEvalID
for allocID, evalID := range allocIDToFollowupEvalID {
existingAlloc := all[allocID]
updatedAlloc := existingAlloc.Copy()
updatedAlloc.FollowupEvalID = evalID
a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
}
}

// handleDelayedReschedulesImpl creates batched followup evaluations with the WaitUntil field set
func (a *allocReconciler) handleDelayedReschedulesImpl(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string,
createUpdates bool) {
// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for
// lost allocations. followupEvals are appended to a.result as a side effect, we return a
// map of alloc IDs to their followupEval IDs
func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string {
if len(rescheduleLater) == 0 {
return
return map[string]string{}
}

// Sort by time
Expand Down Expand Up @@ -904,18 +928,5 @@ func (a *allocReconciler) handleDelayedReschedulesImpl(rescheduleLater []*delaye

a.result.desiredFollowupEvals[tgName] = evals

// Initialize the annotations
if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil {
a.result.attributeUpdates = make(map[string]*structs.Allocation)
}

// Create in-place updates for every alloc ID that needs to be updated with its follow up eval ID
if createUpdates {
for allocID, evalID := range allocIDToFollowupEvalID {
existingAlloc := all[allocID]
updatedAlloc := existingAlloc.Copy()
updatedAlloc.FollowupEvalID = evalID
a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
}
}
return allocIDToFollowupEvalID
}
1 change: 1 addition & 0 deletions scheduler/reconcile_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type allocStopResult struct {
alloc *structs.Allocation
clientStatus string
statusDescription string
followupEvalID string
}

// allocPlaceResult contains the information required to place a single
Expand Down
6 changes: 3 additions & 3 deletions scheduler/system_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,18 +212,18 @@ func (s *SystemScheduler) computeJobAllocs() error {

// Add all the allocs to stop
for _, e := range diff.stop {
s.plan.AppendStoppedAlloc(e.Alloc, allocNotNeeded, "")
s.plan.AppendStoppedAlloc(e.Alloc, allocNotNeeded, "", "")
}

// Add all the allocs to migrate
for _, e := range diff.migrate {
s.plan.AppendStoppedAlloc(e.Alloc, allocNodeTainted, "")
s.plan.AppendStoppedAlloc(e.Alloc, allocNodeTainted, "", "")
}

// Lost allocations should be transitioned to desired status stop and client
// status lost.
for _, e := range diff.lost {
s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost)
s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost, "")
}

// Attempt to do the upgrades in place
Expand Down
8 changes: 4 additions & 4 deletions scheduler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
// the current allocation is discounted when checking for feasibility.
// Otherwise we would be trying to fit the tasks current resources and
// updated resources. After select is called we can remove the evict.
ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "")
ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "", "")

// Attempt to match the task group
option := stack.Select(update.TaskGroup, nil) // This select only looks at one node so we don't pass selectOptions
Expand Down Expand Up @@ -670,7 +670,7 @@ func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc stri
n := len(allocs)
for i := 0; i < n && i < *limit; i++ {
a := allocs[i]
ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "")
ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "")
diff.place = append(diff.place, a)
}
if n <= *limit {
Expand Down Expand Up @@ -831,7 +831,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
alloc.DesiredStatus == structs.AllocDesiredStatusEvict) &&
(alloc.ClientStatus == structs.AllocClientStatusRunning ||
alloc.ClientStatus == structs.AllocClientStatusPending) {
plan.AppendStoppedAlloc(alloc, allocLost, structs.AllocClientStatusLost)
plan.AppendStoppedAlloc(alloc, allocLost, structs.AllocClientStatusLost, "")
}
}
}
Expand Down Expand Up @@ -881,7 +881,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
// the current allocation is discounted when checking for feasibility.
// Otherwise we would be trying to fit the tasks current resources and
// updated resources. After select is called we can remove the evict.
ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "")
ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "", "")

// Attempt to match the task group
option := stack.Select(newTG, nil) // This select only looks at one node so we don't pass selectOptions
Expand Down