From 5720266c91c5d4cb00b650ac59224b99e99042a8 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 13 Aug 2020 09:35:09 -0400 Subject: [PATCH 1/5] Respect alloc job version for lost/failed allocs This change fixes a bug where lost/failed allocations are replaced by allocations with the latest version, even if that version hasn't been promoted yet. Now, when generating a plan for lost/failed allocations, the scheduler first checks whether the current deployment is in the canary stage, and if so, it ensures that any lost/failed allocation is replaced by one with the latest promoted version instead. --- CHANGELOG.md | 4 + nomad/structs/structs.go | 10 +- scheduler/generic_sched.go | 67 +++++++++++++- scheduler/generic_sched_test.go | 159 ++++++++++++++++++++++++++++++++ scheduler/reconcile.go | 21 +++-- scheduler/reconcile_util.go | 13 +++ scheduler/scheduler.go | 6 ++ scheduler/stack.go | 8 ++ scheduler/system_sched.go | 2 +- scheduler/util.go | 2 +- 10 files changed, 278 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ed45fdb77b7..6065a82f550 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ IMPROVEMENTS: * api: Added node purge SDK functionality. [[GH-8142](https://github.com/hashicorp/nomad/issues/8142)] * driver/docker: Allow configurable image pull context timeout setting. [[GH-5718](https://github.com/hashicorp/nomad/issues/5718)] +BUG FIXES: + + * core: Fixed a bug where unpromoted job versions are used when rescheduling failed allocations [[GH-8691](https://github.com/hashicorp/nomad/issues/8691)] + ## 0.12.3 (August 13, 2020) BUG FIXES: diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 3a77d15dc6c..14dcc3c94a1 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -9830,12 +9830,18 @@ func (p *Plan) PopUpdate(alloc *Allocation) { } } -func (p *Plan) AppendAlloc(alloc *Allocation) { +// AppendAlloc appends the alloc to the plan allocations.
+// To save space, it clears the Job field so it can be derived from the plan Job. +// If keepJob is true, the normalizatin skipped to accommodate cases where a plan +// needs to support multiple versions of the same job. +func (p *Plan) AppendAlloc(alloc *Allocation, keepJob bool) { node := alloc.NodeID existing := p.NodeAllocation[node] // Normalize the job - alloc.Job = nil + if !keepJob { + alloc.Job = nil + } p.NodeAllocation[node] = append(existing, alloc) } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index a16b2955ece..bc48dbf5c91 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -2,6 +2,7 @@ package scheduler import ( "fmt" + "sort" "time" log "github.com/hashicorp/go-hclog" @@ -387,12 +388,12 @@ func (s *GenericScheduler) computeJobAllocs() error { update.DeploymentID = s.deployment.GetID() update.DeploymentStatus = nil } - s.ctx.Plan().AppendAlloc(update) + s.ctx.Plan().AppendAlloc(update, false) } // Handle the annotation updates for _, update := range results.attributeUpdates { - s.ctx.Plan().AppendAlloc(update) + s.ctx.Plan().AppendAlloc(update, false) } // Nothing remaining to do if placement is not required @@ -429,6 +430,32 @@ func (s *GenericScheduler) computeJobAllocs() error { return s.computePlacements(destructive, place) } +// downgradedJobForPlacement returns the job appropriate for non-canary placement replacement +func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, *structs.Job, error) { + ns, jobID := s.job.Namespace, s.job.ID + tgName := p.TaskGroup().Name + + // find deployments, newest job version first, and use the latest promoted or canaried version + deployments, err := s.state.DeploymentsByJobID(nil, ns, jobID, false) + if err != nil { + return "", nil, fmt.Errorf("failed to lookup job deployments: %v", err) + } + sort.Slice(deployments, func(i, j int) bool { return deployments[i].JobVersion > deployments[j].JobVersion }) + for _, d := range deployments { + // It's unexpected to
have a recent deployment that doesn't contain the TaskGroup; as all allocations + // should be destroyed. In such cases, attempt to find the deployment for that TaskGroup and hopefully + // we will kill it soon. This is a defensive measure, have not seen it in practice + // + // Zero dstate.DesiredCanaries indicates that the TaskGroup allocates were updated in-place without using canaries. + if dstate := d.TaskGroups[tgName]; dstate != nil && (dstate.Promoted || dstate.DesiredCanaries == 0) { + job, err := s.state.JobByIDAndVersion(nil, ns, jobID, d.JobVersion) + return d.ID, job, err + } + } + + return "", nil, nil +} + // computePlacements computes placements for allocations. It is given the set of // destructive updates to place and the set of new placements to place. func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error { @@ -457,6 +484,31 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul // Get the task group tg := missing.TaskGroup() + var downgradedJob *structs.Job + + if missing.DowngradeNonCanary() { + jobDeploymentID, job, err := s.downgradedJobForPlacement(missing) + if err != nil { + return err + } + + // Defensive check - if there is no appropriate deployment for this job, use the latest + if job != nil && job.Version >= missing.MinJobVersion() && job.LookupTaskGroup(tg.Name) != nil { + tg = job.LookupTaskGroup(tg.Name) + downgradedJob = job + deploymentID = jobDeploymentID + + // ensure we are operating on the correct job + s.stack.SetJob(job) + } else { + jobVersion := -1 + if job != nil { + jobVersion = int(job.Version) + } + s.logger.Warn("failed to find appropriate job; using the latest", "expected_version", missing.MinJobVersion, "found_version", jobVersion) + } + } + // Check if this task group has already failed if metric, ok := s.failedTGAllocs[tg.Name]; ok { metric.CoalescedFailures += 1 @@ -489,6 +541,11 @@ func (s *GenericScheduler) computePlacements(destructive, place 
[]placementResul // Compute top K scoring node metadata s.ctx.Metrics().PopulateScoreMetaData() + // restore stack to use the latest job version again + if downgradedJob != nil { + s.stack.SetJob(s.job) + } + // Set fields based on if we found an allocation option if option != nil { resources := &structs.AllocatedResources{ @@ -544,10 +601,14 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul } } + if downgradedJob != nil { + alloc.Job = downgradedJob + } + s.handlePreemptions(option, alloc, missing) // Track the placement - s.plan.AppendAlloc(alloc) + s.plan.AppendAlloc(alloc, downgradedJob != nil) } else { // Lazy initialize the failed map diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 96d623b7ca3..e8dc50e6944 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -5342,3 +5342,162 @@ func TestServiceSched_Preemption(t *testing.T) { } require.Equal(expectedPreemptedAllocs, actualPreemptedAllocs) } + +// TestServiceSched_Migrate_CanaryStatus asserts that migrations/rescheduling +// of allocations use the proper versions of allocs rather than latest: +// Canaries should be replaced by canaries, and non-canaries should be replaced +// with the latest promoted version. 
+func TestServiceSched_Migrate_CanaryStatus(t *testing.T) { + h := NewHarness(t) + + node1 := mock.Node() + require.NoError(t, h.State.UpsertNode(h.NextIndex(), node1)) + + totalCount := 3 + desiredCanaries := 1 + + job := mock.Job() + job.Stable = true + job.TaskGroups[0].Count = totalCount + job.TaskGroups[0].Update = &structs.UpdateStrategy{ + MaxParallel: 1, + Canary: desiredCanaries, + } + require.NoError(t, h.State.UpsertJob(h.NextIndex(), job)) + + deployment := &structs.Deployment{ + ID: uuid.Generate(), + JobID: job.ID, + Namespace: job.Namespace, + JobVersion: job.Version, + JobModifyIndex: job.JobModifyIndex, + JobCreateIndex: job.CreateIndex, + TaskGroups: map[string]*structs.DeploymentState{ + "web": {DesiredTotal: totalCount}, + }, + Status: structs.DeploymentStatusSuccessful, + StatusDescription: structs.DeploymentStatusDescriptionSuccessful, + } + require.NoError(t, h.State.UpsertDeployment(h.NextIndex(), deployment)) + + var allocs []*structs.Allocation + for i := 0; i < 3; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node1.ID + alloc.DeploymentID = deployment.ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + allocs = append(allocs, alloc) + } + require.NoError(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // new update with new task group + job2 := job.Copy() + job2.Stable = false + job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other" + require.NoError(t, h.State.UpsertJob(h.NextIndex(), job2)) + + // Create a mock evaluation + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + require.NoError(t, err) + + // Ensure a single plan + require.Len(t, h.Plans, 1) + plan := 
h.Plans[0] + + // Ensure a deployment was created + require.NotNil(t, plan.Deployment) + updateDeployment := plan.Deployment.ID + + // Check status first - should be 4 allocs, only one is canary + { + ws := memdb.NewWatchSet() + allocs, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, true) + require.NoError(t, err) + require.Len(t, allocs, 4) + + sort.Slice(allocs, func(i, j int) bool { return allocs[i].CreateIndex < allocs[j].CreateIndex }) + + for _, a := range allocs[:3] { + require.Equal(t, structs.AllocDesiredStatusRun, a.DesiredStatus) + require.Equal(t, uint64(0), a.Job.Version) + require.False(t, a.DeploymentStatus.IsCanary()) + require.Equal(t, node1.ID, a.NodeID) + require.Equal(t, deployment.ID, a.DeploymentID) + } + require.Equal(t, structs.AllocDesiredStatusRun, allocs[3].DesiredStatus) + require.Equal(t, uint64(1), allocs[3].Job.Version) + require.True(t, allocs[3].DeploymentStatus.Canary) + require.Equal(t, node1.ID, allocs[3].NodeID) + require.Equal(t, updateDeployment, allocs[3].DeploymentID) + } + + // now, mark node1 as down and ensure all are migrated to node2 + node1 = node1.Copy() + node1.Status = structs.NodeStatusDown + require.NoError(t, h.State.UpsertNode(h.NextIndex(), node1)) + + node2 := mock.Node() + require.NoError(t, h.State.UpsertNode(h.NextIndex(), node2)) + + neval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + NodeID: node1.ID, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{neval})) + + // Process the node-update evaluation + err = h.Process(NewServiceScheduler, neval) + require.NoError(t, err) + + // Now test that all node1 allocs are migrated while preserving Version and Canary info + { + ws := memdb.NewWatchSet() + allocs, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, true) + require.NoError(t, err) + require.Len(t, allocs, 8) + + nodeAllocs :=
map[string][]*structs.Allocation{} + for _, a := range allocs { + nodeAllocs[a.NodeID] = append(nodeAllocs[a.NodeID], a) + } + + require.Len(t, nodeAllocs[node1.ID], 4) + for _, a := range nodeAllocs[node1.ID] { + require.Equal(t, structs.AllocDesiredStatusStop, a.DesiredStatus) + require.Equal(t, node1.ID, a.NodeID) + } + + node2Allocs := nodeAllocs[node2.ID] + require.Len(t, node2Allocs, 4) + sort.Slice(node2Allocs, func(i, j int) bool { return node2Allocs[i].Job.Version < node2Allocs[j].Job.Version }) + + for _, a := range node2Allocs[:3] { + require.Equal(t, structs.AllocDesiredStatusRun, a.DesiredStatus) + require.Equal(t, uint64(0), a.Job.Version) + require.Equal(t, node2.ID, a.NodeID) + require.Equal(t, deployment.ID, a.DeploymentID) + } + require.Equal(t, structs.AllocDesiredStatusRun, node2Allocs[3].DesiredStatus) + require.Equal(t, uint64(1), node2Allocs[3].Job.Version) + require.Equal(t, node2.ID, node2Allocs[3].NodeID) + require.Equal(t, updateDeployment, node2Allocs[3].DeploymentID) + } +} diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index 31bed702d60..e46f7cb3d8a 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -424,10 +424,10 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // The fact that we have destructive updates and have less canaries than is // desired means we need to create canaries - numDestructive := len(destructive) strategy := tg.Update canariesPromoted := dstate != nil && dstate.Promoted - requireCanary := numDestructive != 0 && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted + requireCanary := (len(destructive) != 0 || (len(untainted) == 0 && len(migrate)+len(lost) != 0)) && + strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted if requireCanary { dstate.DesiredCanaries = strategy.Canary } @@ -455,7 +455,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // * There is no delayed 
stop_after_client_disconnect alloc, which delays scheduling for the whole group var place []allocPlaceResult if len(lostLater) == 0 { - place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow) + place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, canaryState) if !existingDeployment { dstate.DesiredTotal += len(place) } @@ -533,9 +533,12 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { }) a.result.place = append(a.result.place, allocPlaceResult{ name: alloc.Name, - canary: false, + canary: alloc.DeploymentStatus.IsCanary(), taskGroup: tg, previousAlloc: alloc, + + downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(), + minJobVersion: alloc.Job.Version, }) } @@ -708,7 +711,7 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, dest // computePlacement returns the set of allocations to place given the group // definition, the set of untainted, migrating and reschedule allocations for the group. 
func (a *allocReconciler) computePlacements(group *structs.TaskGroup, - nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet) []allocPlaceResult { + nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet, canaryState bool) []allocPlaceResult { // Add rescheduled placement results var place []allocPlaceResult @@ -719,6 +722,9 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, previousAlloc: alloc, reschedule: true, canary: alloc.DeploymentStatus.IsCanary(), + + downgradeNonCanary: canaryState && (alloc.DeploymentStatus == nil || !alloc.DeploymentStatus.IsCanary()), + minJobVersion: alloc.Job.Version, }) } @@ -732,8 +738,9 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, if existing < group.Count { for _, name := range nameIndex.Next(uint(group.Count - existing)) { place = append(place, allocPlaceResult{ - name: name, - taskGroup: group, + name: name, + taskGroup: group, + downgradeNonCanary: canaryState, }) } } diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index 4c50b25c525..d3080563860 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -34,6 +34,12 @@ type placementResult interface { // StopPreviousAlloc returns whether the previous allocation should be // stopped and if so the status description. 
StopPreviousAlloc() (bool, string) + + // DowngradeNonCanary indicates that placement should use the latest stable job + // with the MinJobVersion, rather than the current deployment version + DowngradeNonCanary() bool + + MinJobVersion() uint64 } // allocStopResult contains the information required to stop a single allocation @@ -52,6 +58,9 @@ type allocPlaceResult struct { taskGroup *structs.TaskGroup previousAlloc *structs.Allocation reschedule bool + + downgradeNonCanary bool + minJobVersion uint64 } func (a allocPlaceResult) TaskGroup() *structs.TaskGroup { return a.taskGroup } @@ -60,6 +69,8 @@ func (a allocPlaceResult) Canary() bool { return a.ca func (a allocPlaceResult) PreviousAllocation() *structs.Allocation { return a.previousAlloc } func (a allocPlaceResult) IsRescheduling() bool { return a.reschedule } func (a allocPlaceResult) StopPreviousAlloc() (bool, string) { return false, "" } +func (a allocPlaceResult) DowngradeNonCanary() bool { return a.downgradeNonCanary } +func (a allocPlaceResult) MinJobVersion() uint64 { return a.minJobVersion } // allocDestructiveResult contains the information required to do a destructive // update. Destructive changes should be applied atomically, as in the old alloc @@ -79,6 +90,8 @@ func (a allocDestructiveResult) IsRescheduling() bool { retur func (a allocDestructiveResult) StopPreviousAlloc() (bool, string) { return true, a.stopStatusDescription } +func (a allocDestructiveResult) DowngradeNonCanary() bool { return false } +func (a allocDestructiveResult) MinJobVersion() uint64 { return 0 } // allocMatrix is a mapping of task groups to their allocation set. 
type allocMatrix map[string]allocSet diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 0fc51635737..a950690db44 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -88,6 +88,12 @@ type State interface { // GetJobByID is used to lookup a job by ID JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) + // DeploymentsByJobID returns the deployments associated with the job + DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Deployment, error) + + // JobByIDAndVersion returns the job associated with id and specific version + JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, version uint64) (*structs.Job, error) + // LatestDeploymentByJobID returns the latest deployment matching the given // job ID LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID string) (*structs.Deployment, error) diff --git a/scheduler/stack.go b/scheduler/stack.go index 91b7c752efd..bccabc7899a 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -46,6 +46,7 @@ type GenericStack struct { wrappedChecks *FeasibilityWrapper quota FeasibleIterator + jobVersion *uint64 jobConstraint *ConstraintChecker taskGroupDrivers *DriverChecker taskGroupConstraint *ConstraintChecker @@ -89,6 +90,13 @@ func (s *GenericStack) SetNodes(baseNodes []*structs.Node) { } func (s *GenericStack) SetJob(job *structs.Job) { + if s.jobVersion != nil && *s.jobVersion == job.Version { + return + } + + jobVer := job.Version + s.jobVersion = &jobVer + s.jobConstraint.SetConstraints(job.Constraints) s.distinctHostsConstraint.SetJob(job) s.distinctPropertyConstraint.SetJob(job) diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 2c179de0567..c0fe396b746 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -398,7 +398,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { alloc.PreemptedAllocations = preemptedAllocIDs } - s.plan.AppendAlloc(alloc) + 
s.plan.AppendAlloc(alloc, false) } return nil diff --git a/scheduler/util.go b/scheduler/util.go index a3f4565754d..b465efb190b 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -655,7 +655,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, }, } newAlloc.Metrics = ctx.Metrics() - ctx.Plan().AppendAlloc(newAlloc) + ctx.Plan().AppendAlloc(newAlloc, false) // Remove this allocation from the slice doInplace(&i, &n, &inplaceCount) From cb038b1a8cceed14c6bc353f10bd5334497f886d Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 25 Aug 2020 17:09:21 -0400 Subject: [PATCH 2/5] Have Plan.AppendAlloc accept the job --- nomad/structs/structs.go | 12 ++++-------- scheduler/generic_sched.go | 10 +++------- scheduler/system_sched.go | 2 +- scheduler/util.go | 2 +- 4 files changed, 9 insertions(+), 17 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 14dcc3c94a1..0e501219398 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -9831,17 +9831,13 @@ func (p *Plan) PopUpdate(alloc *Allocation) { } // AppendAlloc appends the alloc to the plan allocations. -// To save space, it clears the Job field so it can be derived from the plan Job. -// If keepJob is true, the normalizatin skipped to accommodate cases where a plan -// needs to support multiple versions of the same job. -func (p *Plan) AppendAlloc(alloc *Allocation, keepJob bool) { +// Uses the passed job if explicitly passed, otherwise +// it is assumed the alloc will use the plan Job version. 
+func (p *Plan) AppendAlloc(alloc *Allocation, job *Job) { node := alloc.NodeID existing := p.NodeAllocation[node] - // Normalize the job - if !keepJob { - alloc.Job = nil - } + alloc.Job = job p.NodeAllocation[node] = append(existing, alloc) } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index bc48dbf5c91..ee51fcb6a3d 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -388,12 +388,12 @@ func (s *GenericScheduler) computeJobAllocs() error { update.DeploymentID = s.deployment.GetID() update.DeploymentStatus = nil } - s.ctx.Plan().AppendAlloc(update, false) + s.ctx.Plan().AppendAlloc(update, nil) } // Handle the annotation updates for _, update := range results.attributeUpdates { - s.ctx.Plan().AppendAlloc(update, false) + s.ctx.Plan().AppendAlloc(update, nil) } // Nothing remaining to do if placement is not required @@ -601,14 +601,10 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul } } - if downgradedJob != nil { - alloc.Job = downgradedJob - } - s.handlePreemptions(option, alloc, missing) // Track the placement - s.plan.AppendAlloc(alloc, downgradedJob != nil) + s.plan.AppendAlloc(alloc, downgradedJob) } else { // Lazy initialize the failed map diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index c0fe396b746..f8088b02a1b 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -398,7 +398,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { alloc.PreemptedAllocations = preemptedAllocIDs } - s.plan.AppendAlloc(alloc, false) + s.plan.AppendAlloc(alloc, nil) } return nil diff --git a/scheduler/util.go b/scheduler/util.go index b465efb190b..1799ede53e3 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -655,7 +655,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, }, } newAlloc.Metrics = ctx.Metrics() - ctx.Plan().AppendAlloc(newAlloc, false) + ctx.Plan().AppendAlloc(newAlloc, nil) // 
Remove this allocation from the slice doInplace(&i, &n, &inplaceCount) From 92bb3728c9f2b63c32cbbeff079a424b3cd4eb62 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 25 Aug 2020 17:23:20 -0400 Subject: [PATCH 3/5] tweak stack job manipulation To address review comments --- scheduler/generic_sched.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index ee51fcb6a3d..0b089a2c57d 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -497,15 +497,12 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul tg = job.LookupTaskGroup(tg.Name) downgradedJob = job deploymentID = jobDeploymentID - - // ensure we are operating on the correct job - s.stack.SetJob(job) } else { jobVersion := -1 if job != nil { jobVersion = int(job.Version) } - s.logger.Warn("failed to find appropriate job; using the latest", "expected_version", missing.MinJobVersion, "found_version", jobVersion) + s.logger.Debug("failed to find appropriate job; using the latest", "expected_version", missing.MinJobVersion, "found_version", jobVersion) } } @@ -515,6 +512,12 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul continue } + // Use downgraded job in scheduling stack to honor + // old job resources and constraints + if downgradedJob != nil { + s.stack.SetJob(downgradedJob) + } + // Find the preferred node preferredNode, err := s.findPreferredNode(missing) if err != nil { @@ -541,7 +544,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul // Compute top K scoring node metadata s.ctx.Metrics().PopulateScoreMetaData() - // restore stack to use the latest job version again + // Restore stack job now that placement is done, to use plan job version if downgradedJob != nil { s.stack.SetJob(s.job) } From 3a28b85b8a9c283bcb1180a95eb677cda07f7712 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 25 Aug 
2020 17:30:43 -0400 Subject: [PATCH 4/5] simplify canary check `(alloc.DeploymentStatus == nil || !alloc.DeploymentStatus.IsCanary())` and `!alloc.DeploymentStatus.IsCanary()` are equivalent. --- scheduler/reconcile.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index e46f7cb3d8a..b906c6c9bbb 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -723,7 +723,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, reschedule: true, canary: alloc.DeploymentStatus.IsCanary(), - downgradeNonCanary: canaryState && (alloc.DeploymentStatus == nil || !alloc.DeploymentStatus.IsCanary()), + downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(), minJobVersion: alloc.Job.Version, }) } From f075bcc811677e6a30f6cedcc2da36cb6b714910 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 25 Aug 2020 17:33:15 -0400 Subject: [PATCH 5/5] Update scheduler/reconcile.go Co-authored-by: Chris Baker <1675087+cgbaker@users.noreply.github.com> --- scheduler/reconcile.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index b906c6c9bbb..029fe3b3124 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -426,7 +426,8 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // desired means we need to create canaries strategy := tg.Update canariesPromoted := dstate != nil && dstate.Promoted - requireCanary := (len(destructive) != 0 || (len(untainted) == 0 && len(migrate)+len(lost) != 0)) && + replaceAllAllocs := len(untainted) == 0 && len(migrate)+len(lost) != 0 + requireCanary := (len(destructive) != 0 || replaceAllAllocs) && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted if requireCanary { dstate.DesiredCanaries = strategy.Canary