Skip to content

Commit

Permalink
per-task restart policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahmood Ali committed Mar 7, 2020
1 parent 79ce20a commit e20527f
Show file tree
Hide file tree
Showing 9 changed files with 300 additions and 40 deletions.
217 changes: 203 additions & 14 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,10 @@ func TestJobs_Canonicalize(t *testing.T) {
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
Expand Down Expand Up @@ -222,9 +223,10 @@ func TestJobs_Canonicalize(t *testing.T) {
},
Tasks: []*Task{
{
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
RestartPolicy: defaultBatchJobRestartPolicy(),
},
},
},
Expand Down Expand Up @@ -316,10 +318,11 @@ func TestJobs_Canonicalize(t *testing.T) {
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
Expand Down Expand Up @@ -363,6 +366,10 @@ func TestJobs_Canonicalize(t *testing.T) {
"db": 6379,
}},
},
RestartPolicy: &RestartPolicy{
// inherit other values from TG
Attempts: intToPtr(20),
},
Resources: &Resources{
CPU: intToPtr(500),
MemoryMB: intToPtr(256),
Expand Down Expand Up @@ -487,6 +494,12 @@ func TestJobs_Canonicalize(t *testing.T) {
"db": 6379,
}},
},
RestartPolicy: &RestartPolicy{
Interval: timeToPtr(5 * time.Minute),
Attempts: intToPtr(20),
Delay: timeToPtr(25 * time.Second),
Mode: stringToPtr("delay"),
},
Resources: &Resources{
CPU: intToPtr(500),
MemoryMB: intToPtr(256),
Expand Down Expand Up @@ -713,10 +726,11 @@ func TestJobs_Canonicalize(t *testing.T) {
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
Expand Down Expand Up @@ -754,12 +768,187 @@ func TestJobs_Canonicalize(t *testing.T) {
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
},
},
},

{
name: "restart_merge",
input: &Job{
Name: stringToPtr("foo"),
ID: stringToPtr("bar"),
ParentID: stringToPtr("lol"),
TaskGroups: []*TaskGroup{
{
Name: stringToPtr("bar"),
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
Tasks: []*Task{
{
Name: "task1",
RestartPolicy: &RestartPolicy{
Attempts: intToPtr(5),
Delay: timeToPtr(1 * time.Second),
},
},
},
},
{
Name: stringToPtr("baz"),
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(20 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
Tasks: []*Task{
{
Name: "task1",
},
},
},
},
},
expected: &Job{
Namespace: stringToPtr(DefaultNamespace),
ID: stringToPtr("bar"),
Name: stringToPtr("foo"),
Region: stringToPtr("global"),
Type: stringToPtr("service"),
ParentID: stringToPtr("lol"),
Priority: intToPtr(50),
AllAtOnce: boolToPtr(false),
ConsulToken: stringToPtr(""),
VaultToken: stringToPtr(""),
Stop: boolToPtr(false),
Stable: boolToPtr(false),
Version: uint64ToPtr(0),
Status: stringToPtr(""),
StatusDescription: stringToPtr(""),
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
TaskGroups: []*TaskGroup{
{
Name: stringToPtr("bar"),
Count: intToPtr(1),
EphemeralDisk: &EphemeralDisk{
Sticky: boolToPtr(false),
Migrate: boolToPtr(false),
SizeMB: intToPtr(300),
},
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: intToPtr(0),
Interval: timeToPtr(0),
DelayFunction: stringToPtr("exponential"),
Delay: timeToPtr(30 * time.Second),
MaxDelay: timeToPtr(1 * time.Hour),
Unlimited: boolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: &RestartPolicy{
Attempts: intToPtr(5),
Delay: timeToPtr(1 * time.Second),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
},
},
},
{
Name: stringToPtr("baz"),
Count: intToPtr(1),
EphemeralDisk: &EphemeralDisk{
Sticky: boolToPtr(false),
Migrate: boolToPtr(false),
SizeMB: intToPtr(300),
},
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(20 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: intToPtr(0),
Interval: timeToPtr(0),
DelayFunction: stringToPtr("exponential"),
Delay: timeToPtr(30 * time.Second),
MaxDelay: timeToPtr(1 * time.Hour),
Unlimited: boolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(20 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
},
},
},
Expand Down
56 changes: 37 additions & 19 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,6 @@ func (g *TaskGroup) Canonicalize(job *Job) {
if g.Count == nil {
g.Count = intToPtr(1)
}
for _, t := range g.Tasks {
t.Canonicalize(g, job)
}
if g.EphemeralDisk == nil {
g.EphemeralDisk = DefaultEphemeralDisk()
} else {
Expand Down Expand Up @@ -505,30 +502,20 @@ func (g *TaskGroup) Canonicalize(job *Job) {
var defaultRestartPolicy *RestartPolicy
switch *job.Type {
case "service", "system":
// These values need to be kept in sync with DefaultServiceJobRestartPolicy
// in nomad/structs/structs.go
defaultRestartPolicy = &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr(RestartPolicyModeFail),
}
defaultRestartPolicy = defaultServiceJobRestartPolicy()
default:
// These values need to be kept in sync with DefaultBatchJobRestartPolicy
// in nomad/structs/structs.go
defaultRestartPolicy = &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(3),
Interval: timeToPtr(24 * time.Hour),
Mode: stringToPtr(RestartPolicyModeFail),
}
defaultRestartPolicy = defaultBatchJobRestartPolicy()
}

if g.RestartPolicy != nil {
defaultRestartPolicy.Merge(g.RestartPolicy)
}
g.RestartPolicy = defaultRestartPolicy

for _, t := range g.Tasks {
t.Canonicalize(g, job)
}

for _, spread := range g.Spreads {
spread.Canonicalize()
}
Expand All @@ -543,6 +530,28 @@ func (g *TaskGroup) Canonicalize(job *Job) {
}
}

// defaultServiceJobRestartPolicy returns a newly allocated RestartPolicy
// holding the defaults applied to "service" and "system" jobs: 2 attempts
// within a 30 minute interval, a 15 second delay between restarts, and the
// "fail" mode. A fresh value is returned on every call, so callers may merge
// overrides into it without affecting anyone else.
//
// The values must be kept in sync with DefaultServiceJobRestartPolicy in
// nomad/structs/structs.go.
func defaultServiceJobRestartPolicy() *RestartPolicy {
	policy := new(RestartPolicy)
	policy.Delay = timeToPtr(15 * time.Second)
	policy.Attempts = intToPtr(2)
	policy.Interval = timeToPtr(30 * time.Minute)
	policy.Mode = stringToPtr(RestartPolicyModeFail)
	return policy
}

// defaultBatchJobRestartPolicy returns a newly allocated RestartPolicy
// holding the defaults applied to every job type other than "service" and
// "system": 3 attempts within a 24 hour interval, a 15 second delay between
// restarts, and the "fail" mode. A fresh value is returned on every call, so
// callers may merge overrides into it without affecting anyone else.
//
// The values must be kept in sync with DefaultBatchJobRestartPolicy in
// nomad/structs/structs.go.
func defaultBatchJobRestartPolicy() *RestartPolicy {
	policy := new(RestartPolicy)
	policy.Delay = timeToPtr(15 * time.Second)
	policy.Attempts = intToPtr(3)
	policy.Interval = timeToPtr(24 * time.Hour)
	policy.Mode = stringToPtr(RestartPolicyModeFail)
	return policy
}

// Constrain is used to add a constraint to a task group.
func (g *TaskGroup) Constrain(c *Constraint) *TaskGroup {
g.Constraints = append(g.Constraints, c)
Expand Down Expand Up @@ -620,6 +629,7 @@ type Task struct {
Env map[string]string
Services []*Service
Resources *Resources
RestartPolicy *RestartPolicy
Meta map[string]string
KillTimeout *time.Duration `mapstructure:"kill_timeout"`
LogConfig *LogConfig `mapstructure:"logs"`
Expand Down Expand Up @@ -665,6 +675,14 @@ func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {
for _, vm := range t.VolumeMounts {
vm.Canonicalize()
}
if t.RestartPolicy == nil {
t.RestartPolicy = tg.RestartPolicy
} else {
tgrp := &RestartPolicy{}
*tgrp = *tg.RestartPolicy
tgrp.Merge(t.RestartPolicy)
t.RestartPolicy = tgrp
}
}

// TaskArtifact is used to download artifacts before running a task.
Expand Down
14 changes: 9 additions & 5 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,16 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
tr.taskResources = tres

// Build the restart tracker.
tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup)
if tg == nil {
tr.logger.Error("alloc missing task group")
return nil, fmt.Errorf("alloc missing task group")
rp := config.Task.RestartPolicy
if rp == nil {
tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup)
if tg == nil {
tr.logger.Error("alloc missing task group")
return nil, fmt.Errorf("alloc missing task group")
}
rp = tg.RestartPolicy
}
tr.restartTracker = restarts.NewRestartTracker(tg.RestartPolicy, tr.alloc.Job.Type)
tr.restartTracker = restarts.NewRestartTracker(rp, tr.alloc.Job.Type)

// Get the driver
if err := tr.initDriver(); err != nil {
Expand Down
Loading

0 comments on commit e20527f

Please sign in to comment.