add default update stanza and max_parallel=0 disables deployments #6191

Merged: 10 commits, Sep 2, 2019
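This change does two things: Job.Canonicalize now injects a default update stanza for service jobs that omit one, and an explicit max_parallel = 0 marks the update strategy as empty, which disables deployments for that job. A minimal caller-side sketch of the intended usage (not part of this diff), assuming the api package's pointer helpers (stringToPtr, intToPtr) that appear elsewhere in this diff:

// Hypothetical sketch of the behavior this PR enables.
job := &Job{
	Type: stringToPtr("service"),
	Update: &UpdateStrategy{
		MaxParallel: intToPtr(0), // 0 opts this job out of deployments
	},
}
job.Canonicalize() // fills the remaining update fields with defaults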
2 changes: 2 additions & 0 deletions api/jobs.go
@@ -751,6 +751,8 @@ func (j *Job) Canonicalize() {
}
if j.Update != nil {
j.Update.Canonicalize()
} else if *j.Type == JobTypeService {
j.Update = DefaultUpdateStrategy()
}

for _, tg := range j.TaskGroups {
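The defaults asserted in the tests below indicate what DefaultUpdateStrategy() returns for a service job. A sketch reconstructed from those test expectations, using the api package's pointer helpers; the actual implementation may differ in form:

// Sketch: default update strategy, inferred from the test fixtures below.
func DefaultUpdateStrategy() *UpdateStrategy {
	return &UpdateStrategy{
		Stagger:          timeToPtr(30 * time.Second),
		MaxParallel:      intToPtr(1),
		HealthCheck:      stringToPtr("checks"),
		MinHealthyTime:   timeToPtr(10 * time.Second),
		HealthyDeadline:  timeToPtr(5 * time.Minute),
		ProgressDeadline: timeToPtr(10 * time.Minute),
		AutoRevert:       boolToPtr(false),
		Canary:           intToPtr(0),
		AutoPromote:      boolToPtr(false),
	}
}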
119 changes: 119 additions & 0 deletions api/jobs_test.go
@@ -108,6 +108,17 @@ func TestJobs_Canonicalize(t *testing.T) {
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
TaskGroups: []*TaskGroup{
{
Name: stringToPtr(""),
@@ -131,6 +142,17 @@ func TestJobs_Canonicalize(t *testing.T) {
MaxDelay: timeToPtr(1 * time.Hour),
Unlimited: boolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
@@ -143,6 +165,70 @@ func TestJobs_Canonicalize(t *testing.T) {
},
},
},
{
name: "batch",
input: &Job{
Type: stringToPtr("batch"),
TaskGroups: []*TaskGroup{
{
Tasks: []*Task{
{},
},
},
},
},
expected: &Job{
ID: stringToPtr(""),
Name: stringToPtr(""),
Region: stringToPtr("global"),
Namespace: stringToPtr(DefaultNamespace),
Type: stringToPtr("batch"),
ParentID: stringToPtr(""),
Priority: intToPtr(50),
AllAtOnce: boolToPtr(false),
VaultToken: stringToPtr(""),
Status: stringToPtr(""),
StatusDescription: stringToPtr(""),
Stop: boolToPtr(false),
Stable: boolToPtr(false),
Version: uint64ToPtr(0),
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
TaskGroups: []*TaskGroup{
{
Name: stringToPtr(""),
Count: intToPtr(1),
EphemeralDisk: &EphemeralDisk{
Sticky: boolToPtr(false),
Migrate: boolToPtr(false),
SizeMB: intToPtr(300),
},
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(3),
Interval: timeToPtr(24 * time.Hour),
Mode: stringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: intToPtr(1),
Interval: timeToPtr(24 * time.Hour),
DelayFunction: stringToPtr("constant"),
Delay: timeToPtr(5 * time.Second),
MaxDelay: timeToPtr(0),
Unlimited: boolToPtr(false),
},
Tasks: []*Task{
{
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
},
},
},
},
},
},
{
name: "partial",
input: &Job{
@@ -179,6 +265,17 @@ func TestJobs_Canonicalize(t *testing.T) {
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
TaskGroups: []*TaskGroup{
{
Name: stringToPtr("bar"),
@@ -202,6 +299,17 @@ func TestJobs_Canonicalize(t *testing.T) {
MaxDelay: timeToPtr(1 * time.Hour),
Unlimited: boolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
@@ -467,6 +575,17 @@ func TestJobs_Canonicalize(t *testing.T) {
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Periodic: &PeriodicConfig{
Enabled: boolToPtr(true),
Spec: stringToPtr(""),
2 changes: 1 addition & 1 deletion client/allocrunner/health_hook.go
@@ -117,7 +117,7 @@ func (h *allocHealthWatcherHook) init() error {

// No need to watch allocs for deployments that rely on operators
// manually setting health
if h.isDeploy && (tg.Update == nil || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual) {
if h.isDeploy && (tg.Update.IsEmpty() || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual) {
return nil
}

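Here tg.Update.IsEmpty() replaces the plain nil check. The method (added in nomad/structs/structs.go below) uses a pointer receiver and guards against a nil receiver, so the call is safe whether the group has no update stanza at all or one that was explicitly zeroed. A sketch of the semantics, assuming the structs package is imported as in this file:

var u *structs.UpdateStrategy
_ = u.IsEmpty() // true: nil means no update strategy

u = &structs.UpdateStrategy{MaxParallel: 0}
_ = u.IsEmpty() // also true: max_parallel = 0 disables deployments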
2 changes: 1 addition & 1 deletion command/agent/job_endpoint.go
@@ -641,7 +641,7 @@ func ApiJobToStructJob(job *api.Job) *structs.Job {
// Update has been pushed into the task groups. stagger and max_parallel are
// preserved at the job level, but all other values are discarded. The job.Update
// api value is merged into TaskGroups already in api.Canonicalize
if job.Update != nil {
if job.Update != nil && job.Update.MaxParallel != nil && *job.Update.MaxParallel > 0 {
j.Update = structs.UpdateStrategy{}

if job.Update.Stagger != nil {
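With this guard, a job-level update stanza whose max_parallel is 0 is dropped entirely during API-to-structs conversion, so a disabled update strategy never reaches the scheduler as a partially populated struct.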
111 changes: 111 additions & 0 deletions nomad/mock/mock.go
@@ -260,6 +260,117 @@ func Job() *structs.Job {
return job
}

func MaxParallelJob() *structs.Job {
update := *structs.DefaultUpdateStrategy
update.MaxParallel = 0
job := &structs.Job{
Region: "global",
ID: fmt.Sprintf("mock-service-%s", uuid.Generate()),
Name: "my-job",
Namespace: structs.DefaultNamespace,
Type: structs.JobTypeService,
Priority: 50,
AllAtOnce: false,
Datacenters: []string{"dc1"},
Constraints: []*structs.Constraint{
{
LTarget: "${attr.kernel.name}",
RTarget: "linux",
Operand: "=",
},
},
Update: update,
TaskGroups: []*structs.TaskGroup{
{
Name: "web",
Count: 10,
EphemeralDisk: &structs.EphemeralDisk{
SizeMB: 150,
},
RestartPolicy: &structs.RestartPolicy{
Attempts: 3,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
Mode: structs.RestartPolicyModeDelay,
},
ReschedulePolicy: &structs.ReschedulePolicy{
Attempts: 2,
Interval: 10 * time.Minute,
Delay: 5 * time.Second,
DelayFunction: "constant",
},
Migrate: structs.DefaultMigrateStrategy(),
Update: &update,
Tasks: []*structs.Task{
{
Name: "web",
Driver: "exec",
Config: map[string]interface{}{
"command": "/bin/date",
},
Env: map[string]string{
"FOO": "bar",
},
Services: []*structs.Service{
{
Name: "${TASK}-frontend",
PortLabel: "http",
Tags: []string{"pci:${meta.pci-dss}", "datacenter:${node.datacenter}"},
Checks: []*structs.ServiceCheck{
{
Name: "check-table",
Type: structs.ServiceCheckScript,
Command: "/usr/local/check-table-${meta.database}",
Args: []string{"${meta.version}"},
Interval: 30 * time.Second,
Timeout: 5 * time.Second,
},
},
},
{
Name: "${TASK}-admin",
PortLabel: "admin",
},
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
Networks: []*structs.NetworkResource{
{
MBits: 50,
DynamicPorts: []structs.Port{
{Label: "http"},
{Label: "admin"},
},
},
},
},
Meta: map[string]string{
"foo": "bar",
},
},
},
Meta: map[string]string{
"elb_check_type": "http",
"elb_check_interval": "30s",
"elb_check_min": "3",
},
},
},
Meta: map[string]string{
"owner": "armon",
},
Status: structs.JobStatusPending,
Version: 0,
CreateIndex: 42,
ModifyIndex: 99,
JobModifyIndex: 99,
}
job.Canonicalize()
return job
}

func BatchJob() *structs.Job {
job := &structs.Job{
Region: "global",
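MaxParallelJob mirrors the standard mock service job but copies structs.DefaultUpdateStrategy and zeroes MaxParallel at both the job and task-group level, giving tests a ready-made "deployments disabled" fixture.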
14 changes: 11 additions & 3 deletions nomad/structs/structs.go
@@ -3640,7 +3640,7 @@ func (j *Job) Stopped() bool {
// HasUpdateStrategy returns if any task group in the job has an update strategy
func (j *Job) HasUpdateStrategy() bool {
for _, tg := range j.TaskGroups {
if tg.Update != nil {
if !tg.Update.IsEmpty() {
return true
}
}
@@ -3969,8 +3969,8 @@ func (u *UpdateStrategy) Validate() error {
multierror.Append(&mErr, fmt.Errorf("Invalid health check given: %q", u.HealthCheck))
}

if u.MaxParallel < 1 {
multierror.Append(&mErr, fmt.Errorf("Max parallel can not be less than one: %d < 1", u.MaxParallel))
if u.MaxParallel < 0 {
multierror.Append(&mErr, fmt.Errorf("Max parallel can not be less than zero: %d < 0", u.MaxParallel))
}
if u.Canary < 0 {
multierror.Append(&mErr, fmt.Errorf("Canary count can not be less than zero: %d < 0", u.Canary))
@@ -4000,6 +4000,14 @@ func (u *UpdateStrategy) Validate() error {
return mErr.ErrorOrNil()
}

func (u *UpdateStrategy) IsEmpty() bool {
if u == nil {
return true
}

return u.MaxParallel == 0
}

// TODO(alexdadgar): Remove once no longer used by the scheduler.
// Rolling returns if a rolling strategy should be used
func (u *UpdateStrategy) Rolling() bool {
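Because IsEmpty treats both a nil strategy and MaxParallel == 0 as "no strategy", HasUpdateStrategy now skips groups that explicitly disabled deployments. A brief sketch of the new behavior:

job := &Job{TaskGroups: []*TaskGroup{
	{Update: &UpdateStrategy{MaxParallel: 0}}, // deployments disabled
}}
_ = job.HasUpdateStrategy() // false: a zeroed strategy counts as empty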
4 changes: 2 additions & 2 deletions nomad/structs/structs_test.go
@@ -1995,7 +1995,7 @@ func TestAffinity_Validate(t *testing.T) {

func TestUpdateStrategy_Validate(t *testing.T) {
u := &UpdateStrategy{
MaxParallel: 0,
MaxParallel: -1,
HealthCheck: "foo",
MinHealthyTime: -10,
HealthyDeadline: -15,
@@ -2009,7 +2009,7 @@ func TestUpdateStrategy_Validate(t *testing.T) {
if !strings.Contains(mErr.Errors[0].Error(), "Invalid health check given") {
t.Fatalf("err: %s", err)
}
if !strings.Contains(mErr.Errors[1].Error(), "Max parallel can not be less than one") {
if !strings.Contains(mErr.Errors[1].Error(), "Max parallel can not be less than zero") {
t.Fatalf("err: %s", err)
}
if !strings.Contains(mErr.Errors[2].Error(), "Canary count can not be less than zero") {
6 changes: 3 additions & 3 deletions scheduler/reconcile.go
@@ -331,7 +331,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
}
if !existingDeployment {
dstate = &structs.DeploymentState{}
if tg.Update != nil {
if !tg.Update.IsEmpty() {
dstate.AutoRevert = tg.Update.AutoRevert
dstate.AutoPromote = tg.Update.AutoPromote
dstate.ProgressDeadline = tg.Update.ProgressDeadline
@@ -509,7 +509,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
}

// Create a new deployment if necessary
if !existingDeployment && strategy != nil && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
if !existingDeployment && !strategy.IsEmpty() && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
// A previous group may have made the deployment already
if a.deployment == nil {
a.deployment = structs.NewDeployment(a.job)
@@ -618,7 +618,7 @@ func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *stru
func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive, migrate allocSet, canaryState bool) int {
// If there is no update strategy or deployment for the group we can deploy
// as many as the group has
if group.Update == nil || len(destructive)+len(migrate) == 0 {
if group.Update.IsEmpty() || len(destructive)+len(migrate) == 0 {
return group.Count
} else if a.deploymentPaused || a.deploymentFailed {
// If the deployment is paused or failed, do not create anything else
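The reconciler changes are the consuming side of IsEmpty: a group whose update strategy is nil or zeroed gets no deployment state, never triggers a new deployment, and is not subject to a rolling-update limit. That is what makes max_parallel = 0 an effective per-group off switch for deployments.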