Skip to content

Commit

Permalink
Merge pull request #4842 from hashicorp/b-deployment-progress-deadline
Browse files Browse the repository at this point in the history
 Fix multiple bugs with progress deadline handling
  • Loading branch information
dadgar authored Nov 8, 2018
2 parents b06f674 + ad15649 commit 08b75d4
Show file tree
Hide file tree
Showing 5 changed files with 382 additions and 196 deletions.
3 changes: 1 addition & 2 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ import (
"strings"
"time"

"github.com/NYTimes/gziphandler"
assetfs "github.com/elazarl/go-bindata-assetfs"
log "github.com/hashicorp/go-hclog"

"github.com/NYTimes/gziphandler"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/rs/cors"
Expand Down
114 changes: 88 additions & 26 deletions nomad/deploymentwatcher/deployment_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"

"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -363,7 +361,7 @@ func (w *deploymentWatcher) watch() {
// handle the case that the deployment has already progressed and we are now
// just starting to watch it. This must likely would occur if there was a
// leader transition and we are now starting our watcher.
currentDeadline := getDeploymentProgressCutoff(w.getDeployment())
currentDeadline := w.getDeploymentProgressCutoff(w.getDeployment())
var deadlineTimer *time.Timer
if currentDeadline.IsZero() {
deadlineTimer = time.NewTimer(0)
Expand Down Expand Up @@ -405,7 +403,7 @@ FAIL:
case <-w.deploymentUpdateCh:
// Get the updated deployment and check if we should change the
// deadline timer
next := getDeploymentProgressCutoff(w.getDeployment())
next := w.getDeploymentProgressCutoff(w.getDeployment())
if !next.Equal(currentDeadline) {
prevDeadlineZero := currentDeadline.IsZero()
currentDeadline = next
Expand All @@ -419,7 +417,17 @@ FAIL:
default:
}
}
deadlineTimer.Reset(next.Sub(time.Now()))

// If the next deadline is zero, we should not reset the timer
// as we aren't tracking towards a progress deadline yet. This
// can happen if you have multiple task groups with progress
// deadlines and one of the task groups hasn't made any
// placements. As soon as the other task group finishes its
// rollout, the next progress deadline becomes zero, so we want
// to avoid resetting, causing a deployment failure.
if !next.IsZero() {
deadlineTimer.Reset(next.Sub(time.Now()))
}
}

case updates = <-w.getAllocsCh(allocIndex):
Expand Down Expand Up @@ -506,7 +514,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
var res allocUpdateResult

// Get the latest evaluation index
latestEval, blocked, err := w.jobEvalStatus()
latestEval, err := w.jobEvalStatus()
if err != nil {
if err == context.Canceled || w.ctx.Err() == context.Canceled {
return res, err
Expand Down Expand Up @@ -534,7 +542,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
}

// We need to create an eval so the job can progress.
if !blocked && alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval {
if alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval {
res.createEval = true
}

Expand Down Expand Up @@ -611,16 +619,72 @@ func (w *deploymentWatcher) shouldFail() (fail, rollback bool, err error) {

// getDeploymentProgressCutoff returns the progress cutoff for the given
// deployment
func getDeploymentProgressCutoff(d *structs.Deployment) time.Time {
func (w *deploymentWatcher) getDeploymentProgressCutoff(d *structs.Deployment) time.Time {
var next time.Time
for _, state := range d.TaskGroups {
doneTGs := w.doneGroups(d)
for name, state := range d.TaskGroups {
// This task group is done so we don't have to concern ourselves with
// its progress deadline.
if done, ok := doneTGs[name]; ok && done {
continue
}

if state.RequireProgressBy.IsZero() {
continue
}

if next.IsZero() || state.RequireProgressBy.Before(next) {
next = state.RequireProgressBy
}
}
return next
}

// doneGroups returns a map of task group to whether the deployment appears to
// be done for the group. A true value doesn't mean no more action will be taken
// in the life time of the deployment because there could always be node
// failures, or rescheduling events.
func (w *deploymentWatcher) doneGroups(d *structs.Deployment) map[string]bool {
if d == nil {
return nil
}

// Collect the allocations by the task group
snap, err := w.state.Snapshot()
if err != nil {
return nil
}

allocs, err := snap.AllocsByDeployment(nil, d.ID)
if err != nil {
return nil
}

// Go through the allocs and count up how many healthy allocs we have
healthy := make(map[string]int, len(d.TaskGroups))
for _, a := range allocs {
if a.TerminalStatus() || !a.DeploymentStatus.IsHealthy() {
continue
}
healthy[a.TaskGroup]++
}

// Go through each group and check if it done
groups := make(map[string]bool, len(d.TaskGroups))
for name, state := range d.TaskGroups {
// Requires promotion
if state.DesiredCanaries != 0 && !state.Promoted {
groups[name] = false
continue
}

// Check we have enough healthy currently running allocations
groups[name] = healthy[name] >= state.DesiredTotal
}

return groups
}

// latestStableJob returns the latest stable job. It may be nil if none exist
func (w *deploymentWatcher) latestStableJob() (*structs.Job, error) {
snap, err := w.state.Snapshot()
Expand Down Expand Up @@ -779,37 +843,35 @@ func (w *deploymentWatcher) getAllocsImpl(ws memdb.WatchSet, state *state.StateS
return stubs, maxIndex, nil
}

// jobEvalStatus returns the eval status for a job. It returns the index of the
// last evaluation created for the job, as well as whether there exists a
// blocked evaluation for the job. The index is used to determine if an
// allocation update requires an evaluation to be triggered. If there already is
// a blocked evaluations, no eval should be created.
func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, err error) {
// jobEvalStatus returns the latest eval index for a job. The index is used to
// determine if an allocation update requires an evaluation to be triggered.
func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, err error) {
if err := w.queryLimiter.Wait(w.ctx); err != nil {
return 0, false, err
return 0, err
}

snap, err := w.state.Snapshot()
if err != nil {
return 0, false, err
return 0, err
}

evals, err := snap.EvalsByJob(nil, w.j.Namespace, w.j.ID)
if err != nil {
return 0, false, err
return 0, err
}

// If there are no evals for the job, return zero, since we want any
// allocation change to trigger an evaluation.
if len(evals) == 0 {
index, err := snap.Index("evals")
return index, false, err
return 0, nil
}

var max uint64
for _, eval := range evals {
// If we have a blocked eval, then we do not care what the index is
// since we will not need to make a new eval.
if eval.ShouldBlock() {
return 0, true, nil
// A cancelled eval never impacts what the scheduler has saw, so do not
// use it's indexes.
if eval.Status == structs.EvalStatusCancelled {
continue
}

// Prefer using the snapshot index. Otherwise use the create index
Expand All @@ -820,5 +882,5 @@ func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, e
}
}

return max, false, nil
return max, nil
}
Loading

0 comments on commit 08b75d4

Please sign in to comment.