Commit c346a47

add default update stanza and max_parallel=0 disables deployments (#6191)
jazzyfresh authored Sep 2, 2019
1 parent 9a44545 commit c346a47
Showing 10 changed files with 286 additions and 11 deletions.
2 changes: 2 additions & 0 deletions api/jobs.go
@@ -751,6 +751,8 @@ func (j *Job) Canonicalize() {
}
if j.Update != nil {
j.Update.Canonicalize()
} else if *j.Type == JobTypeService {
j.Update = DefaultUpdateStrategy()
}

for _, tg := range j.TaskGroups {
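The new branch gives service jobs a default update stanza when none is set. A sketch of the defaults `DefaultUpdateStrategy` plausibly returns, inferred from the expectations added in api/jobs_test.go below; the pointer helpers are the api package's own:

```go
package api

import "time"

// Sketch of the default update strategy for service jobs, inferred from
// the test expectations added in this commit.
func DefaultUpdateStrategy() *UpdateStrategy {
	return &UpdateStrategy{
		Stagger:          timeToPtr(30 * time.Second),
		MaxParallel:      intToPtr(1),
		HealthCheck:      stringToPtr("checks"),
		MinHealthyTime:   timeToPtr(10 * time.Second),
		HealthyDeadline:  timeToPtr(5 * time.Minute),
		ProgressDeadline: timeToPtr(10 * time.Minute),
		AutoRevert:       boolToPtr(false),
		Canary:           intToPtr(0),
		AutoPromote:      boolToPtr(false),
	}
}
```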
119 changes: 119 additions & 0 deletions api/jobs_test.go
@@ -108,6 +108,17 @@ func TestJobs_Canonicalize(t *testing.T) {
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
TaskGroups: []*TaskGroup{
{
Name: stringToPtr(""),
@@ -131,6 +142,17 @@ func TestJobs_Canonicalize(t *testing.T) {
MaxDelay: timeToPtr(1 * time.Hour),
Unlimited: boolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
@@ -143,6 +165,70 @@ func TestJobs_Canonicalize(t *testing.T) {
},
},
},
{
name: "batch",
input: &Job{
Type: stringToPtr("batch"),
TaskGroups: []*TaskGroup{
{
Tasks: []*Task{
{},
},
},
},
},
expected: &Job{
ID: stringToPtr(""),
Name: stringToPtr(""),
Region: stringToPtr("global"),
Namespace: stringToPtr(DefaultNamespace),
Type: stringToPtr("batch"),
ParentID: stringToPtr(""),
Priority: intToPtr(50),
AllAtOnce: boolToPtr(false),
VaultToken: stringToPtr(""),
Status: stringToPtr(""),
StatusDescription: stringToPtr(""),
Stop: boolToPtr(false),
Stable: boolToPtr(false),
Version: uint64ToPtr(0),
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
TaskGroups: []*TaskGroup{
{
Name: stringToPtr(""),
Count: intToPtr(1),
EphemeralDisk: &EphemeralDisk{
Sticky: boolToPtr(false),
Migrate: boolToPtr(false),
SizeMB: intToPtr(300),
},
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(3),
Interval: timeToPtr(24 * time.Hour),
Mode: stringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: intToPtr(1),
Interval: timeToPtr(24 * time.Hour),
DelayFunction: stringToPtr("constant"),
Delay: timeToPtr(5 * time.Second),
MaxDelay: timeToPtr(0),
Unlimited: boolToPtr(false),
},
Tasks: []*Task{
{
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
},
},
},
},
},
},
{
name: "partial",
input: &Job{
@@ -179,6 +265,17 @@ func TestJobs_Canonicalize(t *testing.T) {
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
TaskGroups: []*TaskGroup{
{
Name: stringToPtr("bar"),
@@ -202,6 +299,17 @@ func TestJobs_Canonicalize(t *testing.T) {
MaxDelay: timeToPtr(1 * time.Hour),
Unlimited: boolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
@@ -467,6 +575,17 @@ func TestJobs_Canonicalize(t *testing.T) {
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Periodic: &PeriodicConfig{
Enabled: boolToPtr(true),
Spec: stringToPtr(""),
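A hedged usage sketch of the behavior these test cases pin down; `intPtr` is an illustrative helper, while the job constructors are the api package's existing ones:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

func intPtr(i int) *int { return &i }

func main() {
	// Service jobs now inherit the default update strategy.
	svc := api.NewServiceJob("web", "web", "global", 50)
	svc.Canonicalize()
	fmt.Println(*svc.Update.MaxParallel) // 1

	// Batch jobs are left without an update stanza.
	batch := api.NewBatchJob("index", "index", "global", 50)
	batch.Canonicalize()
	fmt.Println(batch.Update == nil) // true

	// Setting max_parallel = 0 explicitly disables deployments.
	off := api.NewServiceJob("quiet", "quiet", "global", 50)
	off.Update = &api.UpdateStrategy{MaxParallel: intPtr(0)}
	off.Canonicalize()
	fmt.Println(*off.Update.MaxParallel) // 0
}
```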
2 changes: 1 addition & 1 deletion client/allocrunner/health_hook.go
@@ -117,7 +117,7 @@ func (h *allocHealthWatcherHook) init() error {

// No need to watch allocs for deployments that rely on operators
// manually setting health
-if h.isDeploy && (tg.Update == nil || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual) {
+if h.isDeploy && (tg.Update.IsEmpty() || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual) {
return nil
}

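A nil check is no longer enough here: a group can carry a non-nil update stanza whose max_parallel = 0 disables deployments, and such a group should not get a health watcher either. A small sketch of the guard's semantics, using the structs names from this commit:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// Stanza present but disabled: the watcher is skipped just as if the
	// group had no update stanza at all.
	tg := &structs.TaskGroup{Update: &structs.UpdateStrategy{MaxParallel: 0}}
	skip := tg.Update.IsEmpty() || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual
	fmt.Println(skip) // true
}
```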
2 changes: 1 addition & 1 deletion command/agent/job_endpoint.go
@@ -641,7 +641,7 @@ func ApiJobToStructJob(job *api.Job) *structs.Job {
// Update has been pushed into the task groups. stagger and max_parallel are
// preserved at the job level, but all other values are discarded. The job.Update
// api value is merged into TaskGroups already in api.Canonicalize
-if job.Update != nil {
+if job.Update != nil && job.Update.MaxParallel != nil && *job.Update.MaxParallel > 0 {
j.Update = structs.UpdateStrategy{}

if job.Update.Stagger != nil {
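With the extra guard, a job-level stanza that disables deployments never populates the deprecated job-level structs field. A hedged sketch of the observable effect (`intPtr` is an illustrative helper as before):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/command/agent"
)

func intPtr(i int) *int { return &i }

func main() {
	apiJob := api.NewServiceJob("web", "web", "global", 50)
	apiJob.Update = &api.UpdateStrategy{MaxParallel: intPtr(0)}
	apiJob.Canonicalize()

	// The guard skips the copy, leaving the zero-valued strategy, which the
	// rest of the system now reads as "deployments disabled".
	structJob := agent.ApiJobToStructJob(apiJob)
	fmt.Println(structJob.Update.MaxParallel) // 0
}
```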
111 changes: 111 additions & 0 deletions nomad/mock/mock.go
@@ -270,6 +270,117 @@ func Job() *structs.Job {
return job
}

// MaxParallelJob returns a mock service job whose update strategy sets
// MaxParallel to zero, disabling deployments for the job.
func MaxParallelJob() *structs.Job {
update := *structs.DefaultUpdateStrategy
update.MaxParallel = 0
job := &structs.Job{
Region: "global",
ID: fmt.Sprintf("mock-service-%s", uuid.Generate()),
Name: "my-job",
Namespace: structs.DefaultNamespace,
Type: structs.JobTypeService,
Priority: 50,
AllAtOnce: false,
Datacenters: []string{"dc1"},
Constraints: []*structs.Constraint{
{
LTarget: "${attr.kernel.name}",
RTarget: "linux",
Operand: "=",
},
},
Update: update,
TaskGroups: []*structs.TaskGroup{
{
Name: "web",
Count: 10,
EphemeralDisk: &structs.EphemeralDisk{
SizeMB: 150,
},
RestartPolicy: &structs.RestartPolicy{
Attempts: 3,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
Mode: structs.RestartPolicyModeDelay,
},
ReschedulePolicy: &structs.ReschedulePolicy{
Attempts: 2,
Interval: 10 * time.Minute,
Delay: 5 * time.Second,
DelayFunction: "constant",
},
Migrate: structs.DefaultMigrateStrategy(),
Update: &update,
Tasks: []*structs.Task{
{
Name: "web",
Driver: "exec",
Config: map[string]interface{}{
"command": "/bin/date",
},
Env: map[string]string{
"FOO": "bar",
},
Services: []*structs.Service{
{
Name: "${TASK}-frontend",
PortLabel: "http",
Tags: []string{"pci:${meta.pci-dss}", "datacenter:${node.datacenter}"},
Checks: []*structs.ServiceCheck{
{
Name: "check-table",
Type: structs.ServiceCheckScript,
Command: "/usr/local/check-table-${meta.database}",
Args: []string{"${meta.version}"},
Interval: 30 * time.Second,
Timeout: 5 * time.Second,
},
},
},
{
Name: "${TASK}-admin",
PortLabel: "admin",
},
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
Networks: []*structs.NetworkResource{
{
MBits: 50,
DynamicPorts: []structs.Port{
{Label: "http"},
{Label: "admin"},
},
},
},
},
Meta: map[string]string{
"foo": "bar",
},
},
},
Meta: map[string]string{
"elb_check_type": "http",
"elb_check_interval": "30s",
"elb_check_min": "3",
},
},
},
Meta: map[string]string{
"owner": "armon",
},
Status: structs.JobStatusPending,
Version: 0,
CreateIndex: 42,
ModifyIndex: 99,
JobModifyIndex: 99,
}
job.Canonicalize()
return job
}

func BatchJob() *structs.Job {
job := &structs.Job{
Region: "global",
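A hedged sketch of how the new mock might be exercised in a test (test name and assertion style assumed): the update stanza is present but counts as empty, so the job reports no update strategy at all.

```go
package mock_test

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/stretchr/testify/require"
)

func TestMaxParallelJob_DisablesDeployments(t *testing.T) {
	job := mock.MaxParallelJob()

	// max_parallel = 0 makes the group's update stanza count as empty...
	require.True(t, job.TaskGroups[0].Update.IsEmpty())
	// ...so the job reports no update strategy for deployment purposes.
	require.False(t, job.HasUpdateStrategy())
}
```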
14 changes: 11 additions & 3 deletions nomad/structs/structs.go
@@ -3640,7 +3640,7 @@ func (j *Job) Stopped() bool {
// HasUpdateStrategy returns if any task group in the job has an update strategy
func (j *Job) HasUpdateStrategy() bool {
for _, tg := range j.TaskGroups {
-if tg.Update != nil {
+if !tg.Update.IsEmpty() {
return true
}
}
@@ -3969,8 +3969,8 @@ func (u *UpdateStrategy) Validate() error {
multierror.Append(&mErr, fmt.Errorf("Invalid health check given: %q", u.HealthCheck))
}

-if u.MaxParallel < 1 {
-	multierror.Append(&mErr, fmt.Errorf("Max parallel can not be less than one: %d < 1", u.MaxParallel))
+if u.MaxParallel < 0 {
+	multierror.Append(&mErr, fmt.Errorf("Max parallel can not be less than zero: %d < 0", u.MaxParallel))
}
if u.Canary < 0 {
multierror.Append(&mErr, fmt.Errorf("Canary count can not be less than zero: %d < 0", u.Canary))
@@ -4000,6 +4000,14 @@ func (u *UpdateStrategy) Validate() error {
return mErr.ErrorOrNil()
}

// IsEmpty reports whether the update strategy is unset or has deployments
// disabled via MaxParallel == 0.
func (u *UpdateStrategy) IsEmpty() bool {
if u == nil {
return true
}

return u.MaxParallel == 0
}

// TODO(alexdadgar): Remove once no longer used by the scheduler.
// Rolling returns if a rolling strategy should be used
func (u *UpdateStrategy) Rolling() bool {
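IsEmpty is defined on a pointer receiver and handles nil explicitly, so call sites can swap their nil checks for it without extra guards. A quick sketch (Copy on the default strategy is assumed to exist, as in the mock above):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	var unset *structs.UpdateStrategy
	fmt.Println(unset.IsEmpty()) // true: the nil receiver short-circuits

	disabled := &structs.UpdateStrategy{MaxParallel: 0}
	fmt.Println(disabled.IsEmpty()) // true: max_parallel = 0 disables deployments

	enabled := structs.DefaultUpdateStrategy.Copy()
	fmt.Println(enabled.IsEmpty()) // false: the default MaxParallel is 1
}
```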
4 changes: 2 additions & 2 deletions nomad/structs/structs_test.go
@@ -1995,7 +1995,7 @@ func TestAffinity_Validate(t *testing.T) {

func TestUpdateStrategy_Validate(t *testing.T) {
u := &UpdateStrategy{
-MaxParallel: 0,
+MaxParallel: -1,
HealthCheck: "foo",
MinHealthyTime: -10,
HealthyDeadline: -15,
@@ -2009,7 +2009,7 @@ func TestUpdateStrategy_Validate(t *testing.T) {
if !strings.Contains(mErr.Errors[0].Error(), "Invalid health check given") {
t.Fatalf("err: %s", err)
}
-if !strings.Contains(mErr.Errors[1].Error(), "Max parallel can not be less than one") {
+if !strings.Contains(mErr.Errors[1].Error(), "Max parallel can not be less than zero") {
t.Fatalf("err: %s", err)
}
if !strings.Contains(mErr.Errors[2].Error(), "Canary count can not be less than zero") {
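Zero is now a valid setting that disables deployments, so only negative values should fail validation. A hedged boundary sketch, assuming the default strategy passes its own validation:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	u := structs.DefaultUpdateStrategy.Copy()

	u.MaxParallel = 0
	fmt.Println(u.Validate()) // <nil>: zero is valid and disables deployments

	u.MaxParallel = -1
	fmt.Println(u.Validate() != nil) // true: negative values are still rejected
}
```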
6 changes: 3 additions & 3 deletions scheduler/reconcile.go
@@ -331,7 +331,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
}
if !existingDeployment {
dstate = &structs.DeploymentState{}
-if tg.Update != nil {
+if !tg.Update.IsEmpty() {
dstate.AutoRevert = tg.Update.AutoRevert
dstate.AutoPromote = tg.Update.AutoPromote
dstate.ProgressDeadline = tg.Update.ProgressDeadline
@@ -509,7 +509,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
}

// Create a new deployment if necessary
-if !existingDeployment && strategy != nil && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
+if !existingDeployment && !strategy.IsEmpty() && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
// A previous group may have made the deployment already
if a.deployment == nil {
a.deployment = structs.NewDeployment(a.job)
@@ -618,7 +618,7 @@ func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *stru
func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive, migrate allocSet, canaryState bool) int {
// If there is no update strategy or deployment for the group we can deploy
// as many as the group has
-if group.Update == nil || len(destructive)+len(migrate) == 0 {
+if group.Update.IsEmpty() || len(destructive)+len(migrate) == 0 {
return group.Count
} else if a.deploymentPaused || a.deploymentFailed {
// If the deployment is paused or failed, do not create anything else
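The net effect in the reconciler: a group whose update stanza is empty or disabled is placed without a deployment and without rolling-update throttling. A hedged simplification of the limit logic (the real computeLimit also handles paused and failed deployments and canaries):

```go
package reconcilesketch

import "github.com/hashicorp/nomad/nomad/structs"

// limit is a simplified restatement of computeLimit: with an empty or
// disabled update strategy the whole group may be replaced at once;
// otherwise destructive updates proceed MaxParallel at a time.
func limit(update *structs.UpdateStrategy, count, changing int) int {
	if update.IsEmpty() || changing == 0 {
		return count // no throttle when deployments are disabled
	}
	return update.MaxParallel
}
```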