-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix multiple bugs with progress deadline handling #4842
Merged
Merged
Changes from 2 commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,15 +6,13 @@ import ( | |
"sync" | ||
"time" | ||
|
||
"golang.org/x/time/rate" | ||
|
||
log "github.com/hashicorp/go-hclog" | ||
memdb "github.com/hashicorp/go-memdb" | ||
|
||
"github.com/hashicorp/nomad/helper" | ||
"github.com/hashicorp/nomad/helper/uuid" | ||
"github.com/hashicorp/nomad/nomad/state" | ||
"github.com/hashicorp/nomad/nomad/structs" | ||
"golang.org/x/time/rate" | ||
) | ||
|
||
const ( | ||
|
@@ -363,7 +361,7 @@ func (w *deploymentWatcher) watch() { | |
// handle the case that the deployment has already progressed and we are now | ||
// just starting to watch it. This most likely would occur if there was a | ||
// leader transition and we are now starting our watcher. | ||
currentDeadline := getDeploymentProgressCutoff(w.getDeployment()) | ||
currentDeadline := w.getDeploymentProgressCutoff(w.getDeployment()) | ||
var deadlineTimer *time.Timer | ||
if currentDeadline.IsZero() { | ||
deadlineTimer = time.NewTimer(0) | ||
|
@@ -405,7 +403,7 @@ FAIL: | |
case <-w.deploymentUpdateCh: | ||
// Get the updated deployment and check if we should change the | ||
// deadline timer | ||
next := getDeploymentProgressCutoff(w.getDeployment()) | ||
next := w.getDeploymentProgressCutoff(w.getDeployment()) | ||
if !next.Equal(currentDeadline) { | ||
prevDeadlineZero := currentDeadline.IsZero() | ||
currentDeadline = next | ||
|
@@ -419,7 +417,12 @@ FAIL: | |
default: | ||
} | ||
} | ||
deadlineTimer.Reset(next.Sub(time.Now())) | ||
|
||
// If the next deadline is zero, we should not reset the timer | ||
// as we aren't tracking towards a progress deadline yet. | ||
if !next.IsZero() { | ||
deadlineTimer.Reset(next.Sub(time.Now())) | ||
} | ||
} | ||
|
||
case updates = <-w.getAllocsCh(allocIndex): | ||
|
@@ -506,7 +509,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) ( | |
var res allocUpdateResult | ||
|
||
// Get the latest evaluation index | ||
latestEval, blocked, err := w.jobEvalStatus() | ||
latestEval, err := w.jobEvalStatus() | ||
if err != nil { | ||
if err == context.Canceled || w.ctx.Err() == context.Canceled { | ||
return res, err | ||
|
@@ -534,7 +537,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) ( | |
} | ||
|
||
// We need to create an eval so the job can progress. | ||
if !blocked && alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval { | ||
if alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval { | ||
res.createEval = true | ||
} | ||
|
||
|
@@ -611,16 +614,72 @@ func (w *deploymentWatcher) shouldFail() (fail, rollback bool, err error) { | |
|
||
// getDeploymentProgressCutoff returns the progress cutoff for the given | ||
// deployment | ||
func getDeploymentProgressCutoff(d *structs.Deployment) time.Time { | ||
func (w *deploymentWatcher) getDeploymentProgressCutoff(d *structs.Deployment) time.Time { | ||
var next time.Time | ||
for _, state := range d.TaskGroups { | ||
doneTGs := w.doneGroups(d) | ||
for name, state := range d.TaskGroups { | ||
// This task group is done so we don't have to concern ourselves with | ||
// its progress deadline. | ||
if done, ok := doneTGs[name]; ok && done { | ||
continue | ||
} | ||
|
||
if state.RequireProgressBy.IsZero() { | ||
continue | ||
} | ||
|
||
if next.IsZero() || state.RequireProgressBy.Before(next) { | ||
next = state.RequireProgressBy | ||
} | ||
} | ||
return next | ||
} | ||
|
||
// doneGroups returns a map of task group to whether the deployment appears to | ||
// be done for the group. A true value doesn't mean no more action will be taken | ||
// in the life time of the deployment because there could always be node | ||
// failures, or rescheduling events. | ||
func (w *deploymentWatcher) doneGroups(d *structs.Deployment) map[string]bool { | ||
if d == nil { | ||
return nil | ||
} | ||
|
||
// Collect the allocations by the task group | ||
snap, err := w.state.Snapshot() | ||
if err != nil { | ||
return nil | ||
} | ||
|
||
allocs, err := snap.AllocsByDeployment(nil, d.ID) | ||
if err != nil { | ||
return nil | ||
} | ||
|
||
// Go through the allocs and count up how many healthy allocs we have | ||
healthy := make(map[string]int, len(d.TaskGroups)) | ||
for _, a := range allocs { | ||
if a.TerminalStatus() || !a.DeploymentStatus.IsHealthy() { | ||
continue | ||
} | ||
healthy[a.TaskGroup]++ | ||
} | ||
|
||
// Go through each group and check if it is done | ||
groups := make(map[string]bool, len(d.TaskGroups)) | ||
for name, state := range d.TaskGroups { | ||
// Requires promotion | ||
if state.DesiredCanaries != 0 && !state.Promoted { | ||
groups[name] = false | ||
continue | ||
} | ||
|
||
// Check we have enough healthy currently running allocations | ||
groups[name] = healthy[name] >= state.DesiredTotal | ||
} | ||
|
||
return groups | ||
} | ||
|
||
// latestStableJob returns the latest stable job. It may be nil if none exist | ||
func (w *deploymentWatcher) latestStableJob() (*structs.Job, error) { | ||
snap, err := w.state.Snapshot() | ||
|
@@ -779,37 +838,29 @@ func (w *deploymentWatcher) getAllocsImpl(ws memdb.WatchSet, state *state.StateS | |
return stubs, maxIndex, nil | ||
} | ||
|
||
// jobEvalStatus returns the eval status for a job. It returns the index of the | ||
// last evaluation created for the job, as well as whether there exists a | ||
// blocked evaluation for the job. The index is used to determine if an | ||
// allocation update requires an evaluation to be triggered. If there already is | ||
// a blocked evaluations, no eval should be created. | ||
func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, err error) { | ||
// jobEvalStatus returns the latest eval index for a job. The index is used to | ||
// determine if an allocation update requires an evaluation to be triggered. | ||
func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, err error) { | ||
if err := w.queryLimiter.Wait(w.ctx); err != nil { | ||
return 0, false, err | ||
return 0, err | ||
} | ||
|
||
snap, err := w.state.Snapshot() | ||
if err != nil { | ||
return 0, false, err | ||
return 0, err | ||
} | ||
|
||
evals, err := snap.EvalsByJob(nil, w.j.Namespace, w.j.ID) | ||
if err != nil { | ||
return 0, false, err | ||
} | ||
|
||
if len(evals) == 0 { | ||
index, err := snap.Index("evals") | ||
return index, false, err | ||
return 0, err | ||
} | ||
|
||
var max uint64 | ||
for _, eval := range evals { | ||
// If we have a blocked eval, then we do not care what the index is | ||
// since we will not need to make a new eval. | ||
if eval.ShouldBlock() { | ||
return 0, true, nil | ||
// A cancelled eval never impacts what the scheduler has seen, so do not | ||
// use its indexes. | ||
if eval.Status == structs.EvalStatusCancelled { | ||
continue | ||
} | ||
|
||
// Prefer using the snapshot index. Otherwise use the create index | ||
|
@@ -820,5 +871,10 @@ func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, e | |
} | ||
} | ||
|
||
return max, false, nil | ||
if max == uint64(0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reviewer comment: This index returns the max eval index across all jobs, but we care about a single job. We could miss an update if evals were GC'd before this ran. This should return zero instead. |
||
index, err := snap.Index("evals") | ||
return index, err | ||
} | ||
|
||
return max, nil | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you clarify what problem this !zero check is actually fixing? I'm concerned that this means under some conditions the deadlineTimer would never get reset.