Merge pull request #8691 from hashicorp/b-reschedule-job-versions
Respect alloc job version for lost/failed allocs
Mahmood Ali authored Aug 25, 2020
2 parents 3e3dff6 + f075bcc commit 1afd415
Showing 10 changed files with 275 additions and 15 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,10 @@ IMPROVEMENTS:
* api: Added node purge SDK functionality. [[GH-8142](https://github.com/hashicorp/nomad/issues/8142)]
* driver/docker: Allow configurable image pull context timeout setting. [[GH-5718](https://github.com/hashicorp/nomad/issues/5718)]

+BUG FIXES:
+
+* core: Fixed a bug where unpromoted job versions were used when rescheduling failed allocations [[GH-8691](https://github.com/hashicorp/nomad/issues/8691)]
+
## 0.12.3 (August 13, 2020)

BUG FIXES:
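The core rule this fix enforces, reduced to a sketch (a hypothetical helper with illustrative names, not the real Nomad API): while a canary deployment is still unpromoted, a replacement for a failed or lost non-canary alloc must keep the alloc's original job version rather than the newest one.

```go
// Hypothetical helper illustrating the rule; the actual change threads a
// *structs.Job through Plan.AppendAlloc and the scheduling stack (below).
func jobVersionForReplacement(latest, original uint64, canaryInFlight, allocIsCanary bool) uint64 {
	if canaryInFlight && !allocIsCanary {
		// Before the fix, `latest` was used here, so non-canary
		// replacements could land on an unpromoted job version.
		return original
	}
	return latest
}
```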
8 changes: 5 additions & 3 deletions nomad/structs/structs.go
@@ -9850,12 +9850,14 @@ func (p *Plan) PopUpdate(alloc *Allocation) {
}
}

-func (p *Plan) AppendAlloc(alloc *Allocation) {
+// AppendAlloc appends the alloc to the plan allocations.
+// Uses the job passed explicitly if there is one; otherwise the alloc is
+// assumed to use the plan's Job version.
+func (p *Plan) AppendAlloc(alloc *Allocation, job *Job) {
node := alloc.NodeID
existing := p.NodeAllocation[node]

-// Normalize the job
-alloc.Job = nil
+alloc.Job = job

p.NodeAllocation[node] = append(existing, alloc)
}
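Both calling modes of the new signature appear in the scheduler changes below; a nil job keeps the old normalization behavior, while a non-nil job pins the replacement to an older version:

```go
// In-place and annotation updates inherit the plan's job version: a nil
// job means the alloc is normalized against the plan Job on apply.
s.ctx.Plan().AppendAlloc(update, nil)

// Placements that replace a non-canary alloc during an unpromoted canary
// deployment are pinned to the older, promoted job version instead.
s.plan.AppendAlloc(alloc, downgradedJob)
```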
66 changes: 63 additions & 3 deletions scheduler/generic_sched.go
@@ -2,6 +2,7 @@ package scheduler

import (
"fmt"
"sort"
"time"

log "github.com/hashicorp/go-hclog"
@@ -387,12 +388,12 @@ func (s *GenericScheduler) computeJobAllocs() error {
update.DeploymentID = s.deployment.GetID()
update.DeploymentStatus = nil
}
-s.ctx.Plan().AppendAlloc(update)
+s.ctx.Plan().AppendAlloc(update, nil)
}

// Handle the annotation updates
for _, update := range results.attributeUpdates {
-s.ctx.Plan().AppendAlloc(update)
+s.ctx.Plan().AppendAlloc(update, nil)
}

// Nothing remaining to do if placement is not required
@@ -429,6 +430,32 @@ func (s *GenericScheduler) computeJobAllocs() error {
return s.computePlacements(destructive, place)
}

+// downgradedJobForPlacement returns the job appropriate for non-canary placement replacement
+func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, *structs.Job, error) {
+ns, jobID := s.job.Namespace, s.job.ID
+tgName := p.TaskGroup().Name
+
+// find deployments and use the latest promoted or canaried version
+deployments, err := s.state.DeploymentsByJobID(nil, ns, jobID, false)
+if err != nil {
+return "", nil, fmt.Errorf("failed to lookup job deployments: %v", err)
+}
+sort.Slice(deployments, func(i, j int) bool { return deployments[i].JobVersion > deployments[j].JobVersion })
+for _, d := range deployments {
+// It's unexpected to have a recent deployment that doesn't contain the TaskGroup,
+// as all of its allocations should have been destroyed. In such a case, keep
+// looking for an older deployment that does contain the TaskGroup; the stray
+// allocations should be killed soon anyway. This is a defensive measure that
+// has not been seen in practice.
+//
+// Zero dstate.DesiredCanaries indicates that the TaskGroup's allocations were
+// updated in-place without using canaries.
+if dstate := d.TaskGroups[tgName]; dstate != nil && (dstate.Promoted || dstate.DesiredCanaries == 0) {
+job, err := s.state.JobByIDAndVersion(nil, ns, jobID, d.JobVersion)
+return d.ID, job, err
+}
+}
+
+return "", nil, nil
+}

// computePlacements computes placements for allocations. It is given the set of
// destructive updates to place and the set of new placements to place.
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
@@ -457,12 +484,40 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
// Get the task group
tg := missing.TaskGroup()

+var downgradedJob *structs.Job
+
+if missing.DowngradeNonCanary() {
+jobDeploymentID, job, err := s.downgradedJobForPlacement(missing)
+if err != nil {
+return err
+}
+
+// Defensive check - if there is no appropriate deployment for this job, use the latest
+if job != nil && job.Version >= missing.MinJobVersion() && job.LookupTaskGroup(tg.Name) != nil {
+tg = job.LookupTaskGroup(tg.Name)
+downgradedJob = job
+deploymentID = jobDeploymentID
+} else {
+jobVersion := -1
+if job != nil {
+jobVersion = int(job.Version)
+}
+s.logger.Debug("failed to find appropriate job; using the latest", "expected_version", missing.MinJobVersion(), "found_version", jobVersion)
+}
+}

// Check if this task group has already failed
if metric, ok := s.failedTGAllocs[tg.Name]; ok {
metric.CoalescedFailures += 1
continue
}

+// Use downgraded job in scheduling stack to honor
+// old job resources and constraints
+if downgradedJob != nil {
+s.stack.SetJob(downgradedJob)
+}

// Find the preferred node
preferredNode, err := s.findPreferredNode(missing)
if err != nil {
@@ -489,6 +544,11 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
// Compute top K scoring node metadata
s.ctx.Metrics().PopulateScoreMetaData()

+// Restore stack job now that placement is done, to use plan job version
+if downgradedJob != nil {
+s.stack.SetJob(s.job)
+}

// Set fields based on if we found an allocation option
if option != nil {
resources := &structs.AllocatedResources{
@@ -547,7 +607,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
s.handlePreemptions(option, alloc, missing)

// Track the placement
-s.plan.AppendAlloc(alloc)
+s.plan.AppendAlloc(alloc, downgradedJob)

} else {
// Lazy initialize the failed map
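The deployment walk in downgradedJobForPlacement is the heart of the change. A minimal, self-contained sketch of that selection rule, using simplified stand-in types (not the real Nomad structs):

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for the fields the selection logic needs;
// these are not the real Nomad types.
type deploymentState struct {
	Promoted        bool
	DesiredCanaries int
}

type deployment struct {
	ID         string
	JobVersion uint64
	TaskGroups map[string]*deploymentState
}

// pickDowngradeTarget mirrors the walk in downgradedJobForPlacement:
// visit deployments from newest job version to oldest and return the
// first whose task group was promoted or deployed without canaries.
func pickDowngradeTarget(deployments []deployment, tgName string) (string, uint64, bool) {
	sort.Slice(deployments, func(i, j int) bool {
		return deployments[i].JobVersion > deployments[j].JobVersion
	})
	for _, d := range deployments {
		if dstate := d.TaskGroups[tgName]; dstate != nil && (dstate.Promoted || dstate.DesiredCanaries == 0) {
			return d.ID, d.JobVersion, true
		}
	}
	return "", 0, false
}

func main() {
	deployments := []deployment{
		{ID: "d0", JobVersion: 0, TaskGroups: map[string]*deploymentState{"web": {Promoted: true}}},
		{ID: "d1", JobVersion: 1, TaskGroups: map[string]*deploymentState{"web": {DesiredCanaries: 1}}},
	}
	// d1 is newer but still an unpromoted canary deployment, so the
	// promoted version-0 deployment d0 is selected.
	fmt.Println(pickDowngradeTarget(deployments, "web")) // d0 0 true
}
```

Note also the bracketing in computePlacements above: s.stack.SetJob(downgradedJob) before node selection and s.stack.SetJob(s.job) afterwards, so the old job's resources and constraints apply only to that single placement.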
159 changes: 159 additions & 0 deletions scheduler/generic_sched_test.go
@@ -5342,3 +5342,162 @@ func TestServiceSched_Preemption(t *testing.T) {
}
require.Equal(expectedPreemptedAllocs, actualPreemptedAllocs)
}

// TestServiceSched_Migrate_CanaryStatus asserts that migrations/rescheduling
// of allocations use the proper versions of allocs rather than latest:
// Canaries should be replaced by canaries, and non-canaries should be replaced
// with the latest promoted version.
func TestServiceSched_Migrate_CanaryStatus(t *testing.T) {
h := NewHarness(t)

node1 := mock.Node()
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node1))

totalCount := 3
desiredCanaries := 1

job := mock.Job()
job.Stable = true
job.TaskGroups[0].Count = totalCount
job.TaskGroups[0].Update = &structs.UpdateStrategy{
MaxParallel: 1,
Canary: desiredCanaries,
}
require.NoError(t, h.State.UpsertJob(h.NextIndex(), job))

deployment := &structs.Deployment{
ID: uuid.Generate(),
JobID: job.ID,
Namespace: job.Namespace,
JobVersion: job.Version,
JobModifyIndex: job.JobModifyIndex,
JobCreateIndex: job.CreateIndex,
TaskGroups: map[string]*structs.DeploymentState{
"web": {DesiredTotal: totalCount},
},
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
}
require.NoError(t, h.State.UpsertDeployment(h.NextIndex(), deployment))

var allocs []*structs.Allocation
for i := 0; i < 3; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node1.ID
alloc.DeploymentID = deployment.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
}
require.NoError(t, h.State.UpsertAllocs(h.NextIndex(), allocs))

// new job version with an updated task config
job2 := job.Copy()
job2.Stable = false
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.NoError(t, h.State.UpsertJob(h.NextIndex(), job2))

// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
require.NoError(t, err)

// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]

// Ensure a deployment was created
require.NotNil(t, plan.Deployment)
updateDeployment := plan.Deployment.ID

// Check status first - should be 4 allocs, only one is canary
{
ws := memdb.NewWatchSet()
allocs, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, true)
require.NoError(t, err)
require.Len(t, allocs, 4)

sort.Slice(allocs, func(i, j int) bool { return allocs[i].CreateIndex < allocs[j].CreateIndex })

for _, a := range allocs[:3] {
require.Equal(t, structs.AllocDesiredStatusRun, a.DesiredStatus)
require.Equal(t, uint64(0), a.Job.Version)
require.False(t, a.DeploymentStatus.IsCanary())
require.Equal(t, node1.ID, a.NodeID)
require.Equal(t, deployment.ID, a.DeploymentID)
}
require.Equal(t, structs.AllocDesiredStatusRun, allocs[3].DesiredStatus)
require.Equal(t, uint64(1), allocs[3].Job.Version)
require.True(t, allocs[3].DeploymentStatus.Canary)
require.Equal(t, node1.ID, allocs[3].NodeID)
require.Equal(t, updateDeployment, allocs[3].DeploymentID)
}

// now, take node1 down and ensure all allocs are migrated to node2
node1 = node1.Copy()
node1.Status = structs.NodeStatusDown
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node1))

node2 := mock.Node()
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node2))

neval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
NodeID: node1.ID,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{neval}))

// Process the node-update evaluation
err = h.Process(NewServiceScheduler, neval)
require.NoError(t, err)

// Now test that all node1 allocs are migrated while preserving Version and Canary info
{
ws := memdb.NewWatchSet()
allocs, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, true)
require.NoError(t, err)
require.Len(t, allocs, 8)

nodeAllocs := map[string][]*structs.Allocation{}
for _, a := range allocs {
nodeAllocs[a.NodeID] = append(nodeAllocs[a.NodeID], a)
}

require.Len(t, nodeAllocs[node1.ID], 4)
for _, a := range nodeAllocs[node1.ID] {
require.Equal(t, structs.AllocDesiredStatusStop, a.DesiredStatus)
require.Equal(t, node1.ID, a.NodeID)
}

node2Allocs := nodeAllocs[node2.ID]
require.Len(t, node2Allocs, 4)
sort.Slice(node2Allocs, func(i, j int) bool { return node2Allocs[i].Job.Version < node2Allocs[j].Job.Version })

for _, a := range node2Allocs[:3] {
require.Equal(t, structs.AllocDesiredStatusRun, a.DesiredStatus)
require.Equal(t, uint64(0), a.Job.Version)
require.Equal(t, node2.ID, a.NodeID)
require.Equal(t, deployment.ID, a.DeploymentID)
}
require.Equal(t, structs.AllocDesiredStatusRun, node2Allocs[3].DesiredStatus)
require.Equal(t, uint64(1), node2Allocs[3].Job.Version)
require.Equal(t, node2.ID, node2Allocs[3].NodeID)
require.Equal(t, updateDeployment, node2Allocs[3].DeploymentID)
}
}
22 changes: 15 additions & 7 deletions scheduler/reconcile.go
@@ -424,10 +424,11 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {

// The fact that we have destructive updates and fewer canaries than
// desired means we need to create canaries
-numDestructive := len(destructive)
strategy := tg.Update
canariesPromoted := dstate != nil && dstate.Promoted
-requireCanary := numDestructive != 0 && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
+replaceAllAllocs := len(untainted) == 0 && len(migrate)+len(lost) != 0
+requireCanary := (len(destructive) != 0 || replaceAllAllocs) &&
+strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
if requireCanary {
dstate.DesiredCanaries = strategy.Canary
}
@@ -455,7 +456,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group
var place []allocPlaceResult
if len(lostLater) == 0 {
-place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow)
+place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, canaryState)
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, canaryState)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@@ -533,9 +534,12 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
})
a.result.place = append(a.result.place, allocPlaceResult{
name: alloc.Name,
-canary: false,
+canary: alloc.DeploymentStatus.IsCanary(),
taskGroup: tg,
previousAlloc: alloc,
+
+downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
+minJobVersion: alloc.Job.Version,
})
}

@@ -708,7 +712,7 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, dest
// computePlacements returns the set of allocations to place given the group
// definition, the set of untainted, migrating and reschedule allocations for the group.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
-nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet) []allocPlaceResult {
+nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet, canaryState bool) []allocPlaceResult {

// Add rescheduled placement results
var place []allocPlaceResult
@@ -719,6 +723,9 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
previousAlloc: alloc,
reschedule: true,
canary: alloc.DeploymentStatus.IsCanary(),

+downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
+minJobVersion: alloc.Job.Version,
})
}

@@ -732,8 +739,9 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
if existing < group.Count {
for _, name := range nameIndex.Next(uint(group.Count - existing)) {
place = append(place, allocPlaceResult{
-name: name,
-taskGroup: group,
+name: name,
+taskGroup: group,
+downgradeNonCanary: canaryState,
})
}
}
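The reconciler now records on each placement result whether the replacement must be downgraded. The decision itself is a one-liner; a small runnable sketch of its truth table (a hypothetical helper mirroring the expression used above):

```go
package main

import "fmt"

// downgradeNonCanary mirrors the expression set on placement results above:
// canaryState means the group's deployment still has unpromoted canaries.
func downgradeNonCanary(canaryState, allocIsCanary bool) bool {
	return canaryState && !allocIsCanary
}

func main() {
	// Only a non-canary alloc replaced mid-canary-deployment is pinned to
	// its old (promoted) job version; canaries keep the new version, and
	// nothing is downgraded once the deployment is promoted.
	fmt.Println(downgradeNonCanary(true, false))  // true
	fmt.Println(downgradeNonCanary(true, true))   // false
	fmt.Println(downgradeNonCanary(false, false)) // false
}
```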