Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multiple bugs with progress deadline handling #4842

Merged
merged 3 commits into from
Nov 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ import (
"strings"
"time"

"github.com/NYTimes/gziphandler"
assetfs "github.com/elazarl/go-bindata-assetfs"
log "github.com/hashicorp/go-hclog"

"github.com/NYTimes/gziphandler"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/rs/cors"
Expand Down
114 changes: 88 additions & 26 deletions nomad/deploymentwatcher/deployment_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"

"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -363,7 +361,7 @@ func (w *deploymentWatcher) watch() {
// handle the case that the deployment has already progressed and we are now
// just starting to watch it. This must likely would occur if there was a
// leader transition and we are now starting our watcher.
currentDeadline := getDeploymentProgressCutoff(w.getDeployment())
currentDeadline := w.getDeploymentProgressCutoff(w.getDeployment())
var deadlineTimer *time.Timer
if currentDeadline.IsZero() {
deadlineTimer = time.NewTimer(0)
Expand Down Expand Up @@ -405,7 +403,7 @@ FAIL:
case <-w.deploymentUpdateCh:
// Get the updated deployment and check if we should change the
// deadline timer
next := getDeploymentProgressCutoff(w.getDeployment())
next := w.getDeploymentProgressCutoff(w.getDeployment())
if !next.Equal(currentDeadline) {
prevDeadlineZero := currentDeadline.IsZero()
currentDeadline = next
Expand All @@ -419,7 +417,17 @@ FAIL:
default:
}
}
deadlineTimer.Reset(next.Sub(time.Now()))

// If the next deadline is zero, we should not reset the timer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this clarify what problem this !zero check is actually fixing. i'm concerned that this means under some conditions the deadlinetimer would never get reset

// as we aren't tracking towards a progress deadline yet. This
// can happen if you have multiple task groups with progress
// deadlines and one of the task groups hasn't made any
// placements. As soon as the other task group finishes its
// rollout, the next progress deadline becomes zero, so we want
// to avoid resetting, causing a deployment failure.
if !next.IsZero() {
deadlineTimer.Reset(next.Sub(time.Now()))
}
}

case updates = <-w.getAllocsCh(allocIndex):
Expand Down Expand Up @@ -506,7 +514,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
var res allocUpdateResult

// Get the latest evaluation index
latestEval, blocked, err := w.jobEvalStatus()
latestEval, err := w.jobEvalStatus()
if err != nil {
if err == context.Canceled || w.ctx.Err() == context.Canceled {
return res, err
Expand Down Expand Up @@ -534,7 +542,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
}

// We need to create an eval so the job can progress.
if !blocked && alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval {
if alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval {
res.createEval = true
}

Expand Down Expand Up @@ -611,16 +619,72 @@ func (w *deploymentWatcher) shouldFail() (fail, rollback bool, err error) {

// getDeploymentProgressCutoff returns the progress cutoff for the given
// deployment
func getDeploymentProgressCutoff(d *structs.Deployment) time.Time {
func (w *deploymentWatcher) getDeploymentProgressCutoff(d *structs.Deployment) time.Time {
var next time.Time
for _, state := range d.TaskGroups {
doneTGs := w.doneGroups(d)
for name, state := range d.TaskGroups {
// This task group is done so we don't have to concern ourselves with
// its progress deadline.
if done, ok := doneTGs[name]; ok && done {
continue
}

if state.RequireProgressBy.IsZero() {
continue
}

if next.IsZero() || state.RequireProgressBy.Before(next) {
next = state.RequireProgressBy
}
}
return next
}

// doneGroups returns a map of task group to whether the deployment appears to
// be done for the group. A true value doesn't mean no more action will be taken
// in the life time of the deployment because there could always be node
// failures, or rescheduling events.
func (w *deploymentWatcher) doneGroups(d *structs.Deployment) map[string]bool {
if d == nil {
return nil
}

// Collect the allocations by the task group
snap, err := w.state.Snapshot()
if err != nil {
return nil
}

allocs, err := snap.AllocsByDeployment(nil, d.ID)
if err != nil {
return nil
}

// Go through the allocs and count up how many healthy allocs we have
healthy := make(map[string]int, len(d.TaskGroups))
for _, a := range allocs {
if a.TerminalStatus() || !a.DeploymentStatus.IsHealthy() {
continue
}
healthy[a.TaskGroup]++
}

// Go through each group and check if it done
groups := make(map[string]bool, len(d.TaskGroups))
for name, state := range d.TaskGroups {
// Requires promotion
if state.DesiredCanaries != 0 && !state.Promoted {
groups[name] = false
continue
}

// Check we have enough healthy currently running allocations
groups[name] = healthy[name] >= state.DesiredTotal
}

return groups
}

// latestStableJob returns the latest stable job. It may be nil if none exist
func (w *deploymentWatcher) latestStableJob() (*structs.Job, error) {
snap, err := w.state.Snapshot()
Expand Down Expand Up @@ -779,37 +843,35 @@ func (w *deploymentWatcher) getAllocsImpl(ws memdb.WatchSet, state *state.StateS
return stubs, maxIndex, nil
}

// jobEvalStatus returns the eval status for a job. It returns the index of the
// last evaluation created for the job, as well as whether there exists a
// blocked evaluation for the job. The index is used to determine if an
// allocation update requires an evaluation to be triggered. If there already is
// a blocked evaluations, no eval should be created.
func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, err error) {
// jobEvalStatus returns the latest eval index for a job. The index is used to
// determine if an allocation update requires an evaluation to be triggered.
func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, err error) {
if err := w.queryLimiter.Wait(w.ctx); err != nil {
return 0, false, err
return 0, err
}

snap, err := w.state.Snapshot()
if err != nil {
return 0, false, err
return 0, err
}

evals, err := snap.EvalsByJob(nil, w.j.Namespace, w.j.ID)
if err != nil {
return 0, false, err
return 0, err
}

// If there are no evals for the job, return zero, since we want any
// allocation change to trigger an evaluation.
if len(evals) == 0 {
index, err := snap.Index("evals")
return index, false, err
return 0, nil
}

var max uint64
for _, eval := range evals {
// If we have a blocked eval, then we do not care what the index is
// since we will not need to make a new eval.
if eval.ShouldBlock() {
return 0, true, nil
// A cancelled eval never impacts what the scheduler has saw, so do not
// use it's indexes.
if eval.Status == structs.EvalStatusCancelled {
continue
}

// Prefer using the snapshot index. Otherwise use the create index
Expand All @@ -820,5 +882,5 @@ func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, e
}
}

return max, false, nil
return max, nil
}
Loading