diff --git a/command/agent/http.go b/command/agent/http.go index 857f5f8bcde..3f8a067a871 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -13,10 +13,9 @@ import ( "strings" "time" + "github.com/NYTimes/gziphandler" assetfs "github.com/elazarl/go-bindata-assetfs" log "github.com/hashicorp/go-hclog" - - "github.com/NYTimes/gziphandler" "github.com/hashicorp/nomad/helper/tlsutil" "github.com/hashicorp/nomad/nomad/structs" "github.com/rs/cors" diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go index cca75e5dee4..c798cbcedb6 100644 --- a/nomad/deploymentwatcher/deployment_watcher.go +++ b/nomad/deploymentwatcher/deployment_watcher.go @@ -6,15 +6,13 @@ import ( "sync" "time" - "golang.org/x/time/rate" - log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "golang.org/x/time/rate" ) const ( @@ -363,7 +361,7 @@ func (w *deploymentWatcher) watch() { // handle the case that the deployment has already progressed and we are now // just starting to watch it. This most likely would occur if there was a // leader transition and we are now starting our watcher. - currentDeadline := getDeploymentProgressCutoff(w.getDeployment()) + currentDeadline := w.getDeploymentProgressCutoff(w.getDeployment()) var deadlineTimer *time.Timer if currentDeadline.IsZero() { deadlineTimer = time.NewTimer(0) @@ -405,7 +403,7 @@ FAIL: case <-w.deploymentUpdateCh: // Get the updated deployment and check if we should change the // deadline timer - next := getDeploymentProgressCutoff(w.getDeployment()) + next := w.getDeploymentProgressCutoff(w.getDeployment()) if !next.Equal(currentDeadline) { prevDeadlineZero := currentDeadline.IsZero() currentDeadline = next @@ -419,7 +417,17 @@ FAIL: default: } } - deadlineTimer.Reset(next.Sub(time.Now())) + + // If the next deadline is zero, we should not reset the timer + // as we aren't tracking towards a progress deadline yet. This + // can happen if you have multiple task groups with progress + // deadlines and one of the task groups hasn't made any + // placements. As soon as the other task group finishes its + // rollout, the next progress deadline becomes zero, so we want + // to avoid resetting, causing a deployment failure. + if !next.IsZero() { + deadlineTimer.Reset(next.Sub(time.Now())) + } } case updates = <-w.getAllocsCh(allocIndex): @@ -506,7 +514,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) ( var res allocUpdateResult // Get the latest evaluation index - latestEval, blocked, err := w.jobEvalStatus() + latestEval, err := w.jobEvalStatus() if err != nil { if err == context.Canceled || w.ctx.Err() == context.Canceled { return res, err } @@ -534,7 +542,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) ( } // We need to create an eval so the job can progress.
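As an aside, the deadline-timer handling in the watch loop above is easy to get wrong, so here is a minimal, self-contained sketch of the stop/drain/reset pattern together with the new zero-deadline guard. The resetDeadline helper and the main function are hypothetical illustrations, not part of the patch:

package main

import (
	"fmt"
	"time"
)

// resetDeadline mirrors the watch loop's pattern: stop the timer, drain its
// channel if it already fired, and only Reset when the next deadline is
// non-zero. A zero deadline means no task group is tracking progress yet, and
// resetting with a non-positive duration would fire immediately and fail the
// deployment.
func resetDeadline(t *time.Timer, current, next time.Time) time.Time {
	if next.Equal(current) {
		return current // nothing changed
	}
	if !t.Stop() {
		// The timer already fired; drain the channel so Reset is safe.
		select {
		case <-t.C:
		default:
		}
	}
	if !next.IsZero() {
		t.Reset(next.Sub(time.Now()))
	}
	return next
}

func main() {
	t := time.NewTimer(0)
	<-t.C // a zero-duration timer fires immediately, like the watcher's initial state
	next := resetDeadline(t, time.Time{}, time.Now().Add(50*time.Millisecond))
	<-t.C
	fmt.Println("progress deadline reached at", next)
}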
- if !blocked && alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval { + if alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval { res.createEval = true } @@ -611,9 +619,20 @@ func (w *deploymentWatcher) shouldFail() (fail, rollback bool, err error) { // getDeploymentProgressCutoff returns the progress cutoff for the given // deployment -func getDeploymentProgressCutoff(d *structs.Deployment) time.Time { +func (w *deploymentWatcher) getDeploymentProgressCutoff(d *structs.Deployment) time.Time { var next time.Time - for _, state := range d.TaskGroups { + doneTGs := w.doneGroups(d) + for name, state := range d.TaskGroups { + // This task group is done so we don't have to concern ourselves with + // its progress deadline. + if done, ok := doneTGs[name]; ok && done { + continue + } + + if state.RequireProgressBy.IsZero() { + continue + } + if next.IsZero() || state.RequireProgressBy.Before(next) { next = state.RequireProgressBy } @@ -621,6 +640,51 @@ func getDeploymentProgressCutoff(d *structs.Deployment) time.Time { return next } +// doneGroups returns a map of task group to whether the deployment appears to +// be done for the group. A true value doesn't mean no more action will be taken +// in the lifetime of the deployment because there could always be node +// failures, or rescheduling events. +func (w *deploymentWatcher) doneGroups(d *structs.Deployment) map[string]bool { + if d == nil { + return nil + } + + // Collect the allocations by the task group + snap, err := w.state.Snapshot() + if err != nil { + return nil + } + + allocs, err := snap.AllocsByDeployment(nil, d.ID) + if err != nil { + return nil + } + + // Go through the allocs and count up how many healthy allocs we have + healthy := make(map[string]int, len(d.TaskGroups)) + for _, a := range allocs { + if a.TerminalStatus() || !a.DeploymentStatus.IsHealthy() { + continue + } + healthy[a.TaskGroup]++ + } + + // Go through each group and check if it is done + groups := make(map[string]bool, len(d.TaskGroups)) + for name, state := range d.TaskGroups { + // Requires promotion + if state.DesiredCanaries != 0 && !state.Promoted { + groups[name] = false + continue + } + + // Check we have enough healthy currently running allocations + groups[name] = healthy[name] >= state.DesiredTotal + } + + return groups +} + // latestStableJob returns the latest stable job. It may be nil if none exist func (w *deploymentWatcher) latestStableJob() (*structs.Job, error) { snap, err := w.state.Snapshot() @@ -779,37 +843,35 @@ func (w *deploymentWatcher) getAllocsImpl(ws memdb.WatchSet, state *state.StateS return stubs, maxIndex, nil } -// jobEvalStatus returns the eval status for a job. It returns the index of the -// last evaluation created for the job, as well as whether there exists a -// blocked evaluation for the job. The index is used to determine if an -// allocation update requires an evaluation to be triggered. If there already is -// a blocked evaluations, no eval should be created. -func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, err error) { +// jobEvalStatus returns the latest eval index for a job. The index is used to +// determine if an allocation update requires an evaluation to be triggered. 
+func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, err error) { if err := w.queryLimiter.Wait(w.ctx); err != nil { - return 0, false, err + return 0, err } snap, err := w.state.Snapshot() if err != nil { - return 0, false, err + return 0, err } evals, err := snap.EvalsByJob(nil, w.j.Namespace, w.j.ID) if err != nil { - return 0, false, err + return 0, err } + // If there are no evals for the job, return zero, since we want any + // allocation change to trigger an evaluation. if len(evals) == 0 { - index, err := snap.Index("evals") - return index, false, err + return 0, nil } var max uint64 for _, eval := range evals { - // If we have a blocked eval, then we do not care what the index is - // since we will not need to make a new eval. - if eval.ShouldBlock() { - return 0, true, nil + // A cancelled eval never impacts what the scheduler has seen, so do not + // use its indexes. + if eval.Status == structs.EvalStatusCancelled { + continue } // Prefer using the snapshot index. Otherwise use the create index @@ -820,5 +882,5 @@ func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, e } } - return max, false, nil + return max, nil } diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go index 6a033a0e978..a92a8e029a4 100644 --- a/nomad/deploymentwatcher/deployments_watcher_test.go +++ b/nomad/deploymentwatcher/deployments_watcher_test.go @@ -30,14 +30,14 @@ func defaultTestDeploymentWatcher(t *testing.T) (*Watcher, *mockBackend) { // Tests that the watcher properly watches for deployments and reconciles them func TestWatcher_WatchDeployments(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create three jobs j1, j2, j3 := mock.Job(), mock.Job(), mock.Job() - assert.Nil(m.state.UpsertJob(100, j1)) - assert.Nil(m.state.UpsertJob(101, j2)) - assert.Nil(m.state.UpsertJob(102, j3)) + require.Nil(m.state.UpsertJob(100, j1)) + require.Nil(m.state.UpsertJob(101, j2)) + require.Nil(m.state.UpsertJob(102, j3)) // Create three deployments all running d1, d2, d3 := mock.Deployment(), mock.Deployment(), mock.Deployment() d1.JobID = j1.ID d2.JobID = j2.ID d3.JobID = j3.ID // Upsert the first deployment - assert.Nil(m.state.UpsertDeployment(103, d1)) + require.Nil(m.state.UpsertDeployment(103, d1)) // Next list 3 block1 := make(chan time.Time) go func() { <-block1 - assert.Nil(m.state.UpsertDeployment(104, d2)) - assert.Nil(m.state.UpsertDeployment(105, d3)) + require.Nil(m.state.UpsertDeployment(104, d2)) + require.Nil(m.state.UpsertDeployment(105, d3)) }() //// Next list 3 but have one be terminal @@ -62,26 +62,27 @@ func TestWatcher_WatchDeployments(t *testing.T) { d3terminal.Status = structs.DeploymentStatusFailed go func() { <-block2 - assert.Nil(m.state.UpsertDeployment(106, d3terminal)) + require.Nil(m.state.UpsertDeployment(106, d3terminal)) }() w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "1 deployment returned") }) + func(err error) { require.Equal(1, len(w.watchers), "1 deployment returned") }) close(block1) testutil.WaitForResult(func() (bool, error) { return 3 == len(w.watchers), nil }, - func(err error) { assert.Equal(3, len(w.watchers), "3 deployment returned") }) + func(err error) { require.Equal(3, len(w.watchers), "3 deployment returned") }) close(block2)
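Before continuing into the test updates, the interplay between doneGroups and getDeploymentProgressCutoff above is worth a standalone illustration. The sketch below mirrors their logic with simplified stand-in types; groupState is a hypothetical reduction of structs.DeploymentState and the healthy map stands in for the state-store snapshot, so this is not the real API:

package main

import (
	"fmt"
	"time"
)

// groupState is a pared-down stand-in for the per-group deployment state.
type groupState struct {
	DesiredCanaries   int
	Promoted          bool
	DesiredTotal      int
	RequireProgressBy time.Time
}

// progressCutoff returns the earliest non-zero progress deadline among groups
// that are not yet done. A group counts as done when it is promoted (or needs
// no canaries) and has enough healthy allocations, matching the rule the two
// methods above implement against the state store.
func progressCutoff(groups map[string]groupState, healthy map[string]int) time.Time {
	var next time.Time
	for name, state := range groups {
		done := (state.DesiredCanaries == 0 || state.Promoted) &&
			healthy[name] >= state.DesiredTotal
		if done || state.RequireProgressBy.IsZero() {
			continue
		}
		if next.IsZero() || state.RequireProgressBy.Before(next) {
			next = state.RequireProgressBy
		}
	}
	return next
}

func main() {
	now := time.Now()
	groups := map[string]groupState{
		"web": {DesiredTotal: 1, RequireProgressBy: now.Add(500 * time.Millisecond)},
		"foo": {DesiredTotal: 1, RequireProgressBy: now.Add(time.Second)},
	}
	healthy := map[string]int{}

	// No placements yet: the earliest deadline (web's) is the cutoff.
	fmt.Println(progressCutoff(groups, healthy))

	// web finishes its rollout: only foo's later deadline is tracked.
	healthy["web"] = 1
	fmt.Println(progressCutoff(groups, healthy))

	// Both groups are done: the cutoff is the zero time, and per the watch
	// loop above the deadline timer must not be reset.
	healthy["foo"] = 1
	fmt.Println(progressCutoff(groups, healthy).IsZero())
}

The three stages in main correspond to the scenario the new TestDeploymentWatcher_ProgressCutoff test below walks through: no placements, one group finished, then both groups finished.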
testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil }, - func(err error) { assert.Equal(3, len(w.watchers), "3 deployment returned - 1 terminal") }) + func(err error) { require.Equal(2, len(w.watchers), "3 deployment returned - 1 terminal") }) } // Tests that calls against an unknown deployment fail func TestWatcher_UnknownDeployment(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) w.SetEnabled(true, m.state) @@ -97,7 +98,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) if assert.NotNil(err, "should have error for unknown deployment") { - assert.Contains(err.Error(), expected) + require.Contains(err.Error(), expected) } // Request promoting against an unknown deployment @@ -107,7 +108,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { } err = w.PromoteDeployment(req2, &resp) if assert.NotNil(err, "should have error for unknown deployment") { - assert.Contains(err.Error(), expected) + require.Contains(err.Error(), expected) } // Request pausing against an unknown deployment @@ -117,7 +118,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { } err = w.PauseDeployment(req3, &resp) if assert.NotNil(err, "should have error for unknown deployment") { - assert.Contains(err.Error(), expected) + require.Contains(err.Error(), expected) } // Request failing against an unknown deployment @@ -126,7 +127,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { } err = w.FailDeployment(req4, &resp) if assert.NotNil(err, "should have error for unknown deployment") { - assert.Contains(err.Error(), expected) + require.Contains(err.Error(), expected) } } @@ -134,16 +135,17 @@ func TestWatcher_UnknownDeployment(t *testing.T) { func TestWatcher_SetAllocHealth_Unknown(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - // Assert that we get a call to UpsertDeploymentAllocHealth + // require that we get a call to UpsertDeploymentAllocHealth a := mock.Alloc() matchConfig := &matchDeploymentAllocHealthRequestConfig{ DeploymentID: d.ID, Healthy: []string{a.ID}, @@ -155,7 +157,7 @@ func TestWatcher_SetAllocHealth_Unknown(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call SetAllocHealth req := &structs.DeploymentAllocHealthRequest{ @@ -165,15 +167,15 @@ func TestWatcher_SetAllocHealth_Unknown(t *testing.T) { var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) if assert.NotNil(err, "Set health of unknown allocation") { - assert.Contains(err.Error(), "unknown") + require.Contains(err.Error(), "unknown") } - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") } // Test setting allocation health func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { t.Parallel() 
- assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, alloc, and a deployment @@ -182,11 +184,11 @@ func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { d.JobID = j.ID a := mock.Alloc() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - // Assert that we get a call to UpsertDeploymentAllocHealth + // require that we get a call to UpsertDeploymentAllocHealth matchConfig := &matchDeploymentAllocHealthRequestConfig{ DeploymentID: d.ID, Healthy: []string{a.ID}, @@ -197,7 +199,7 @@ func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call SetAllocHealth req := &structs.DeploymentAllocHealthRequest{ @@ -206,15 +208,15 @@ func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) - assert.Nil(err, "SetAllocHealth") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Nil(err, "SetAllocHealth") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)) } // Test setting allocation unhealthy func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, alloc, and a deployment @@ -223,11 +225,11 @@ func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { d.JobID = j.ID a := mock.Alloc() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - // Assert that we get a call to UpsertDeploymentAllocHealth + // require that we get a call to UpsertDeploymentAllocHealth matchConfig := &matchDeploymentAllocHealthRequestConfig{ DeploymentID: d.ID, Unhealthy: []string{a.ID}, @@ -243,7 +245,7 @@ func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call SetAllocHealth req := &structs.DeploymentAllocHealthRequest{ @@ -252,17 +254,17 @@ func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) - assert.Nil(err, "SetAllocHealth") + require.Nil(err, 
"SetAllocHealth") testutil.WaitForResult(func() (bool, error) { return 0 == len(w.watchers), nil }, - func(err error) { assert.Equal(0, len(w.watchers), "Should have no deployment") }) + func(err error) { require.Equal(0, len(w.watchers), "Should have no deployment") }) m.AssertNumberOfCalls(t, "UpdateDeploymentAllocHealth", 1) } // Test setting allocation unhealthy and that there should be a rollback func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, alloc, and a deployment @@ -277,9 +279,9 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") // Upsert the job again to get a new version j2 := j.Copy() @@ -287,9 +289,9 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { // Modify the job to make its specification different j2.Meta["foo"] = "bar" - assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") + require.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") - // Assert that we get a call to UpsertDeploymentAllocHealth + // require that we get a call to UpsertDeploymentAllocHealth matchConfig := &matchDeploymentAllocHealthRequestConfig{ DeploymentID: d.ID, Unhealthy: []string{a.ID}, @@ -306,7 +308,7 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call SetAllocHealth req := &structs.DeploymentAllocHealthRequest{ @@ -315,17 +317,17 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) - assert.Nil(err, "SetAllocHealth") + require.Nil(err, "SetAllocHealth") testutil.WaitForResult(func() (bool, error) { return 0 == len(w.watchers), nil }, - func(err error) { assert.Equal(0, len(w.watchers), "Should have no deployment") }) + func(err error) { require.Equal(0, len(w.watchers), "Should have no deployment") }) m.AssertNumberOfCalls(t, "UpdateDeploymentAllocHealth", 1) } // Test setting allocation unhealthy on job with identical spec and there should be no rollback func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, alloc, and a deployment @@ -340,17 +342,17 @@ func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + 
require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") // Upsert the job again to get a new version j2 := j.Copy() j2.Stable = false - assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") + require.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") - // Assert that we get a call to UpsertDeploymentAllocHealth + // require that we get a call to UpsertDeploymentAllocHealth matchConfig := &matchDeploymentAllocHealthRequestConfig{ DeploymentID: d.ID, Unhealthy: []string{a.ID}, @@ -367,7 +369,7 @@ func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call SetAllocHealth req := &structs.DeploymentAllocHealthRequest{ @@ -376,17 +378,17 @@ func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) - assert.Nil(err, "SetAllocHealth") + require.Nil(err, "SetAllocHealth") testutil.WaitForResult(func() (bool, error) { return 0 == len(w.watchers), nil }, - func(err error) { assert.Equal(0, len(w.watchers), "Should have no deployment") }) + func(err error) { require.Equal(0, len(w.watchers), "Should have no deployment") }) m.AssertNumberOfCalls(t, "UpdateDeploymentAllocHealth", 1) } // Test promoting a deployment func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, canary alloc, and a deployment @@ -404,11 +406,11 @@ func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { Healthy: helper.BoolToPtr(true), } a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - // Assert that we get a call to UpsertDeploymentPromotion + // require that we get a call to UpsertDeploymentPromotion matchConfig := &matchDeploymentPromoteRequestConfig{ Promotion: &structs.DeploymentPromoteRequest{ DeploymentID: d.ID, @@ -425,7 +427,7 @@ func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call PromoteDeployment req := &structs.DeploymentPromoteRequest{ @@ -434,15 +436,15 @@ func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.PromoteDeployment(req, &resp) - assert.Nil(err, "PromoteDeployment") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Nil(err, "PromoteDeployment") + require.Equal(1, 
len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher)) } // Test promoting a deployment with unhealthy canaries func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, canary alloc, and a deployment @@ -457,11 +459,11 @@ func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { d.TaskGroups[a.TaskGroup].PlacedCanaries = []string{a.ID} d.TaskGroups[a.TaskGroup].DesiredCanaries = 2 a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - // Assert that we get a call to UpsertDeploymentPromotion + // require that we get a call to UpsertDeploymentPromotion matchConfig := &matchDeploymentPromoteRequestConfig{ Promotion: &structs.DeploymentPromoteRequest{ DeploymentID: d.ID, @@ -474,7 +476,7 @@ func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call SetAllocHealth req := &structs.DeploymentPromoteRequest{ @@ -483,28 +485,28 @@ func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.PromoteDeployment(req, &resp) - if assert.NotNil(err, "PromoteDeployment") { - assert.Contains(err.Error(), `Task group "web" has 0/2 healthy allocations`, "Should error because canary isn't marked healthy") + if assert.NotNil(t, err, "PromoteDeployment") { + require.Contains(err.Error(), `Task group "web" has 0/2 healthy allocations`, "Should error because canary isn't marked healthy") } - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher)) } // Test pausing a deployment that is running func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate matchConfig := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusPaused, @@ -515,7 +517,7 @@ func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err 
error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call PauseDeployment req := &structs.DeploymentPauseRequest{ @@ -524,16 +526,16 @@ func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.PauseDeployment(req, &resp) - assert.Nil(err, "PauseDeployment") + require.Nil(err, "PauseDeployment") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) } // Test pausing a deployment that is paused func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment @@ -541,10 +543,10 @@ func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { d := mock.Deployment() d.JobID = j.ID d.Status = structs.DeploymentStatusPaused - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate matchConfig := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusPaused, @@ -555,7 +557,7 @@ func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call PauseDeployment req := &structs.DeploymentPauseRequest{ @@ -564,16 +566,16 @@ func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.PauseDeployment(req, &resp) - assert.Nil(err, "PauseDeployment") + require.Nil(err, "PauseDeployment") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) } // Test unpausing a deployment that is paused func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment @@ -581,10 +583,10 @@ func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { d := mock.Deployment() d.JobID = j.ID d.Status = structs.DeploymentStatusPaused - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate matchConfig := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusRunning, @@ -596,7 +598,7 @@ func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { w.SetEnabled(true, m.state) 
testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call PauseDeployment req := &structs.DeploymentPauseRequest{ @@ -605,26 +607,26 @@ func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.PauseDeployment(req, &resp) - assert.Nil(err, "PauseDeployment") + require.Nil(err, "PauseDeployment") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) } // Test unpausing a deployment that is running func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate matchConfig := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusRunning, @@ -636,7 +638,7 @@ func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call PauseDeployment req := &structs.DeploymentPauseRequest{ @@ -645,26 +647,26 @@ func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.PauseDeployment(req, &resp) - assert.Nil(err, "PauseDeployment") + require.Nil(err, "PauseDeployment") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) } // Test failing a deployment that is running func TestWatcher_FailDeployment_Running(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate matchConfig := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusFailed, @@ -676,7 +678,7 @@ func TestWatcher_FailDeployment_Running(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should 
have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call PauseDeployment req := &structs.DeploymentFailRequest{ @@ -684,9 +686,9 @@ func TestWatcher_FailDeployment_Running(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.FailDeployment(req, &resp) - assert.Nil(err, "FailDeployment") + require.Nil(err, "FailDeployment") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) } @@ -694,7 +696,7 @@ func TestWatcher_FailDeployment_Running(t *testing.T) { // proper actions func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment @@ -709,23 +711,23 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") // Upsert the job again to get a new version j2 := j.Copy() // Modify the job to make its specification different j2.Meta["foo"] = "bar" j2.Stable = false - assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") + require.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") - // Assert that we will get a update allocation call only once. This will + // require that we will get a update allocation call only once. 
This will // verify that the watcher is batching allocation changes m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate c := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusFailed, @@ -738,7 +740,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Update the allocs health to healthy which should create an evaluation for i := 0; i < 5; i++ { @@ -748,7 +750,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { HealthyAllocationIDs: []string{a.ID}, }, } - assert.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") } // Wait for there to be one eval @@ -776,7 +778,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, }, } - assert.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") // Wait for there to be one eval testutil.WaitForResult(func() (bool, error) { @@ -797,7 +799,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1)) - // After we upsert the job version will go to 2. So use this to assert the + // After we upsert the job version will go to 2. So use this to require the // original call happened. 
c2 := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, @@ -809,12 +811,12 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { m3 := matchDeploymentStatusUpdateRequest(c2) m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(m3)) testutil.WaitForResult(func() (bool, error) { return 0 == len(w.watchers), nil }, - func(err error) { assert.Equal(0, len(w.watchers), "Should have no deployment") }) + func(err error) { require.Equal(0, len(w.watchers), "Should have no deployment") }) } func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment @@ -831,11 +833,11 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { a.CreateTime = now.UnixNano() a.ModifyTime = now.UnixNano() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate c := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusFailed, @@ -847,15 +849,15 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) - // Update the alloc to be unhealthy and assert that nothing happens. + // Update the alloc to be unhealthy and require that nothing happens. 
a2 := a.Copy() a2.DeploymentStatus = &structs.AllocDeploymentStatus{ Healthy: helper.BoolToPtr(false), Timestamp: now, } - assert.Nil(m.state.UpdateAllocsFromClient(100, []*structs.Allocation{a2})) + require.Nil(m.state.UpdateAllocsFromClient(100, []*structs.Allocation{a2})) // Wait for the deployment to be failed testutil.WaitForResult(func() (bool, error) { @@ -869,7 +871,7 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { t.Fatal(err) }) - // Assert there are is only one evaluation + // require that there is only one evaluation testutil.WaitForResult(func() (bool, error) { ws := memdb.NewWatchSet() evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) @@ -887,6 +889,107 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { }) } +// Test that progress deadline handling works when there are multiple groups +func TestDeploymentWatcher_ProgressCutoff(t *testing.T) { + t.Parallel() + require := require.New(t) + w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) + + // Create a job, alloc, and a deployment + j := mock.Job() + j.TaskGroups[0].Count = 1 + j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() + j.TaskGroups[0].Update.ProgressDeadline = 500 * time.Millisecond + j.TaskGroups = append(j.TaskGroups, j.TaskGroups[0].Copy()) + j.TaskGroups[1].Name = "foo" + j.TaskGroups[1].Update.ProgressDeadline = 1 * time.Second + j.Stable = true + + d := mock.Deployment() + d.JobID = j.ID + d.TaskGroups["web"].DesiredTotal = 1 + d.TaskGroups["foo"] = d.TaskGroups["web"].Copy() + d.TaskGroups["web"].ProgressDeadline = 500 * time.Millisecond + d.TaskGroups["foo"].ProgressDeadline = 1 * time.Second + + a := mock.Alloc() + now := time.Now() + a.CreateTime = now.UnixNano() + a.ModifyTime = now.UnixNano() + a.DeploymentID = d.ID + + a2 := mock.Alloc() + a2.TaskGroup = "foo" + a2.CreateTime = now.UnixNano() + a2.ModifyTime = now.UnixNano() + a2.DeploymentID = d.ID + + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a, a2}), "UpsertAllocs") + + // We may get an update for the desired transition. 
+ m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) + m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() + + w.SetEnabled(true, m.state) + testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) + + watcher, err := w.getOrCreateWatcher(d.ID) + require.NoError(err) + require.NotNil(watcher) + + d1, err := m.state.DeploymentByID(nil, d.ID) + require.NoError(err) + + done := watcher.doneGroups(d1) + require.Contains(done, "web") + require.False(done["web"]) + require.Contains(done, "foo") + require.False(done["foo"]) + + cutoff1 := watcher.getDeploymentProgressCutoff(d1) + require.False(cutoff1.IsZero()) + + // Update the first allocation to be healthy + a3 := a.Copy() + a3.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: helper.BoolToPtr(true)} + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a3}), "UpsertAllocs") + + // Get the updated deployment + d2, err := m.state.DeploymentByID(nil, d.ID) + require.NoError(err) + + done = watcher.doneGroups(d2) + require.Contains(done, "web") + require.True(done["web"]) + require.Contains(done, "foo") + require.False(done["foo"]) + + cutoff2 := watcher.getDeploymentProgressCutoff(d2) + require.False(cutoff2.IsZero()) + require.True(cutoff1.UnixNano() < cutoff2.UnixNano()) + + // Update the second allocation to be healthy + a4 := a2.Copy() + a4.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: helper.BoolToPtr(true)} + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a4}), "UpsertAllocs") + + // Get the updated deployment + d3, err := m.state.DeploymentByID(nil, d.ID) + require.NoError(err) + + done = watcher.doneGroups(d3) + require.Contains(done, "web") + require.True(done["web"]) + require.Contains(done, "foo") + require.True(done["foo"]) + + cutoff3 := watcher.getDeploymentProgressCutoff(d3) + require.True(cutoff3.IsZero()) +} + // Test that we will allow the progress deadline to be reached when the canaries // are healthy but we haven't promoted func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) { @@ -915,7 +1018,7 @@ func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) { require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - // Assert that we will get a createEvaluation call only once. This will + // require that we will get a createEvaluation call only once. 
This will // verify that the watcher is batching allocation changes m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() @@ -1056,7 +1159,7 @@ func TestDeploymentWatcher_PromotedCanary_UpdatedAllocs(t *testing.T) { // After the deployment is updated, a failed alloc's DesiredTransition should be set func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, and a deployment @@ -1068,18 +1171,18 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { d := mock.Deployment() d.JobID = j.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") a := mock.Alloc() a.CreateTime = time.Now().UnixNano() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") d.TaskGroups["web"].ProgressDeadline = 500 * time.Millisecond // Update the deployment with a progress deadline - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") // Match on DesiredTransition set to Reschedule for the failed alloc m1 := matchUpdateAllocDesiredTransitionReschedule([]string{a.ID}) @@ -1087,7 +1190,7 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Update the alloc to be unhealthy a2 := a.Copy() @@ -1095,7 +1198,7 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { Healthy: helper.BoolToPtr(false), Timestamp: time.Now(), } - assert.Nil(m.state.UpdateAllocsFromClient(m.nextIndex(), []*structs.Allocation{a2})) + require.Nil(m.state.UpdateAllocsFromClient(m.nextIndex(), []*structs.Allocation{a2})) // Wait for the alloc's DesiredState to set reschedule testutil.WaitForResult(func() (bool, error) { @@ -1114,7 +1217,7 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { // Tests that the watcher fails rollback when the spec hasn't changed func TestDeploymentWatcher_RollbackFailed(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment @@ -1129,22 +1232,22 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), 
[]*structs.Allocation{a}), "UpsertAllocs") // Upsert the job again to get a new version j2 := j.Copy() // Modify the job to make its specification different j2.Stable = false - assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") + require.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") - // Assert that we will get a createEvaluation call only once. This will + // require that we will get a createEvaluation call only once. This will // verify that the watcher is batching allocation changes m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() - // Assert that we get a call to UpsertDeploymentStatusUpdate with roll back failed as the status + // require that we get a call to UpsertDeploymentStatusUpdate with roll back failed as the status c := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusFailed, @@ -1157,7 +1260,7 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Update the allocs health to healthy which should create an evaluation for i := 0; i < 5; i++ { @@ -1167,7 +1270,7 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { HealthyAllocationIDs: []string{a.ID}, }, } - assert.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") } // Wait for there to be one eval @@ -1195,7 +1298,7 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, }, } - assert.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") // Wait for there to be one eval testutil.WaitForResult(func() (bool, error) { @@ -1218,13 +1321,13 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { // verify that the job version hasn't changed after upsert m.state.JobByID(nil, structs.DefaultNamespace, j.ID) - assert.Equal(uint64(0), j.Version, "Expected job version 0 but got ", j.Version) + require.Equal(uint64(0), j.Version, "Expected job version 0 but got ", j.Version) } // Test allocation updates and evaluation creation is batched between watchers func TestWatcher_BatchAllocUpdates(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Second) // Create a job, alloc, for two deployments @@ -1248,14 +1351,14 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { a2.JobID = j2.ID a2.DeploymentID = d2.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j1), "UpsertJob") - assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d1), "UpsertDeployment") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d2), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a1}), "UpsertAllocs") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a2}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j1), "UpsertJob") + require.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob") + 
require.Nil(m.state.UpsertDeployment(m.nextIndex(), d1), "UpsertDeployment") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d2), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a1}), "UpsertAllocs") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a2}), "UpsertAllocs") - // Assert that we will get a createEvaluation call only once and it contains + // require that we will get a createEvaluation call only once and it contains // both deployments. This will verify that the watcher is batching // allocation changes m1 := matchUpdateAllocDesiredTransitions([]string{d1.ID, d2.ID}) @@ -1263,7 +1366,7 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil }, - func(err error) { assert.Equal(2, len(w.watchers), "Should have 2 deployment") }) + func(err error) { require.Equal(2, len(w.watchers), "Should have 2 deployment") }) // Update the allocs health to healthy which should create an evaluation req := &structs.ApplyDeploymentAllocHealthRequest{ @@ -1272,7 +1375,7 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { HealthyAllocationIDs: []string{a1.ID}, }, } - assert.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") req2 := &structs.ApplyDeploymentAllocHealthRequest{ DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ @@ -1280,7 +1383,7 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { HealthyAllocationIDs: []string{a2.ID}, }, } - assert.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") // Wait for there to be one eval for each job testutil.WaitForResult(func() (bool, error) { @@ -1310,5 +1413,5 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1)) testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil }, - func(err error) { assert.Equal(2, len(w.watchers), "Should have 2 deployment") }) + func(err error) { require.Equal(2, len(w.watchers), "Should have 2 deployment") }) } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 8885ebab420..41c60090904 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -7,9 +7,8 @@ import ( "time" log "github.com/hashicorp/go-hclog" - multierror "github.com/hashicorp/go-multierror" - "github.com/hashicorp/go-memdb" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -1923,11 +1922,24 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a copyAlloc.ClientDescription = alloc.ClientDescription copyAlloc.TaskStates = alloc.TaskStates - // Merge the deployment status taking only what the client should set - oldDeploymentStatus := copyAlloc.DeploymentStatus - copyAlloc.DeploymentStatus = alloc.DeploymentStatus - if oldDeploymentStatus != nil && oldDeploymentStatus.Canary { - copyAlloc.DeploymentStatus.Canary = true + // The client can only set its deployment health and timestamp, so just take + // those + if copyAlloc.DeploymentStatus != nil && alloc.DeploymentStatus != nil { + oldHasHealthy 
:= copyAlloc.DeploymentStatus.HasHealth() + newHasHealthy := alloc.DeploymentStatus.HasHealth() + + // We got new health information from the client + if newHasHealthy && (!oldHasHealthy || *copyAlloc.DeploymentStatus.Healthy != *alloc.DeploymentStatus.Healthy) { + // Updated deployment health and timestamp + copyAlloc.DeploymentStatus.Healthy = helper.BoolToPtr(*alloc.DeploymentStatus.Healthy) + copyAlloc.DeploymentStatus.Timestamp = alloc.DeploymentStatus.Timestamp + copyAlloc.DeploymentStatus.ModifyIndex = index + } + } else if alloc.DeploymentStatus != nil { + // First time getting a deployment status so copy everything and just + // set the index + copyAlloc.DeploymentStatus = alloc.DeploymentStatus.Copy() + copyAlloc.DeploymentStatus.ModifyIndex = index } // Update the modify index @@ -2695,7 +2707,7 @@ func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyD } // Ensure the canaries are healthy - if !alloc.DeploymentStatus.IsHealthy() { + if alloc.TerminalStatus() || !alloc.DeploymentStatus.IsHealthy() { continue } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 79cd8559f98..4ab83785812 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -3551,6 +3551,8 @@ func TestStateStore_UpdateAllocsFromClient_DeploymentStateMerges(t *testing.T) { require.Nil(err) require.NotNil(out) require.True(out.DeploymentStatus.Canary) + require.NotNil(out.DeploymentStatus.Healthy) + require.True(*out.DeploymentStatus.Healthy) } func TestStateStore_UpsertAlloc_Alloc(t *testing.T) { @@ -5468,7 +5470,15 @@ func TestStateStore_UpsertDeploymentPromotion_Unhealthy(t *testing.T) { c2.DeploymentID = d.ID d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID) - require.Nil(state.UpsertAllocs(3, []*structs.Allocation{c1, c2})) + // Create a healthy but terminal alloc + c3 := mock.Alloc() + c3.JobID = j.ID + c3.DeploymentID = d.ID + c3.DesiredStatus = structs.AllocDesiredStatusStop + c3.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: helper.BoolToPtr(true)} + d.TaskGroups[c3.TaskGroup].PlacedCanaries = append(d.TaskGroups[c3.TaskGroup].PlacedCanaries, c3.ID) + + require.Nil(state.UpsertAllocs(3, []*structs.Allocation{c1, c2, c3})) // Promote the canaries req := &structs.ApplyDeploymentPromoteRequest{
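The nestedUpdateAllocFromClient change above narrows what a client update may write into an allocation's deployment status: only the health flag and its timestamp, with the modify index stamped server-side and server-owned fields such as Canary preserved. A minimal sketch of that merge rule, using a simplified hypothetical stand-in for structs.AllocDeploymentStatus rather than the real type:

package main

import (
	"fmt"
	"time"
)

func boolToPtr(b bool) *bool { return &b }

// deployStatus is a pared-down stand-in for the alloc deployment status.
type deployStatus struct {
	Healthy     *bool
	Timestamp   time.Time
	Canary      bool // server-owned; never taken from the client
	ModifyIndex uint64
}

// mergeClientStatus takes only the health and its timestamp from the client,
// stamps the raft index of this update, and preserves everything else.
func mergeClientStatus(server, client *deployStatus, index uint64) *deployStatus {
	if client == nil {
		return server
	}
	if server == nil {
		// First report from the client: copy it and just set the index.
		out := *client
		out.ModifyIndex = index
		return &out
	}
	newHasHealthy := client.Healthy != nil
	if newHasHealthy && (server.Healthy == nil || *server.Healthy != *client.Healthy) {
		server.Healthy = boolToPtr(*client.Healthy)
		server.Timestamp = client.Timestamp
		server.ModifyIndex = index
	}
	return server
}

func main() {
	server := &deployStatus{Canary: true}
	client := &deployStatus{Healthy: boolToPtr(true), Timestamp: time.Now()}
	out := mergeClientStatus(server, client, 100)
	// Canary survives the merge; health and index come from this update.
	fmt.Println(out.Canary, *out.Healthy, out.ModifyIndex) // true true 100
}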