From 8615b1d558c85b9d6e49f6a1ea1729036c1f12bb Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 5 Nov 2018 14:43:07 -0800 Subject: [PATCH 1/3] Fix multiple tgs with progress deadline handling Fix an issue in which the deployment watcher would fail the deployment based on the earliest progress deadline of the deployment regardless of whether the task group has finished. Further fix an issue where the blocked eval optimization would make it so no evals were created to progress the deployment. To reproduce this issue, prior to this commit, you can create a job with two task groups. The first group has count 1 and resources such that it cannot be placed. The second group has count 3, max_parallel=1, and can be placed. Run this first and then update the second group to do a deployment. It will place the first of three, but never progress since there exists a blocked eval. However, that doesn't capture the fact that there are two groups being deployed. --- command/agent/http.go | 3 +- nomad/deploymentwatcher/deployment_watcher.go | 114 +++-- .../deployments_watcher_test.go | 421 +++++++++++------- nomad/state/state_store.go | 5 +- nomad/state/state_store_test.go | 10 +- 5 files changed, 359 insertions(+), 194 deletions(-) diff --git a/command/agent/http.go b/command/agent/http.go index 857f5f8bcde..3f8a067a871 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -13,10 +13,9 @@ import ( "strings" "time" + "github.com/NYTimes/gziphandler" assetfs "github.com/elazarl/go-bindata-assetfs" log "github.com/hashicorp/go-hclog" - - "github.com/NYTimes/gziphandler" "github.com/hashicorp/nomad/helper/tlsutil" "github.com/hashicorp/nomad/nomad/structs" "github.com/rs/cors" diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go index cca75e5dee4..55703c5a96f 100644 --- a/nomad/deploymentwatcher/deployment_watcher.go +++ b/nomad/deploymentwatcher/deployment_watcher.go @@ -6,15 +6,13 @@ import ( "sync" "time" - 
"golang.org/x/time/rate" - log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "golang.org/x/time/rate" ) const ( @@ -363,7 +361,7 @@ func (w *deploymentWatcher) watch() { // handle the case that the deployment has already progressed and we are now // just starting to watch it. This must likely would occur if there was a // leader transition and we are now starting our watcher. - currentDeadline := getDeploymentProgressCutoff(w.getDeployment()) + currentDeadline := w.getDeploymentProgressCutoff(w.getDeployment()) var deadlineTimer *time.Timer if currentDeadline.IsZero() { deadlineTimer = time.NewTimer(0) @@ -405,7 +403,7 @@ FAIL: case <-w.deploymentUpdateCh: // Get the updated deployment and check if we should change the // deadline timer - next := getDeploymentProgressCutoff(w.getDeployment()) + next := w.getDeploymentProgressCutoff(w.getDeployment()) if !next.Equal(currentDeadline) { prevDeadlineZero := currentDeadline.IsZero() currentDeadline = next @@ -419,7 +417,12 @@ FAIL: default: } } - deadlineTimer.Reset(next.Sub(time.Now())) + + // If the next deadline is zero, we should not reset the timer + // as we aren't tracking towards a progress deadline yet. + if !next.IsZero() { + deadlineTimer.Reset(next.Sub(time.Now())) + } } case updates = <-w.getAllocsCh(allocIndex): @@ -506,7 +509,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) ( var res allocUpdateResult // Get the latest evaluation index - latestEval, blocked, err := w.jobEvalStatus() + latestEval, err := w.jobEvalStatus() if err != nil { if err == context.Canceled || w.ctx.Err() == context.Canceled { return res, err @@ -534,7 +537,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) ( } // We need to create an eval so the job can progress. 
- if !blocked && alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval { + if alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval { res.createEval = true } @@ -611,9 +614,20 @@ func (w *deploymentWatcher) shouldFail() (fail, rollback bool, err error) { // getDeploymentProgressCutoff returns the progress cutoff for the given // deployment -func getDeploymentProgressCutoff(d *structs.Deployment) time.Time { +func (w *deploymentWatcher) getDeploymentProgressCutoff(d *structs.Deployment) time.Time { var next time.Time - for _, state := range d.TaskGroups { + doneTGs := w.doneGroups(d) + for name, state := range d.TaskGroups { + // This task group is done so we don't have to concern ourselves with + // its progress deadline. + if done, ok := doneTGs[name]; ok && done { + continue + } + + if state.RequireProgressBy.IsZero() { + continue + } + if next.IsZero() || state.RequireProgressBy.Before(next) { next = state.RequireProgressBy } @@ -621,6 +635,51 @@ func getDeploymentProgressCutoff(d *structs.Deployment) time.Time { return next } +// doneGroups returns a map of task group to whether the deployment appears to +// be done for the group. A true value doesn't mean no more action will be taken +// in the life time of the deployment because there could always be node +// failures, or rescheduling events. 
+func (w *deploymentWatcher) doneGroups(d *structs.Deployment) map[string]bool { + if d == nil { + return nil + } + + // Collect the allocations by the task group + snap, err := w.state.Snapshot() + if err != nil { + return nil + } + + allocs, err := snap.AllocsByDeployment(nil, d.ID) + if err != nil { + return nil + } + + // Go through the allocs and count up how many healthy allocs we have + healthy := make(map[string]int, len(d.TaskGroups)) + for _, a := range allocs { + if a.TerminalStatus() || !a.DeploymentStatus.IsHealthy() { + continue + } + healthy[a.TaskGroup]++ + } + + // Go through each group and check if it is done + groups := make(map[string]bool, len(d.TaskGroups)) + for name, state := range d.TaskGroups { + // Requires promotion + if state.DesiredCanaries != 0 && !state.Promoted { + groups[name] = false + continue + } + + // Check we have enough healthy currently running allocations + groups[name] = healthy[name] >= state.DesiredTotal + } + + return groups +} + // latestStableJob returns the latest stable job. It may be nil if none exist func (w *deploymentWatcher) latestStableJob() (*structs.Job, error) { snap, err := w.state.Snapshot() @@ -779,37 +838,29 @@ func (w *deploymentWatcher) getAllocsImpl(ws memdb.WatchSet, state *state.StateS return stubs, maxIndex, nil } -// jobEvalStatus returns the eval status for a job. It returns the index of the -// last evaluation created for the job, as well as whether there exists a -// blocked evaluation for the job. The index is used to determine if an -// allocation update requires an evaluation to be triggered. If there already is -// a blocked evaluations, no eval should be created. -func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, err error) { +// jobEvalStatus returns the latest eval index for a job. The index is used to +// determine if an allocation update requires an evaluation to be triggered. 
+func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, err error) { if err := w.queryLimiter.Wait(w.ctx); err != nil { - return 0, false, err + return 0, err } snap, err := w.state.Snapshot() if err != nil { - return 0, false, err + return 0, err } evals, err := snap.EvalsByJob(nil, w.j.Namespace, w.j.ID) if err != nil { - return 0, false, err - } - - if len(evals) == 0 { - index, err := snap.Index("evals") - return index, false, err + return 0, err } var max uint64 for _, eval := range evals { - // If we have a blocked eval, then we do not care what the index is - // since we will not need to make a new eval. - if eval.ShouldBlock() { - return 0, true, nil + // A cancelled eval never impacts what the scheduler has seen, so do not + // use its indexes. + if eval.Status == structs.EvalStatusCancelled { + continue } // Prefer using the snapshot index. Otherwise use the create index @@ -820,5 +871,10 @@ func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, e } } - return max, false, nil + if max == uint64(0) { + index, err := snap.Index("evals") + return index, err + } + + return max, nil } diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go index 6a033a0e978..a92a8e029a4 100644 --- a/nomad/deploymentwatcher/deployments_watcher_test.go +++ b/nomad/deploymentwatcher/deployments_watcher_test.go @@ -30,14 +30,14 @@ func defaultTestDeploymentWatcher(t *testing.T) (*Watcher, *mockBackend) { // Tests that the watcher properly watches for deployments and reconciles them func TestWatcher_WatchDeployments(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create three jobs j1, j2, j3 := mock.Job(), mock.Job(), mock.Job() - assert.Nil(m.state.UpsertJob(100, j1)) - assert.Nil(m.state.UpsertJob(101, j2)) - assert.Nil(m.state.UpsertJob(102, j3)) + require.Nil(m.state.UpsertJob(100, j1)) + 
require.Nil(m.state.UpsertJob(101, j2)) + require.Nil(m.state.UpsertJob(102, j3)) // Create three deployments all running d1, d2, d3 := mock.Deployment(), mock.Deployment(), mock.Deployment() @@ -46,14 +46,14 @@ func TestWatcher_WatchDeployments(t *testing.T) { d3.JobID = j3.ID // Upsert the first deployment - assert.Nil(m.state.UpsertDeployment(103, d1)) + require.Nil(m.state.UpsertDeployment(103, d1)) // Next list 3 block1 := make(chan time.Time) go func() { <-block1 - assert.Nil(m.state.UpsertDeployment(104, d2)) - assert.Nil(m.state.UpsertDeployment(105, d3)) + require.Nil(m.state.UpsertDeployment(104, d2)) + require.Nil(m.state.UpsertDeployment(105, d3)) }() //// Next list 3 but have one be terminal @@ -62,26 +62,27 @@ func TestWatcher_WatchDeployments(t *testing.T) { d3terminal.Status = structs.DeploymentStatusFailed go func() { <-block2 - assert.Nil(m.state.UpsertDeployment(106, d3terminal)) + require.Nil(m.state.UpsertDeployment(106, d3terminal)) }() w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "1 deployment returned") }) + func(err error) { require.Equal(1, len(w.watchers), "1 deployment returned") }) close(block1) testutil.WaitForResult(func() (bool, error) { return 3 == len(w.watchers), nil }, - func(err error) { assert.Equal(3, len(w.watchers), "3 deployment returned") }) + func(err error) { require.Equal(3, len(w.watchers), "3 deployment returned") }) close(block2) testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil }, - func(err error) { assert.Equal(3, len(w.watchers), "3 deployment returned - 1 terminal") }) + func(err error) { require.Equal(3, len(w.watchers), "3 deployment returned - 1 terminal") }) } // Tests that calls against an unknown deployment fail func TestWatcher_UnknownDeployment(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) w, m := 
defaultTestDeploymentWatcher(t) w.SetEnabled(true, m.state) @@ -97,7 +98,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) if assert.NotNil(err, "should have error for unknown deployment") { - assert.Contains(err.Error(), expected) + require.Contains(err.Error(), expected) } // Request promoting against an unknown deployment @@ -107,7 +108,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { } err = w.PromoteDeployment(req2, &resp) if assert.NotNil(err, "should have error for unknown deployment") { - assert.Contains(err.Error(), expected) + require.Contains(err.Error(), expected) } // Request pausing against an unknown deployment @@ -117,7 +118,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { } err = w.PauseDeployment(req3, &resp) if assert.NotNil(err, "should have error for unknown deployment") { - assert.Contains(err.Error(), expected) + require.Contains(err.Error(), expected) } // Request failing against an unknown deployment @@ -126,7 +127,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { } err = w.FailDeployment(req4, &resp) if assert.NotNil(err, "should have error for unknown deployment") { - assert.Contains(err.Error(), expected) + require.Contains(err.Error(), expected) } } @@ -134,16 +135,17 @@ func TestWatcher_UnknownDeployment(t *testing.T) { func TestWatcher_SetAllocHealth_Unknown(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - // Assert that we get a call to UpsertDeploymentAllocHealth + // require that we get a call 
to UpsertDeploymentAllocHealth a := mock.Alloc() matchConfig := &matchDeploymentAllocHealthRequestConfig{ DeploymentID: d.ID, @@ -155,7 +157,7 @@ func TestWatcher_SetAllocHealth_Unknown(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call SetAllocHealth req := &structs.DeploymentAllocHealthRequest{ @@ -165,15 +167,15 @@ func TestWatcher_SetAllocHealth_Unknown(t *testing.T) { var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) if assert.NotNil(err, "Set health of unknown allocation") { - assert.Contains(err.Error(), "unknown") + require.Contains(err.Error(), "unknown") } - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") } // Test setting allocation health func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, alloc, and a deployment @@ -182,11 +184,11 @@ func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { d.JobID = j.ID a := mock.Alloc() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - // Assert that we get a call to UpsertDeploymentAllocHealth + // require that we get a call to UpsertDeploymentAllocHealth matchConfig := 
&matchDeploymentAllocHealthRequestConfig{ DeploymentID: d.ID, Healthy: []string{a.ID}, @@ -197,7 +199,7 @@ func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call SetAllocHealth req := &structs.DeploymentAllocHealthRequest{ @@ -206,15 +208,15 @@ func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) - assert.Nil(err, "SetAllocHealth") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Nil(err, "SetAllocHealth") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)) } // Test setting allocation unhealthy func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, alloc, and a deployment @@ -223,11 +225,11 @@ func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { d.JobID = j.ID a := mock.Alloc() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - // Assert that we get a call to UpsertDeploymentAllocHealth + // require that we get a call to UpsertDeploymentAllocHealth matchConfig := &matchDeploymentAllocHealthRequestConfig{ 
DeploymentID: d.ID, Unhealthy: []string{a.ID}, @@ -243,7 +245,7 @@ func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call SetAllocHealth req := &structs.DeploymentAllocHealthRequest{ @@ -252,17 +254,17 @@ func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) - assert.Nil(err, "SetAllocHealth") + require.Nil(err, "SetAllocHealth") testutil.WaitForResult(func() (bool, error) { return 0 == len(w.watchers), nil }, - func(err error) { assert.Equal(0, len(w.watchers), "Should have no deployment") }) + func(err error) { require.Equal(0, len(w.watchers), "Should have no deployment") }) m.AssertNumberOfCalls(t, "UpdateDeploymentAllocHealth", 1) } // Test setting allocation unhealthy and that there should be a rollback func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, alloc, and a deployment @@ -277,9 +279,9 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") // Upsert the job again to get a new version j2 := j.Copy() 
@@ -287,9 +289,9 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { // Modify the job to make its specification different j2.Meta["foo"] = "bar" - assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") + require.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") - // Assert that we get a call to UpsertDeploymentAllocHealth + // require that we get a call to UpsertDeploymentAllocHealth matchConfig := &matchDeploymentAllocHealthRequestConfig{ DeploymentID: d.ID, Unhealthy: []string{a.ID}, @@ -306,7 +308,7 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call SetAllocHealth req := &structs.DeploymentAllocHealthRequest{ @@ -315,17 +317,17 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) - assert.Nil(err, "SetAllocHealth") + require.Nil(err, "SetAllocHealth") testutil.WaitForResult(func() (bool, error) { return 0 == len(w.watchers), nil }, - func(err error) { assert.Equal(0, len(w.watchers), "Should have no deployment") }) + func(err error) { require.Equal(0, len(w.watchers), "Should have no deployment") }) m.AssertNumberOfCalls(t, "UpdateDeploymentAllocHealth", 1) } // Test setting allocation unhealthy on job with identical spec and there should be no rollback func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, alloc, and a deployment @@ -340,17 +342,17 @@ func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - 
assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") // Upsert the job again to get a new version j2 := j.Copy() j2.Stable = false - assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") + require.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") - // Assert that we get a call to UpsertDeploymentAllocHealth + // require that we get a call to UpsertDeploymentAllocHealth matchConfig := &matchDeploymentAllocHealthRequestConfig{ DeploymentID: d.ID, Unhealthy: []string{a.ID}, @@ -367,7 +369,7 @@ func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call SetAllocHealth req := &structs.DeploymentAllocHealthRequest{ @@ -376,17 +378,17 @@ func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) - assert.Nil(err, "SetAllocHealth") + require.Nil(err, "SetAllocHealth") testutil.WaitForResult(func() (bool, error) { return 0 == len(w.watchers), nil }, - func(err error) { assert.Equal(0, len(w.watchers), "Should have no deployment") }) + func(err error) { require.Equal(0, len(w.watchers), "Should have no deployment") }) m.AssertNumberOfCalls(t, "UpdateDeploymentAllocHealth", 1) } // Test promoting a deployment func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { 
t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, canary alloc, and a deployment @@ -404,11 +406,11 @@ func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { Healthy: helper.BoolToPtr(true), } a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - // Assert that we get a call to UpsertDeploymentPromotion + // require that we get a call to UpsertDeploymentPromotion matchConfig := &matchDeploymentPromoteRequestConfig{ Promotion: &structs.DeploymentPromoteRequest{ DeploymentID: d.ID, @@ -425,7 +427,7 @@ func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call PromoteDeployment req := &structs.DeploymentPromoteRequest{ @@ -434,15 +436,15 @@ func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.PromoteDeployment(req, &resp) - assert.Nil(err, "PromoteDeployment") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Nil(err, "PromoteDeployment") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher)) } // Test promoting a deployment with unhealthy canaries func 
TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job, canary alloc, and a deployment @@ -457,11 +459,11 @@ func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { d.TaskGroups[a.TaskGroup].PlacedCanaries = []string{a.ID} d.TaskGroups[a.TaskGroup].DesiredCanaries = 2 a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - // Assert that we get a call to UpsertDeploymentPromotion + // require that we get a call to UpsertDeploymentPromotion matchConfig := &matchDeploymentPromoteRequestConfig{ Promotion: &structs.DeploymentPromoteRequest{ DeploymentID: d.ID, @@ -474,7 +476,7 @@ func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call SetAllocHealth req := &structs.DeploymentPromoteRequest{ @@ -483,28 +485,28 @@ func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.PromoteDeployment(req, &resp) - if assert.NotNil(err, "PromoteDeployment") { - assert.Contains(err.Error(), `Task group "web" has 0/2 healthy allocations`, "Should error because canary isn't marked healthy") + if assert.NotNil(t, err, "PromoteDeployment") { + 
require.Contains(err.Error(), `Task group "web" has 0/2 healthy allocations`, "Should error because canary isn't marked healthy") } - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher)) } // Test pausing a deployment that is running func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate matchConfig := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusPaused, @@ -515,7 +517,7 @@ func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call PauseDeployment req := &structs.DeploymentPauseRequest{ @@ -524,16 +526,16 @@ func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.PauseDeployment(req, &resp) - assert.Nil(err, "PauseDeployment") + require.Nil(err, "PauseDeployment") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") 
m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) } // Test pausing a deployment that is paused func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment @@ -541,10 +543,10 @@ func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { d := mock.Deployment() d.JobID = j.ID d.Status = structs.DeploymentStatusPaused - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate matchConfig := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusPaused, @@ -555,7 +557,7 @@ func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call PauseDeployment req := &structs.DeploymentPauseRequest{ @@ -564,16 +566,16 @@ func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.PauseDeployment(req, &resp) - assert.Nil(err, "PauseDeployment") + require.Nil(err, "PauseDeployment") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) } // Test unpausing a deployment that is paused func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { 
t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment @@ -581,10 +583,10 @@ func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { d := mock.Deployment() d.JobID = j.ID d.Status = structs.DeploymentStatusPaused - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate matchConfig := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusRunning, @@ -596,7 +598,7 @@ func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call PauseDeployment req := &structs.DeploymentPauseRequest{ @@ -605,26 +607,26 @@ func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.PauseDeployment(req, &resp) - assert.Nil(err, "PauseDeployment") + require.Nil(err, "PauseDeployment") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) } // Test unpausing a deployment that is running func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() d := 
mock.Deployment() d.JobID = j.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate matchConfig := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusRunning, @@ -636,7 +638,7 @@ func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call PauseDeployment req := &structs.DeploymentPauseRequest{ @@ -645,26 +647,26 @@ func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.PauseDeployment(req, &resp) - assert.Nil(err, "PauseDeployment") + require.Nil(err, "PauseDeployment") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) } // Test failing a deployment that is running func TestWatcher_FailDeployment_Running(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), 
"UpsertDeployment") - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate matchConfig := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusFailed, @@ -676,7 +678,7 @@ func TestWatcher_FailDeployment_Running(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Call PauseDeployment req := &structs.DeploymentFailRequest{ @@ -684,9 +686,9 @@ func TestWatcher_FailDeployment_Running(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.FailDeployment(req, &resp) - assert.Nil(err, "FailDeployment") + require.Nil(err, "FailDeployment") - assert.Equal(1, len(w.watchers), "Deployment should still be active") + require.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) } @@ -694,7 +696,7 @@ func TestWatcher_FailDeployment_Running(t *testing.T) { // proper actions func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment @@ -709,23 +711,23 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + 
require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") // Upsert the job again to get a new version j2 := j.Copy() // Modify the job to make its specification different j2.Meta["foo"] = "bar" j2.Stable = false - assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") + require.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") - // Assert that we will get a update allocation call only once. This will + // require that we will get a update allocation call only once. This will // verify that the watcher is batching allocation changes m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() - // Assert that we get a call to UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate c := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusFailed, @@ -738,7 +740,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Update the allocs health to healthy which should create an evaluation for i := 0; i < 5; i++ { @@ -748,7 +750,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { HealthyAllocationIDs: []string{a.ID}, }, } - assert.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") } // Wait for there to be one eval @@ -776,7 +778,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, }, } - assert.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), 
"UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") // Wait for there to be one eval testutil.WaitForResult(func() (bool, error) { @@ -797,7 +799,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1)) - // After we upsert the job version will go to 2. So use this to assert the + // After we upsert the job version will go to 2. So use this to require the // original call happened. c2 := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, @@ -809,12 +811,12 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { m3 := matchDeploymentStatusUpdateRequest(c2) m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(m3)) testutil.WaitForResult(func() (bool, error) { return 0 == len(w.watchers), nil }, - func(err error) { assert.Equal(0, len(w.watchers), "Should have no deployment") }) + func(err error) { require.Equal(0, len(w.watchers), "Should have no deployment") }) } func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment @@ -831,11 +833,11 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { a.CreateTime = now.UnixNano() a.ModifyTime = now.UnixNano() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - // Assert that we get a call to 
UpsertDeploymentStatusUpdate + // require that we get a call to UpsertDeploymentStatusUpdate c := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusFailed, @@ -847,15 +849,15 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) - // Update the alloc to be unhealthy and assert that nothing happens. + // Update the alloc to be unhealthy and require that nothing happens. a2 := a.Copy() a2.DeploymentStatus = &structs.AllocDeploymentStatus{ Healthy: helper.BoolToPtr(false), Timestamp: now, } - assert.Nil(m.state.UpdateAllocsFromClient(100, []*structs.Allocation{a2})) + require.Nil(m.state.UpdateAllocsFromClient(100, []*structs.Allocation{a2})) // Wait for the deployment to be failed testutil.WaitForResult(func() (bool, error) { @@ -869,7 +871,7 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { t.Fatal(err) }) - // Assert there are is only one evaluation + // require there is only one evaluation testutil.WaitForResult(func() (bool, error) { ws := memdb.NewWatchSet() evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) @@ -887,6 +889,107 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { }) } +// Test that progress deadline handling works when there are multiple groups +func TestDeploymentWatcher_ProgressCutoff(t *testing.T) { + t.Parallel() + require := require.New(t) + w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) + + // Create a job, alloc, and a deployment + j := mock.Job() + j.TaskGroups[0].Count = 1 + j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() + j.TaskGroups[0].Update.ProgressDeadline = 500 * time.Millisecond + j.TaskGroups = append(j.TaskGroups,
j.TaskGroups[0].Copy()) + j.TaskGroups[1].Name = "foo" + j.TaskGroups[1].Update.ProgressDeadline = 1 * time.Second + j.Stable = true + + d := mock.Deployment() + d.JobID = j.ID + d.TaskGroups["web"].DesiredTotal = 1 + d.TaskGroups["foo"] = d.TaskGroups["web"].Copy() + d.TaskGroups["web"].ProgressDeadline = 500 * time.Millisecond + d.TaskGroups["foo"].ProgressDeadline = 1 * time.Second + + a := mock.Alloc() + now := time.Now() + a.CreateTime = now.UnixNano() + a.ModifyTime = now.UnixNano() + a.DeploymentID = d.ID + + a2 := mock.Alloc() + a2.TaskGroup = "foo" + a2.CreateTime = now.UnixNano() + a2.ModifyTime = now.UnixNano() + a2.DeploymentID = d.ID + + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a, a2}), "UpsertAllocs") + + // We may get an update for the desired transition. + m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) + m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() + + w.SetEnabled(true, m.state) + testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) + + watcher, err := w.getOrCreateWatcher(d.ID) + require.NoError(err) + require.NotNil(watcher) + + d1, err := m.state.DeploymentByID(nil, d.ID) + require.NoError(err) + + done := watcher.doneGroups(d1) + require.Contains(done, "web") + require.False(done["web"]) + require.Contains(done, "foo") + require.False(done["foo"]) + + cutoff1 := watcher.getDeploymentProgressCutoff(d1) + require.False(cutoff1.IsZero()) + + // Update the first allocation to be healthy + a3 := a.Copy() + a3.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: helper.BoolToPtr(true)} + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a3}), "UpsertAllocs") + + // Get the updated deployment + d2, err 
:= m.state.DeploymentByID(nil, d.ID) + require.NoError(err) + + done = watcher.doneGroups(d2) + require.Contains(done, "web") + require.True(done["web"]) + require.Contains(done, "foo") + require.False(done["foo"]) + + cutoff2 := watcher.getDeploymentProgressCutoff(d2) + require.False(cutoff2.IsZero()) + require.True(cutoff1.UnixNano() < cutoff2.UnixNano()) + + // Update the second allocation to be healthy + a4 := a2.Copy() + a4.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: helper.BoolToPtr(true)} + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a4}), "UpsertAllocs") + + // Get the updated deployment + d3, err := m.state.DeploymentByID(nil, d.ID) + require.NoError(err) + + done = watcher.doneGroups(d3) + require.Contains(done, "web") + require.True(done["web"]) + require.Contains(done, "foo") + require.True(done["foo"]) + + cutoff3 := watcher.getDeploymentProgressCutoff(d3) + require.True(cutoff3.IsZero()) +} + // Test that we will allow the progress deadline to be reached when the canaries // are healthy but we haven't promoted func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) { @@ -915,7 +1018,7 @@ func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) { require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - // Assert that we will get a createEvaluation call only once. This will + // require that we will get a createEvaluation call only once.
This will // verify that the watcher is batching allocation changes m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() @@ -1056,7 +1159,7 @@ func TestDeploymentWatcher_PromotedCanary_UpdatedAllocs(t *testing.T) { // After the deployment is updated, a failed alloc's DesiredTransition should be set func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, and a deployment @@ -1068,18 +1171,18 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { d := mock.Deployment() d.JobID = j.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") a := mock.Alloc() a.CreateTime = time.Now().UnixNano() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") d.TaskGroups["web"].ProgressDeadline = 500 * time.Millisecond // Update the deployment with a progress deadline - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") // Match on DesiredTransition set to Reschedule for the failed alloc m1 := matchUpdateAllocDesiredTransitionReschedule([]string{a.ID}) @@ -1087,7 +1190,7 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") 
}) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Update the alloc to be unhealthy a2 := a.Copy() @@ -1095,7 +1198,7 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { Healthy: helper.BoolToPtr(false), Timestamp: time.Now(), } - assert.Nil(m.state.UpdateAllocsFromClient(m.nextIndex(), []*structs.Allocation{a2})) + require.Nil(m.state.UpdateAllocsFromClient(m.nextIndex(), []*structs.Allocation{a2})) // Wait for the alloc's DesiredState to set reschedule testutil.WaitForResult(func() (bool, error) { @@ -1114,7 +1217,7 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { // Tests that the watcher fails rollback when the spec hasn't changed func TestDeploymentWatcher_RollbackFailed(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment @@ -1129,22 +1232,22 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") // Upsert the job again to get a new version j2 := j.Copy() // Modify the job to make its specification different j2.Stable = false - assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") + require.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") - // Assert that we will get a createEvaluation call only once. This will + // require that we will get a createEvaluation call only once. 
This will // verify that the watcher is batching allocation changes m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() - // Assert that we get a call to UpsertDeploymentStatusUpdate with roll back failed as the status + // require that we get a call to UpsertDeploymentStatusUpdate with roll back failed as the status c := &matchDeploymentStatusUpdateConfig{ DeploymentID: d.ID, Status: structs.DeploymentStatusFailed, @@ -1157,7 +1260,7 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, - func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + func(err error) { require.Equal(1, len(w.watchers), "Should have 1 deployment") }) // Update the allocs health to healthy which should create an evaluation for i := 0; i < 5; i++ { @@ -1167,7 +1270,7 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { HealthyAllocationIDs: []string{a.ID}, }, } - assert.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") } // Wait for there to be one eval @@ -1195,7 +1298,7 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, }, } - assert.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") // Wait for there to be one eval testutil.WaitForResult(func() (bool, error) { @@ -1218,13 +1321,13 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { // verify that the job version hasn't changed after upsert m.state.JobByID(nil, structs.DefaultNamespace, j.ID) - assert.Equal(uint64(0), j.Version, "Expected job version 0 but got ", 
j.Version) + require.Equal(uint64(0), j.Version, "Expected job version 0 but got ", j.Version) } // Test allocation updates and evaluation creation is batched between watchers func TestWatcher_BatchAllocUpdates(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Second) // Create a job, alloc, for two deployments @@ -1248,14 +1351,14 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { a2.JobID = j2.ID a2.DeploymentID = d2.ID - assert.Nil(m.state.UpsertJob(m.nextIndex(), j1), "UpsertJob") - assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d1), "UpsertDeployment") - assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d2), "UpsertDeployment") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a1}), "UpsertAllocs") - assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a2}), "UpsertAllocs") + require.Nil(m.state.UpsertJob(m.nextIndex(), j1), "UpsertJob") + require.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d1), "UpsertDeployment") + require.Nil(m.state.UpsertDeployment(m.nextIndex(), d2), "UpsertDeployment") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a1}), "UpsertAllocs") + require.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a2}), "UpsertAllocs") - // Assert that we will get a createEvaluation call only once and it contains + // require that we will get a createEvaluation call only once and it contains // both deployments. 
This will verify that the watcher is batching // allocation changes m1 := matchUpdateAllocDesiredTransitions([]string{d1.ID, d2.ID}) @@ -1263,7 +1366,7 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { w.SetEnabled(true, m.state) testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil }, - func(err error) { assert.Equal(2, len(w.watchers), "Should have 2 deployment") }) + func(err error) { require.Equal(2, len(w.watchers), "Should have 2 deployment") }) // Update the allocs health to healthy which should create an evaluation req := &structs.ApplyDeploymentAllocHealthRequest{ @@ -1272,7 +1375,7 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { HealthyAllocationIDs: []string{a1.ID}, }, } - assert.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") req2 := &structs.ApplyDeploymentAllocHealthRequest{ DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ @@ -1280,7 +1383,7 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { HealthyAllocationIDs: []string{a2.ID}, }, } - assert.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") // Wait for there to be one eval for each job testutil.WaitForResult(func() (bool, error) { @@ -1310,5 +1413,5 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1)) testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil }, - func(err error) { assert.Equal(2, len(w.watchers), "Should have 2 deployment") }) + func(err error) { require.Equal(2, len(w.watchers), "Should have 2 deployment") }) } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 8885ebab420..3c79cc48295 100644 --- 
a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -7,9 +7,8 @@ import ( "time" log "github.com/hashicorp/go-hclog" - multierror "github.com/hashicorp/go-multierror" - "github.com/hashicorp/go-memdb" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -2695,7 +2694,7 @@ func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyD } // Ensure the canaries are healthy - if !alloc.DeploymentStatus.IsHealthy() { + if alloc.TerminalStatus() || !alloc.DeploymentStatus.IsHealthy() { continue } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 79cd8559f98..d3bfddc502e 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -5468,7 +5468,15 @@ func TestStateStore_UpsertDeploymentPromotion_Unhealthy(t *testing.T) { c2.DeploymentID = d.ID d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID) - require.Nil(state.UpsertAllocs(3, []*structs.Allocation{c1, c2})) + // Create a healthy but terminal alloc + c3 := mock.Alloc() + c3.JobID = j.ID + c3.DeploymentID = d.ID + c3.DesiredStatus = structs.AllocDesiredStatusStop + c3.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: helper.BoolToPtr(true)} + d.TaskGroups[c3.TaskGroup].PlacedCanaries = append(d.TaskGroups[c3.TaskGroup].PlacedCanaries, c3.ID) + + require.Nil(state.UpsertAllocs(3, []*structs.Allocation{c1, c2, c3})) // Promote the canaries req := &structs.ApplyDeploymentPromoteRequest{ From ccb7440316455ca6c4bb3d9a6f90a8e2dfb4b3f4 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 5 Nov 2018 16:39:09 -0800 Subject: [PATCH 2/3] more robust merging of the deployment status when getting updates from the client --- nomad/state/state_store.go | 23 ++++++++++++++++++----- nomad/state/state_store_test.go | 2 ++ 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/nomad/state/state_store.go 
b/nomad/state/state_store.go index 3c79cc48295..41c60090904 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1922,11 +1922,24 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a copyAlloc.ClientDescription = alloc.ClientDescription copyAlloc.TaskStates = alloc.TaskStates - // Merge the deployment status taking only what the client should set - oldDeploymentStatus := copyAlloc.DeploymentStatus - copyAlloc.DeploymentStatus = alloc.DeploymentStatus - if oldDeploymentStatus != nil && oldDeploymentStatus.Canary { - copyAlloc.DeploymentStatus.Canary = true + // The client can only set its deployment health and timestamp, so just take + // those + if copyAlloc.DeploymentStatus != nil && alloc.DeploymentStatus != nil { + oldHasHealthy := copyAlloc.DeploymentStatus.HasHealth() + newHasHealthy := alloc.DeploymentStatus.HasHealth() + + // We got new health information from the client + if newHasHealthy && (!oldHasHealthy || *copyAlloc.DeploymentStatus.Healthy != *alloc.DeploymentStatus.Healthy) { + // Updated deployment health and timestamp + copyAlloc.DeploymentStatus.Healthy = helper.BoolToPtr(*alloc.DeploymentStatus.Healthy) + copyAlloc.DeploymentStatus.Timestamp = alloc.DeploymentStatus.Timestamp + copyAlloc.DeploymentStatus.ModifyIndex = index + } + } else if alloc.DeploymentStatus != nil { + // First time getting a deployment status so copy everything and just + // set the index + copyAlloc.DeploymentStatus = alloc.DeploymentStatus.Copy() + copyAlloc.DeploymentStatus.ModifyIndex = index } // Update the modify index diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index d3bfddc502e..4ab83785812 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -3551,6 +3551,8 @@ func TestStateStore_UpdateAllocsFromClient_DeploymentStateMerges(t *testing.T) { require.Nil(err) require.NotNil(out) require.True(out.DeploymentStatus.Canary) + 
require.NotNil(out.DeploymentStatus.Healthy) + require.True(*out.DeploymentStatus.Healthy) } func TestStateStore_UpsertAlloc_Alloc(t *testing.T) { From ad1564976cba8d345dd024e2e5a3378c860245a9 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 8 Nov 2018 09:48:36 -0800 Subject: [PATCH 3/3] review fixes --- nomad/deploymentwatcher/deployment_watcher.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go index 55703c5a96f..c798cbcedb6 100644 --- a/nomad/deploymentwatcher/deployment_watcher.go +++ b/nomad/deploymentwatcher/deployment_watcher.go @@ -419,7 +419,12 @@ FAIL: } // If the next deadline is zero, we should not reset the timer - // as we aren't tracking towards a progress deadline yet. + // as we aren't tracking towards a progress deadline yet. This + // can happen if you have multiple task groups with progress + // deadlines and one of the task groups hasn't made any + // placements. As soon as the other task group finishes its + // rollout, the next progress deadline becomes zero, so we want + // to avoid resetting, causing a deployment failure. if !next.IsZero() { deadlineTimer.Reset(next.Sub(time.Now())) } @@ -855,6 +860,12 @@ func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, err error) { return 0, err } + // If there are no evals for the job, return zero, since we want any + // allocation change to trigger an evaluation. + if len(evals) == 0 { + return 0, nil + } + var max uint64 for _, eval := range evals { // A cancelled eval never impacts what the scheduler has saw, so do not @@ -871,10 +882,5 @@ func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, err error) { } } - if max == uint64(0) { - index, err := snap.Index("evals") - return index, err - } - return max, nil }