diff --git a/api/jobs.go b/api/jobs.go
index 9964990235c..b240b394f49 100644
--- a/api/jobs.go
+++ b/api/jobs.go
@@ -751,6 +751,8 @@ func (j *Job) Canonicalize() {
 	}
 	if j.Update != nil {
 		j.Update.Canonicalize()
+	} else if *j.Type == JobTypeService {
+		j.Update = DefaultUpdateStrategy()
 	}
 
 	for _, tg := range j.TaskGroups {
diff --git a/api/jobs_test.go b/api/jobs_test.go
index d88cd93fb80..34de8e8d0de 100644
--- a/api/jobs_test.go
+++ b/api/jobs_test.go
@@ -108,6 +108,17 @@ func TestJobs_Canonicalize(t *testing.T) {
 				CreateIndex: uint64ToPtr(0),
 				ModifyIndex: uint64ToPtr(0),
 				JobModifyIndex: uint64ToPtr(0),
+				Update: &UpdateStrategy{
+					Stagger: timeToPtr(30 * time.Second),
+					MaxParallel: intToPtr(1),
+					HealthCheck: stringToPtr("checks"),
+					MinHealthyTime: timeToPtr(10 * time.Second),
+					HealthyDeadline: timeToPtr(5 * time.Minute),
+					ProgressDeadline: timeToPtr(10 * time.Minute),
+					AutoRevert: boolToPtr(false),
+					Canary: intToPtr(0),
+					AutoPromote: boolToPtr(false),
+				},
 				TaskGroups: []*TaskGroup{
 					{
 						Name: stringToPtr(""),
@@ -131,6 +142,17 @@ func TestJobs_Canonicalize(t *testing.T) {
 							MaxDelay: timeToPtr(1 * time.Hour),
 							Unlimited: boolToPtr(true),
 						},
+						Update: &UpdateStrategy{
+							Stagger: timeToPtr(30 * time.Second),
+							MaxParallel: intToPtr(1),
+							HealthCheck: stringToPtr("checks"),
+							MinHealthyTime: timeToPtr(10 * time.Second),
+							HealthyDeadline: timeToPtr(5 * time.Minute),
+							ProgressDeadline: timeToPtr(10 * time.Minute),
+							AutoRevert: boolToPtr(false),
+							Canary: intToPtr(0),
+							AutoPromote: boolToPtr(false),
+						},
 						Migrate: DefaultMigrateStrategy(),
 						Tasks: []*Task{
 							{
@@ -143,6 +165,70 @@ func TestJobs_Canonicalize(t *testing.T) {
 				},
 			},
 		},
+		{
+			name: "batch",
+			input: &Job{
+				Type: stringToPtr("batch"),
+				TaskGroups: []*TaskGroup{
+					{
+						Tasks: []*Task{
+							{},
+						},
+					},
+				},
+			},
+			expected: &Job{
+				ID: stringToPtr(""),
+				Name: stringToPtr(""),
+				Region: stringToPtr("global"),
+				Namespace: stringToPtr(DefaultNamespace),
+				Type: stringToPtr("batch"),
+				ParentID: stringToPtr(""),
+				Priority: intToPtr(50),
+				AllAtOnce: boolToPtr(false),
+				VaultToken: stringToPtr(""),
+				Status: stringToPtr(""),
+				StatusDescription: stringToPtr(""),
+				Stop: boolToPtr(false),
+				Stable: boolToPtr(false),
+				Version: uint64ToPtr(0),
+				CreateIndex: uint64ToPtr(0),
+				ModifyIndex: uint64ToPtr(0),
+				JobModifyIndex: uint64ToPtr(0),
+				TaskGroups: []*TaskGroup{
+					{
+						Name: stringToPtr(""),
+						Count: intToPtr(1),
+						EphemeralDisk: &EphemeralDisk{
+							Sticky: boolToPtr(false),
+							Migrate: boolToPtr(false),
+							SizeMB: intToPtr(300),
+						},
+						RestartPolicy: &RestartPolicy{
+							Delay: timeToPtr(15 * time.Second),
+							Attempts: intToPtr(3),
+							Interval: timeToPtr(24 * time.Hour),
+							Mode: stringToPtr("fail"),
+						},
+						ReschedulePolicy: &ReschedulePolicy{
+							Attempts: intToPtr(1),
+							Interval: timeToPtr(24 * time.Hour),
+							DelayFunction: stringToPtr("constant"),
+							Delay: timeToPtr(5 * time.Second),
+							MaxDelay: timeToPtr(0),
+							Unlimited: boolToPtr(false),
+						},
+						Tasks: []*Task{
+							{
+								KillTimeout: timeToPtr(5 * time.Second),
+								LogConfig: DefaultLogConfig(),
+								Resources: DefaultResources(),
+							},
+						},
+					},
+				},
+			},
+		},
 		{
 			name: "partial",
 			input: &Job{
@@ -179,6 +265,17 @@ func TestJobs_Canonicalize(t *testing.T) {
 				CreateIndex: uint64ToPtr(0),
 				ModifyIndex: uint64ToPtr(0),
 				JobModifyIndex: uint64ToPtr(0),
+				Update: &UpdateStrategy{
+					Stagger: timeToPtr(30 * time.Second),
+					MaxParallel: intToPtr(1),
+					HealthCheck: stringToPtr("checks"),
+					MinHealthyTime: timeToPtr(10 * time.Second),
+					HealthyDeadline: timeToPtr(5 * time.Minute),
+					ProgressDeadline: timeToPtr(10 * time.Minute),
+					AutoRevert: boolToPtr(false),
+					Canary: intToPtr(0),
+					AutoPromote: boolToPtr(false),
+				},
 				TaskGroups: []*TaskGroup{
 					{
 						Name: stringToPtr("bar"),
@@ -202,6 +299,17 @@ func TestJobs_Canonicalize(t *testing.T) {
 							MaxDelay: timeToPtr(1 * time.Hour),
 							Unlimited: boolToPtr(true),
 						},
+						Update: &UpdateStrategy{
+							Stagger: timeToPtr(30 * time.Second),
+							MaxParallel: intToPtr(1),
+							HealthCheck: stringToPtr("checks"),
+							MinHealthyTime: timeToPtr(10 * time.Second),
+							HealthyDeadline: timeToPtr(5 * time.Minute),
+							ProgressDeadline: timeToPtr(10 * time.Minute),
+							AutoRevert: boolToPtr(false),
+							Canary: intToPtr(0),
+							AutoPromote: boolToPtr(false),
+						},
 						Migrate: DefaultMigrateStrategy(),
 						Tasks: []*Task{
 							{
@@ -467,6 +575,17 @@ func TestJobs_Canonicalize(t *testing.T) {
 				CreateIndex: uint64ToPtr(0),
 				ModifyIndex: uint64ToPtr(0),
 				JobModifyIndex: uint64ToPtr(0),
+				Update: &UpdateStrategy{
+					Stagger: timeToPtr(30 * time.Second),
+					MaxParallel: intToPtr(1),
+					HealthCheck: stringToPtr("checks"),
+					MinHealthyTime: timeToPtr(10 * time.Second),
+					HealthyDeadline: timeToPtr(5 * time.Minute),
+					ProgressDeadline: timeToPtr(10 * time.Minute),
+					AutoRevert: boolToPtr(false),
+					Canary: intToPtr(0),
+					AutoPromote: boolToPtr(false),
+				},
 				Periodic: &PeriodicConfig{
 					Enabled: boolToPtr(true),
 					Spec: stringToPtr(""),
diff --git a/client/allocrunner/health_hook.go b/client/allocrunner/health_hook.go
index a4055655747..a4bebacacdf 100644
--- a/client/allocrunner/health_hook.go
+++ b/client/allocrunner/health_hook.go
@@ -117,7 +117,7 @@ func (h *allocHealthWatcherHook) init() error {
 
 	// No need to watch allocs for deployments that rely on operators
 	// manually setting health
-	if h.isDeploy && (tg.Update == nil || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual) {
+	if h.isDeploy && (tg.Update.IsEmpty() || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual) {
 		return nil
 	}
 
diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go
index d2c70a37dcd..0ccc35a3e3f 100644
--- a/command/agent/job_endpoint.go
+++ b/command/agent/job_endpoint.go
@@ -641,7 +641,7 @@ func ApiJobToStructJob(job *api.Job) *structs.Job {
 	// Update has been pushed into the task groups. stagger and max_parallel are
 	// preserved at the job level, but all other values are discarded. The job.Update
 	// api value is merged into TaskGroups already in api.Canonicalize
-	if job.Update != nil {
+	if job.Update != nil && job.Update.MaxParallel != nil && *job.Update.MaxParallel > 0 {
 		j.Update = structs.UpdateStrategy{}
 
 		if job.Update.Stagger != nil {
diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go
index bd3252e9683..113daf1764e 100644
--- a/nomad/mock/mock.go
+++ b/nomad/mock/mock.go
@@ -270,6 +270,117 @@ func Job() *structs.Job {
 	return job
 }
 
+func MaxParallelJob() *structs.Job {
+	update := *structs.DefaultUpdateStrategy
+	update.MaxParallel = 0
+	job := &structs.Job{
+		Region: "global",
+		ID: fmt.Sprintf("mock-service-%s", uuid.Generate()),
+		Name: "my-job",
+		Namespace: structs.DefaultNamespace,
+		Type: structs.JobTypeService,
+		Priority: 50,
+		AllAtOnce: false,
+		Datacenters: []string{"dc1"},
+		Constraints: []*structs.Constraint{
+			{
+				LTarget: "${attr.kernel.name}",
+				RTarget: "linux",
+				Operand: "=",
+			},
+		},
+		Update: update,
+		TaskGroups: []*structs.TaskGroup{
+			{
+				Name: "web",
+				Count: 10,
+				EphemeralDisk: &structs.EphemeralDisk{
+					SizeMB: 150,
+				},
+				RestartPolicy: &structs.RestartPolicy{
+					Attempts: 3,
+					Interval: 10 * time.Minute,
+					Delay: 1 * time.Minute,
+					Mode: structs.RestartPolicyModeDelay,
+				},
+				ReschedulePolicy: &structs.ReschedulePolicy{
+					Attempts: 2,
+					Interval: 10 * time.Minute,
+					Delay: 5 * time.Second,
+					DelayFunction: "constant",
+				},
+				Migrate: structs.DefaultMigrateStrategy(),
+				Update: &update,
+				Tasks: []*structs.Task{
+					{
+						Name: "web",
+						Driver: "exec",
+						Config: map[string]interface{}{
+							"command": "/bin/date",
+						},
+						Env: map[string]string{
+							"FOO": "bar",
+						},
+						Services: []*structs.Service{
+							{
+								Name: "${TASK}-frontend",
+								PortLabel: "http",
+								Tags: []string{"pci:${meta.pci-dss}", "datacenter:${node.datacenter}"},
+								Checks: []*structs.ServiceCheck{
+									{
+										Name: "check-table",
+										Type: structs.ServiceCheckScript,
+										Command: "/usr/local/check-table-${meta.database}",
+										Args: []string{"${meta.version}"},
+										Interval: 30 * time.Second,
+										Timeout: 5 * time.Second,
+									},
+								},
+							},
+							{
+								Name: "${TASK}-admin",
+								PortLabel: "admin",
+							},
+						},
+						LogConfig: structs.DefaultLogConfig(),
+						Resources: &structs.Resources{
+							CPU: 500,
+							MemoryMB: 256,
+							Networks: []*structs.NetworkResource{
+								{
+									MBits: 50,
+									DynamicPorts: []structs.Port{
+										{Label: "http"},
+										{Label: "admin"},
+									},
+								},
+							},
+						},
+						Meta: map[string]string{
+							"foo": "bar",
+						},
+					},
+				},
+				Meta: map[string]string{
+					"elb_check_type": "http",
+					"elb_check_interval": "30s",
+					"elb_check_min": "3",
+				},
+			},
+		},
+		Meta: map[string]string{
+			"owner": "armon",
+		},
+		Status: structs.JobStatusPending,
+		Version: 0,
+		CreateIndex: 42,
+		ModifyIndex: 99,
+		JobModifyIndex: 99,
+	}
+	job.Canonicalize()
+	return job
+}
+
 func BatchJob() *structs.Job {
 	job := &structs.Job{
 		Region: "global",
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 6a6dea2a2f7..67afd2498ea 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -3640,7 +3640,7 @@ func (j *Job) Stopped() bool {
 // HasUpdateStrategy returns if any task group in the job has an update strategy
 func (j *Job) HasUpdateStrategy() bool {
 	for _, tg := range j.TaskGroups {
-		if tg.Update != nil {
+		if !tg.Update.IsEmpty() {
 			return true
 		}
 	}
@@ -3969,8 +3969,8 @@ func (u *UpdateStrategy) Validate() error {
 		multierror.Append(&mErr, fmt.Errorf("Invalid health check given: %q", u.HealthCheck))
 	}
 
-	if u.MaxParallel < 1 {
-		multierror.Append(&mErr, fmt.Errorf("Max parallel can not be less than one: %d < 1", u.MaxParallel))
+	if u.MaxParallel < 0 {
+		multierror.Append(&mErr, fmt.Errorf("Max parallel can not be less than zero: %d < 0", u.MaxParallel))
 	}
 	if u.Canary < 0 {
 		multierror.Append(&mErr, fmt.Errorf("Canary count can not be less than zero: %d < 0", u.Canary))
@@ -4000,6 +4000,14 @@ func (u *UpdateStrategy) Validate() error {
 	return mErr.ErrorOrNil()
 }
 
+func (u *UpdateStrategy) IsEmpty() bool {
+	if u == nil {
+		return true
+	}
+
+	return u.MaxParallel == 0
+}
+
 // TODO(alexdadgar): Remove once no longer used by the scheduler.
 // Rolling returns if a rolling strategy should be used
 func (u *UpdateStrategy) Rolling() bool {
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go
index e2529e8b25d..30c5c603b02 100644
--- a/nomad/structs/structs_test.go
+++ b/nomad/structs/structs_test.go
@@ -1995,7 +1995,7 @@ func TestAffinity_Validate(t *testing.T) {
 
 func TestUpdateStrategy_Validate(t *testing.T) {
 	u := &UpdateStrategy{
-		MaxParallel: 0,
+		MaxParallel: -1,
 		HealthCheck: "foo",
 		MinHealthyTime: -10,
 		HealthyDeadline: -15,
@@ -2009,7 +2009,7 @@ func TestUpdateStrategy_Validate(t *testing.T) {
 	if !strings.Contains(mErr.Errors[0].Error(), "Invalid health check given") {
 		t.Fatalf("err: %s", err)
 	}
-	if !strings.Contains(mErr.Errors[1].Error(), "Max parallel can not be less than one") {
+	if !strings.Contains(mErr.Errors[1].Error(), "Max parallel can not be less than zero") {
 		t.Fatalf("err: %s", err)
 	}
 	if !strings.Contains(mErr.Errors[2].Error(), "Canary count can not be less than zero") {
diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go
index d3f20fe81f9..ccddfa07f85 100644
--- a/scheduler/reconcile.go
+++ b/scheduler/reconcile.go
@@ -331,7 +331,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 		}
 		if !existingDeployment {
 			dstate = &structs.DeploymentState{}
-			if tg.Update != nil {
+			if !tg.Update.IsEmpty() {
 				dstate.AutoRevert = tg.Update.AutoRevert
 				dstate.AutoPromote = tg.Update.AutoPromote
 				dstate.ProgressDeadline = tg.Update.ProgressDeadline
@@ -509,7 +509,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 	}
 
 	// Create a new deployment if necessary
-	if !existingDeployment && strategy != nil && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
+	if !existingDeployment && !strategy.IsEmpty() && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
 		// A previous group may have made the deployment already
 		if a.deployment == nil {
 			a.deployment = structs.NewDeployment(a.job)
@@ -618,7 +618,7 @@ func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *stru
 func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive, migrate allocSet, canaryState bool) int {
 	// If there is no update strategy or deployment for the group we can deploy
 	// as many as the group has
-	if group.Update == nil || len(destructive)+len(migrate) == 0 {
+	if group.Update.IsEmpty() || len(destructive)+len(migrate) == 0 {
 		return group.Count
 	} else if a.deploymentPaused || a.deploymentFailed {
 		// If the deployment is paused or failed, do not create anything else
diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go
index 132104afadc..0366ebb539c 100644
--- a/scheduler/reconcile_test.go
+++ b/scheduler/reconcile_test.go
@@ -611,6 +611,39 @@ func TestReconciler_Destructive(t *testing.T) {
 	assertNamesHaveIndexes(t, intRange(0, 9), destructiveResultsToNames(r.destructiveUpdate))
 }
 
+// Tests the reconciler properly handles destructive upgrading allocations when max_parallel=0
+func TestReconciler_DestructiveMaxParallel(t *testing.T) {
+	job := mock.MaxParallelJob()
+
+	// Create 10 existing allocations
+	var allocs []*structs.Allocation
+	for i := 0; i < 10; i++ {
+		alloc := mock.Alloc()
+		alloc.Job = job
+		alloc.JobID = job.ID
+		alloc.NodeID = uuid.Generate()
+		alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
+		allocs = append(allocs, alloc)
+	}
+
+	reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "")
+	r := reconciler.Compute()
+
+	// Assert the correct results
+	assertResults(t, r, &resultExpectation{
+		createDeployment: nil,
+		deploymentUpdates: nil,
+		destructive: 10,
+		desiredTGUpdates: map[string]*structs.DesiredUpdates{
+			job.TaskGroups[0].Name: {
+				DestructiveUpdate: 10,
+			},
+		},
+	})
+
+	assertNamesHaveIndexes(t, intRange(0, 9), destructiveResultsToNames(r.destructiveUpdate))
+}
+
 // Tests the reconciler properly handles destructive upgrading allocations while
 // scaling up
 func TestReconciler_Destructive_ScaleUp(t *testing.T) {
diff --git a/website/source/docs/job-specification/update.html.md b/website/source/docs/job-specification/update.html.md
index e0e2ca3be83..8aeb717cf09 100644
--- a/website/source/docs/job-specification/update.html.md
+++ b/website/source/docs/job-specification/update.html.md
@@ -52,9 +52,11 @@ job "docs" {
 
 ## `update` Parameters
 
-- `max_parallel` `(int: 0)` - Specifies the number of allocations within a task group that can be
+- `max_parallel` `(int: 1)` - Specifies the number of allocations within a task group that can be
   updated at the same time. The task groups themselves are updated in parallel.
 
+  - `max_parallel = 0` - Specifies that the allocation should use forced updates instead of deployments
+
 - `health_check` `(string: "checks")` - Specifies the mechanism in which
   allocations health is determined. The potential values are:
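To illustrate the behavior documented above, here is a minimal job spec sketch that opts a group out of deployment-based rolling updates by setting `max_parallel = 0`. The job, group, and task names and the Docker image are placeholders; the effect mirrors the reconciler test in this change, where such a group receives plain destructive updates and no deployment is created.

```hcl
job "example" {
  datacenters = ["dc1"]

  group "web" {
    # Under this change, max_parallel = 0 makes the group's update strategy
    # count as empty (UpdateStrategy.IsEmpty() returns true), so the scheduler
    # applies forced (destructive) updates without creating a deployment or
    # enforcing a rolling-update limit.
    update {
      max_parallel = 0
    }

    task "server" {
      driver = "docker"

      config {
        image = "nginx:1.17"
      }
    }
  }
}
```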