Skip to content

Commit

Permalink
Merge pull request #7288 from hashicorp/f-task-restart-policy
Browse files Browse the repository at this point in the history
Support per-task RestartPolicy
  • Loading branch information
Mahmood Ali authored Mar 24, 2020
2 parents 10bdc6f + 8083022 commit b807491
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 42 deletions.
217 changes: 203 additions & 14 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,10 @@ func TestJobs_Canonicalize(t *testing.T) {
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
Expand Down Expand Up @@ -222,9 +223,10 @@ func TestJobs_Canonicalize(t *testing.T) {
},
Tasks: []*Task{
{
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
RestartPolicy: defaultBatchJobRestartPolicy(),
},
},
},
Expand Down Expand Up @@ -316,10 +318,11 @@ func TestJobs_Canonicalize(t *testing.T) {
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
Expand Down Expand Up @@ -363,6 +366,10 @@ func TestJobs_Canonicalize(t *testing.T) {
"db": 6379,
}},
},
RestartPolicy: &RestartPolicy{
// inherit other values from TG
Attempts: intToPtr(20),
},
Resources: &Resources{
CPU: intToPtr(500),
MemoryMB: intToPtr(256),
Expand Down Expand Up @@ -486,6 +493,12 @@ func TestJobs_Canonicalize(t *testing.T) {
"db": 6379,
}},
},
RestartPolicy: &RestartPolicy{
Interval: timeToPtr(5 * time.Minute),
Attempts: intToPtr(20),
Delay: timeToPtr(25 * time.Second),
Mode: stringToPtr("delay"),
},
Resources: &Resources{
CPU: intToPtr(500),
MemoryMB: intToPtr(256),
Expand Down Expand Up @@ -712,10 +725,11 @@ func TestJobs_Canonicalize(t *testing.T) {
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
Expand Down Expand Up @@ -753,12 +767,187 @@ func TestJobs_Canonicalize(t *testing.T) {
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
},
},
},

{
name: "restart_merge",
input: &Job{
Name: stringToPtr("foo"),
ID: stringToPtr("bar"),
ParentID: stringToPtr("lol"),
TaskGroups: []*TaskGroup{
{
Name: stringToPtr("bar"),
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
Tasks: []*Task{
{
Name: "task1",
RestartPolicy: &RestartPolicy{
Attempts: intToPtr(5),
Delay: timeToPtr(1 * time.Second),
},
},
},
},
{
Name: stringToPtr("baz"),
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(20 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
Tasks: []*Task{
{
Name: "task1",
},
},
},
},
},
expected: &Job{
Namespace: stringToPtr(DefaultNamespace),
ID: stringToPtr("bar"),
Name: stringToPtr("foo"),
Region: stringToPtr("global"),
Type: stringToPtr("service"),
ParentID: stringToPtr("lol"),
Priority: intToPtr(50),
AllAtOnce: boolToPtr(false),
ConsulToken: stringToPtr(""),
VaultToken: stringToPtr(""),
Stop: boolToPtr(false),
Stable: boolToPtr(false),
Version: uint64ToPtr(0),
Status: stringToPtr(""),
StatusDescription: stringToPtr(""),
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
TaskGroups: []*TaskGroup{
{
Name: stringToPtr("bar"),
Count: intToPtr(1),
EphemeralDisk: &EphemeralDisk{
Sticky: boolToPtr(false),
Migrate: boolToPtr(false),
SizeMB: intToPtr(300),
},
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: intToPtr(0),
Interval: timeToPtr(0),
DelayFunction: stringToPtr("exponential"),
Delay: timeToPtr(30 * time.Second),
MaxDelay: timeToPtr(1 * time.Hour),
Unlimited: boolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: &RestartPolicy{
Attempts: intToPtr(5),
Delay: timeToPtr(1 * time.Second),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
},
},
},
{
Name: stringToPtr("baz"),
Count: intToPtr(1),
EphemeralDisk: &EphemeralDisk{
Sticky: boolToPtr(false),
Migrate: boolToPtr(false),
SizeMB: intToPtr(300),
},
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(20 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: intToPtr(0),
Interval: timeToPtr(0),
DelayFunction: stringToPtr("exponential"),
Delay: timeToPtr(30 * time.Second),
MaxDelay: timeToPtr(1 * time.Hour),
Unlimited: boolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(20 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
},
},
},
Expand Down
56 changes: 37 additions & 19 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,6 @@ func (g *TaskGroup) Canonicalize(job *Job) {
if g.Scaling != nil {
g.Scaling.Canonicalize(*g.Count)
}
for _, t := range g.Tasks {
t.Canonicalize(g, job)
}
if g.EphemeralDisk == nil {
g.EphemeralDisk = DefaultEphemeralDisk()
} else {
Expand Down Expand Up @@ -515,30 +512,20 @@ func (g *TaskGroup) Canonicalize(job *Job) {
var defaultRestartPolicy *RestartPolicy
switch *job.Type {
case "service", "system":
// These needs to be in sync with DefaultServiceJobRestartPolicy in
// in nomad/structs/structs.go
defaultRestartPolicy = &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr(RestartPolicyModeFail),
}
defaultRestartPolicy = defaultServiceJobRestartPolicy()
default:
// These needs to be in sync with DefaultBatchJobRestartPolicy in
// in nomad/structs/structs.go
defaultRestartPolicy = &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(3),
Interval: timeToPtr(24 * time.Hour),
Mode: stringToPtr(RestartPolicyModeFail),
}
defaultRestartPolicy = defaultBatchJobRestartPolicy()
}

if g.RestartPolicy != nil {
defaultRestartPolicy.Merge(g.RestartPolicy)
}
g.RestartPolicy = defaultRestartPolicy

for _, t := range g.Tasks {
t.Canonicalize(g, job)
}

for _, spread := range g.Spreads {
spread.Canonicalize()
}
Expand All @@ -553,6 +540,28 @@ func (g *TaskGroup) Canonicalize(job *Job) {
}
}

// These needs to be in sync with DefaultServiceJobRestartPolicy in
// in nomad/structs/structs.go
func defaultServiceJobRestartPolicy() *RestartPolicy {
return &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr(RestartPolicyModeFail),
}
}

// These needs to be in sync with DefaultBatchJobRestartPolicy in
// in nomad/structs/structs.go
func defaultBatchJobRestartPolicy() *RestartPolicy {
return &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(3),
Interval: timeToPtr(24 * time.Hour),
Mode: stringToPtr(RestartPolicyModeFail),
}
}

// Constrain is used to add a constraint to a task group.
func (g *TaskGroup) Constrain(c *Constraint) *TaskGroup {
g.Constraints = append(g.Constraints, c)
Expand Down Expand Up @@ -645,6 +654,7 @@ type Task struct {
Env map[string]string
Services []*Service
Resources *Resources
RestartPolicy *RestartPolicy
Meta map[string]string
KillTimeout *time.Duration `mapstructure:"kill_timeout"`
LogConfig *LogConfig `mapstructure:"logs"`
Expand Down Expand Up @@ -697,6 +707,14 @@ func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {
if t.CSIPluginConfig != nil {
t.CSIPluginConfig.Canonicalize()
}
if t.RestartPolicy == nil {
t.RestartPolicy = tg.RestartPolicy
} else {
tgrp := &RestartPolicy{}
*tgrp = *tg.RestartPolicy
tgrp.Merge(t.RestartPolicy)
t.RestartPolicy = tgrp
}
}

// TaskArtifact is used to download artifacts before running a task.
Expand Down
4 changes: 3 additions & 1 deletion client/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,12 @@ func TestAllocations_GarbageCollect(t *testing.T) {

a := mock.Alloc()
a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
a.Job.TaskGroups[0].RestartPolicy = &nstructs.RestartPolicy{
rp := &nstructs.RestartPolicy{
Attempts: 0,
Mode: nstructs.RestartPolicyModeFail,
}
a.Job.TaskGroups[0].RestartPolicy = rp
a.Job.TaskGroups[0].Tasks[0].RestartPolicy = rp
a.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10ms",
}
Expand Down
Loading

0 comments on commit b807491

Please sign in to comment.