diff --git a/api/allocations.go b/api/allocations.go index 42b5c8636b3..68047ee5b46 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -86,6 +86,7 @@ type Allocation struct { TaskStates map[string]*TaskState DeploymentID string DeploymentStatus *AllocDeploymentStatus + FollowupEvalID string PreviousAllocation string NextAllocation string RescheduleTracker *RescheduleTracker @@ -129,6 +130,7 @@ type AllocationListStub struct { TaskStates map[string]*TaskState DeploymentStatus *AllocDeploymentStatus RescheduleTracker *RescheduleTracker + FollowupEvalID string CreateIndex uint64 ModifyIndex uint64 CreateTime int64 diff --git a/api/evaluations.go b/api/evaluations.go index 5aa893469ee..f8ec9432923 100644 --- a/api/evaluations.go +++ b/api/evaluations.go @@ -67,6 +67,7 @@ type Evaluation struct { Status string StatusDescription string Wait time.Duration + WaitUntil time.Time NextEval string PreviousEval string BlockedEval string diff --git a/api/jobs_test.go b/api/jobs_test.go index dc2dd351ffd..5d7a2675038 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -136,8 +136,12 @@ func TestJobs_Canonicalize(t *testing.T) { Mode: helper.StringToPtr("fail"), }, ReschedulePolicy: &ReschedulePolicy{ - Attempts: helper.IntToPtr(2), - Interval: helper.TimeToPtr(1 * time.Hour), + Attempts: helper.IntToPtr(0), + Interval: helper.TimeToPtr(0), + DelayFunction: helper.StringToPtr("exponential"), + Delay: helper.TimeToPtr(30 * time.Second), + MaxDelay: helper.TimeToPtr(1 * time.Hour), + Unlimited: helper.BoolToPtr(true), }, Tasks: []*Task{ { @@ -202,8 +206,12 @@ func TestJobs_Canonicalize(t *testing.T) { Mode: helper.StringToPtr("fail"), }, ReschedulePolicy: &ReschedulePolicy{ - Attempts: helper.IntToPtr(2), - Interval: helper.TimeToPtr(1 * time.Hour), + Attempts: helper.IntToPtr(0), + Interval: helper.TimeToPtr(0), + DelayFunction: helper.StringToPtr("exponential"), + Delay: helper.TimeToPtr(30 * time.Second), + MaxDelay: helper.TimeToPtr(1 * time.Hour), + Unlimited: helper.BoolToPtr(true), }, Tasks: []*Task{ { @@ -335,8 +343,12 @@ func TestJobs_Canonicalize(t *testing.T) { Mode: helper.StringToPtr("delay"), }, ReschedulePolicy: &ReschedulePolicy{ - Attempts: helper.IntToPtr(2), - Interval: helper.TimeToPtr(1 * time.Hour), + Attempts: helper.IntToPtr(0), + Interval: helper.TimeToPtr(0), + DelayFunction: helper.StringToPtr("exponential"), + Delay: helper.TimeToPtr(30 * time.Second), + MaxDelay: helper.TimeToPtr(1 * time.Hour), + Unlimited: helper.BoolToPtr(true), }, EphemeralDisk: &EphemeralDisk{ Sticky: helper.BoolToPtr(false), @@ -550,8 +562,12 @@ func TestJobs_Canonicalize(t *testing.T) { Mode: helper.StringToPtr("fail"), }, ReschedulePolicy: &ReschedulePolicy{ - Attempts: helper.IntToPtr(2), - Interval: helper.TimeToPtr(1 * time.Hour), + Attempts: helper.IntToPtr(0), + Interval: helper.TimeToPtr(0), + DelayFunction: helper.StringToPtr("exponential"), + Delay: helper.TimeToPtr(30 * time.Second), + MaxDelay: helper.TimeToPtr(1 * time.Hour), + Unlimited: helper.BoolToPtr(true), }, Update: &UpdateStrategy{ Stagger: helper.TimeToPtr(2 * time.Second), @@ -586,8 +602,12 @@ func TestJobs_Canonicalize(t *testing.T) { Mode: helper.StringToPtr("fail"), }, ReschedulePolicy: &ReschedulePolicy{ - Attempts: helper.IntToPtr(2), - Interval: helper.TimeToPtr(1 * time.Hour), + Attempts: helper.IntToPtr(0), + Interval: helper.TimeToPtr(0), + DelayFunction: helper.StringToPtr("exponential"), + Delay: helper.TimeToPtr(30 * time.Second), + MaxDelay: helper.TimeToPtr(1 * time.Hour), + Unlimited: 
helper.BoolToPtr(true), }, Update: &UpdateStrategy{ Stagger: helper.TimeToPtr(1 * time.Second), diff --git a/api/search_test.go b/api/search_test.go index e62d9966745..892394c39eb 100644 --- a/api/search_test.go +++ b/api/search_test.go @@ -4,11 +4,11 @@ import ( "testing" "github.com/hashicorp/nomad/api/contexts" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestSearch_List(t *testing.T) { - assert := assert.New(t) + require := require.New(t) t.Parallel() c, s := makeClient(t, nil, nil) @@ -16,16 +16,17 @@ func TestSearch_List(t *testing.T) { job := testJob() _, _, err := c.Jobs().Register(job, nil) - assert.Nil(err) + require.Nil(err) id := *job.ID prefix := id[:len(id)-2] resp, qm, err := c.Search().PrefixSearch(prefix, contexts.Jobs, nil) - assert.Nil(err) - assert.NotNil(qm) + require.Nil(err) + require.NotNil(qm) + require.NotNil(qm) jobMatches := resp.Matches[contexts.Jobs] - assert.Equal(1, len(jobMatches)) - assert.Equal(id, jobMatches[0]) + require.Equal(1, len(jobMatches)) + require.Equal(id, jobMatches[0]) } diff --git a/api/tasks.go b/api/tasks.go index 69a8022f935..5bf95780055 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -86,6 +86,20 @@ type ReschedulePolicy struct { // Interval is a duration in which we can limit the number of reschedule attempts. Interval *time.Duration `mapstructure:"interval"` + + // Delay is a minimum duration to wait between reschedule attempts. + // The delay function determines how much subsequent reschedule attempts are delayed by. + Delay *time.Duration `mapstructure:"delay"` + + // DelayFunction determines how the delay progressively changes on subsequent reschedule + // attempts. Valid values are "exponential", "linear", and "fibonacci". + DelayFunction *string `mapstructure:"delay_function"` + + // MaxDelay is an upper bound on the delay. 
+ MaxDelay *time.Duration `mapstructure:"max_delay"` + + // Unlimited allows rescheduling attempts until they succeed + Unlimited *bool `mapstructure:"unlimited"` } func (r *ReschedulePolicy) Merge(rp *ReschedulePolicy) { @@ -95,6 +109,18 @@ func (r *ReschedulePolicy) Merge(rp *ReschedulePolicy) { if rp.Attempts != nil { r.Attempts = rp.Attempts } + if rp.Delay != nil { + r.Delay = rp.Delay + } + if rp.DelayFunction != nil { + r.DelayFunction = rp.DelayFunction + } + if rp.MaxDelay != nil { + r.MaxDelay = rp.MaxDelay + } + if rp.Unlimited != nil { + r.Unlimited = rp.Unlimited + } } func (r *ReschedulePolicy) Copy() *ReschedulePolicy { @@ -316,13 +342,21 @@ func (g *TaskGroup) Canonicalize(job *Job) { switch *job.Type { case "service": defaultReschedulePolicy = &ReschedulePolicy{ - Attempts: helper.IntToPtr(structs.DefaultServiceJobReschedulePolicy.Attempts), - Interval: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Interval), + Attempts: helper.IntToPtr(structs.DefaultServiceJobReschedulePolicy.Attempts), + Interval: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Interval), + Delay: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Delay), + DelayFunction: helper.StringToPtr(structs.DefaultServiceJobReschedulePolicy.DelayFunction), + MaxDelay: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.MaxDelay), + Unlimited: helper.BoolToPtr(structs.DefaultServiceJobReschedulePolicy.Unlimited), } case "batch": defaultReschedulePolicy = &ReschedulePolicy{ - Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts), - Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval), + Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts), + Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval), + Delay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Delay), + DelayFunction: helper.StringToPtr(structs.DefaultBatchJobReschedulePolicy.DelayFunction), + MaxDelay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.MaxDelay), + Unlimited: helper.BoolToPtr(structs.DefaultBatchJobReschedulePolicy.Unlimited), } default: defaultReschedulePolicy = &ReschedulePolicy{ diff --git a/api/tasks_test.go b/api/tasks_test.go index 37c47d51429..3280507ad59 100644 --- a/api/tasks_test.go +++ b/api/tasks_test.go @@ -284,70 +284,115 @@ func TestTaskGroup_Canonicalize_ReschedulePolicy(t *testing.T) { jobReschedulePolicy: nil, taskReschedulePolicy: nil, expected: &ReschedulePolicy{ - Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts), - Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval), + Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts), + Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval), + Delay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Delay), + DelayFunction: helper.StringToPtr(structs.DefaultBatchJobReschedulePolicy.DelayFunction), + MaxDelay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.MaxDelay), + Unlimited: helper.BoolToPtr(structs.DefaultBatchJobReschedulePolicy.Unlimited), }, }, { desc: "Empty job reschedule policy", jobReschedulePolicy: &ReschedulePolicy{ - Attempts: helper.IntToPtr(0), - Interval: helper.TimeToPtr(0), + Attempts: helper.IntToPtr(0), + Interval: helper.TimeToPtr(0), + Delay: helper.TimeToPtr(0), + MaxDelay: helper.TimeToPtr(0), + DelayFunction: helper.StringToPtr(""), + Unlimited: helper.BoolToPtr(false), }, taskReschedulePolicy: nil, 
expected: &ReschedulePolicy{ - Attempts: helper.IntToPtr(0), - Interval: helper.TimeToPtr(0), + Attempts: helper.IntToPtr(0), + Interval: helper.TimeToPtr(0), + Delay: helper.TimeToPtr(0), + MaxDelay: helper.TimeToPtr(0), + DelayFunction: helper.StringToPtr(""), + Unlimited: helper.BoolToPtr(false), }, }, { desc: "Inherit from job", jobReschedulePolicy: &ReschedulePolicy{ - Attempts: helper.IntToPtr(1), - Interval: helper.TimeToPtr(20 * time.Second), + Attempts: helper.IntToPtr(1), + Interval: helper.TimeToPtr(20 * time.Second), + Delay: helper.TimeToPtr(20 * time.Second), + MaxDelay: helper.TimeToPtr(10 * time.Minute), + DelayFunction: helper.StringToPtr("linear"), + Unlimited: helper.BoolToPtr(false), }, taskReschedulePolicy: nil, expected: &ReschedulePolicy{ - Attempts: helper.IntToPtr(1), - Interval: helper.TimeToPtr(20 * time.Second), + Attempts: helper.IntToPtr(1), + Interval: helper.TimeToPtr(20 * time.Second), + Delay: helper.TimeToPtr(20 * time.Second), + MaxDelay: helper.TimeToPtr(10 * time.Minute), + DelayFunction: helper.StringToPtr("linear"), + Unlimited: helper.BoolToPtr(false), }, }, { desc: "Set in task", jobReschedulePolicy: nil, taskReschedulePolicy: &ReschedulePolicy{ - Attempts: helper.IntToPtr(5), - Interval: helper.TimeToPtr(2 * time.Minute), + Attempts: helper.IntToPtr(5), + Interval: helper.TimeToPtr(2 * time.Minute), + Delay: helper.TimeToPtr(20 * time.Second), + MaxDelay: helper.TimeToPtr(10 * time.Minute), + DelayFunction: helper.StringToPtr("linear"), + Unlimited: helper.BoolToPtr(false), }, expected: &ReschedulePolicy{ - Attempts: helper.IntToPtr(5), - Interval: helper.TimeToPtr(2 * time.Minute), + Attempts: helper.IntToPtr(5), + Interval: helper.TimeToPtr(2 * time.Minute), + Delay: helper.TimeToPtr(20 * time.Second), + MaxDelay: helper.TimeToPtr(10 * time.Minute), + DelayFunction: helper.StringToPtr("linear"), + Unlimited: helper.BoolToPtr(false), }, }, { desc: "Merge from job", jobReschedulePolicy: &ReschedulePolicy{ Attempts: helper.IntToPtr(1), + Delay: helper.TimeToPtr(20 * time.Second), + MaxDelay: helper.TimeToPtr(10 * time.Minute), }, taskReschedulePolicy: &ReschedulePolicy{ - Interval: helper.TimeToPtr(5 * time.Minute), + Interval: helper.TimeToPtr(5 * time.Minute), + DelayFunction: helper.StringToPtr("linear"), + Unlimited: helper.BoolToPtr(false), }, expected: &ReschedulePolicy{ - Attempts: helper.IntToPtr(1), - Interval: helper.TimeToPtr(5 * time.Minute), + Attempts: helper.IntToPtr(1), + Interval: helper.TimeToPtr(5 * time.Minute), + Delay: helper.TimeToPtr(20 * time.Second), + MaxDelay: helper.TimeToPtr(10 * time.Minute), + DelayFunction: helper.StringToPtr("linear"), + Unlimited: helper.BoolToPtr(false), }, }, { desc: "Override from group", jobReschedulePolicy: &ReschedulePolicy{ Attempts: helper.IntToPtr(1), + MaxDelay: helper.TimeToPtr(10 * time.Second), }, taskReschedulePolicy: &ReschedulePolicy{ - Attempts: helper.IntToPtr(5), + Attempts: helper.IntToPtr(5), + Delay: helper.TimeToPtr(20 * time.Second), + MaxDelay: helper.TimeToPtr(20 * time.Minute), + DelayFunction: helper.StringToPtr("linear"), + Unlimited: helper.BoolToPtr(false), }, expected: &ReschedulePolicy{ - Attempts: helper.IntToPtr(5), - Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval), + Attempts: helper.IntToPtr(5), + Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval), + Delay: helper.TimeToPtr(20 * time.Second), + MaxDelay: helper.TimeToPtr(20 * time.Minute), + DelayFunction: helper.StringToPtr("linear"), + Unlimited: 
helper.BoolToPtr(false), }, }, { @@ -357,8 +402,12 @@ func TestTaskGroup_Canonicalize_ReschedulePolicy(t *testing.T) { }, taskReschedulePolicy: nil, expected: &ReschedulePolicy{ - Attempts: helper.IntToPtr(1), - Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval), + Attempts: helper.IntToPtr(1), + Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval), + Delay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Delay), + DelayFunction: helper.StringToPtr(structs.DefaultBatchJobReschedulePolicy.DelayFunction), + MaxDelay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.MaxDelay), + Unlimited: helper.BoolToPtr(structs.DefaultBatchJobReschedulePolicy.Unlimited), }, }, } diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index c661e4b0bdd..2012513a79b 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -639,8 +639,12 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) { } tg.ReschedulePolicy = &structs.ReschedulePolicy{ - Attempts: *taskGroup.ReschedulePolicy.Attempts, - Interval: *taskGroup.ReschedulePolicy.Interval, + Attempts: *taskGroup.ReschedulePolicy.Attempts, + Interval: *taskGroup.ReschedulePolicy.Interval, + Delay: *taskGroup.ReschedulePolicy.Delay, + DelayFunction: *taskGroup.ReschedulePolicy.DelayFunction, + MaxDelay: *taskGroup.ReschedulePolicy.MaxDelay, + Unlimited: *taskGroup.ReschedulePolicy.Unlimited, } tg.EphemeralDisk = &structs.EphemeralDisk{ diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 2cf2e6fa8a7..dc50dd77e66 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -1172,8 +1172,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Mode: helper.StringToPtr("delay"), }, ReschedulePolicy: &api.ReschedulePolicy{ - Interval: helper.TimeToPtr(12 * time.Hour), - Attempts: helper.IntToPtr(5), + Interval: helper.TimeToPtr(12 * time.Hour), + Attempts: helper.IntToPtr(5), + DelayFunction: helper.StringToPtr("linear"), + Delay: helper.TimeToPtr(30 * time.Second), + Unlimited: helper.BoolToPtr(true), + MaxDelay: helper.TimeToPtr(20 * time.Minute), }, EphemeralDisk: &api.EphemeralDisk{ SizeMB: helper.IntToPtr(100), @@ -1384,8 +1388,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Mode: "delay", }, ReschedulePolicy: &structs.ReschedulePolicy{ - Interval: 12 * time.Hour, - Attempts: 5, + Interval: 12 * time.Hour, + Attempts: 5, + DelayFunction: "linear", + Delay: 30 * time.Second, + Unlimited: true, + MaxDelay: 20 * time.Minute, }, EphemeralDisk: &structs.EphemeralDisk{ SizeMB: 100, diff --git a/command/alloc_status.go b/command/alloc_status.go index 1bea22610d7..77455cdebf8 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -283,6 +283,13 @@ func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength basic = append(basic, fmt.Sprintf("Replacement Alloc ID|%s", limit(alloc.NextAllocation, uuidLength))) } + if alloc.FollowupEvalID != "" { + nextEvalTime := futureEvalTimePretty(alloc.FollowupEvalID, client) + if nextEvalTime != "" { + basic = append(basic, + fmt.Sprintf("Reschedule Eligibility|%s", nextEvalTime)) + } + } if verbose { basic = append(basic, @@ -296,6 +303,18 @@ func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength return formatKV(basic), nil } +// futureEvalTimePretty returns when the eval is eligible to reschedule +// relative to current time, based on the WaitUntil field +func 
futureEvalTimePretty(evalID string, client *api.Client) string { + evaluation, _, err := client.Evaluations().Info(evalID, nil) + // Eval time is not a critical output, + // don't return it on errors, if its not set or already in the past + if err != nil || evaluation.WaitUntil.IsZero() || time.Now().After(evaluation.WaitUntil) { + return "" + } + return prettyTimeDiff(evaluation.WaitUntil, time.Now()) +} + // outputTaskDetails prints task details for each task in the allocation, // optionally printing verbose statistics if displayStats is set func (c *AllocStatusCommand) outputTaskDetails(alloc *api.Allocation, stats *api.AllocResourceUsage, displayStats bool) { diff --git a/command/helpers.go b/command/helpers.go index 68f0f15bc1a..9b1b6fed86b 100644 --- a/command/helpers.go +++ b/command/helpers.go @@ -103,7 +103,15 @@ func prettyTimeDiff(first, second time.Time) string { second = second.Round(time.Second) // calculate time difference in seconds - d := second.Sub(first) + var d time.Duration + messageSuffix := "ago" + if second.Equal(first) || second.After(first) { + d = second.Sub(first) + } else { + d = first.Sub(second) + messageSuffix = "from now" + } + u := uint64(d.Seconds()) var buf [32]byte @@ -183,9 +191,9 @@ func prettyTimeDiff(first, second time.Time) string { end = indexes[num_periods-3] } if start == end { //edge case when time difference is less than a second - return "0s ago" + return "0s " + messageSuffix } else { - return string(buf[start:end]) + " ago" + return string(buf[start:end]) + " " + messageSuffix } } diff --git a/command/helpers_test.go b/command/helpers_test.go index a639a6dfd3c..2bf88939d06 100644 --- a/command/helpers_test.go +++ b/command/helpers_test.go @@ -310,6 +310,7 @@ func TestPrettyTimeDiff(t *testing.T) { {now, now.Add(-60 * time.Minute), "1h ago"}, {now, now.Add(-80 * time.Minute), "1h20m ago"}, {now, now.Add(-6 * time.Hour), "6h ago"}, + {now.Add(-6 * time.Hour), now, "6h from now"}, {now, now.Add(-22165 * time.Second), "6h9m ago"}, {now, now.Add(-100 * time.Hour), "4d4h ago"}, {now, now.Add(-438000 * time.Minute), "10mo4d ago"}, diff --git a/jobspec/parse.go b/jobspec/parse.go index 53dc9c5fca6..d6f235e05f2 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -446,6 +446,10 @@ func parseReschedulePolicy(final **api.ReschedulePolicy, list *ast.ObjectList) e valid := []string{ "attempts", "interval", + "unlimited", + "delay", + "max_delay", + "delay_function", } if err := helper.CheckHCLKeys(obj.Val, valid); err != nil { return err diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 90901ba16cd..dbf1200570e 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -679,8 +679,42 @@ func TestParse(t *testing.T) { Type: helper.StringToPtr("batch"), Datacenters: []string{"dc1"}, Reschedule: &api.ReschedulePolicy{ - Attempts: helper.IntToPtr(15), - Interval: helper.TimeToPtr(30 * time.Minute), + Attempts: helper.IntToPtr(15), + Interval: helper.TimeToPtr(30 * time.Minute), + DelayFunction: helper.StringToPtr("linear"), + Delay: helper.TimeToPtr(10 * time.Second), + }, + TaskGroups: []*api.TaskGroup{ + { + Name: helper.StringToPtr("bar"), + Count: helper.IntToPtr(3), + Tasks: []*api.Task{ + { + Name: "bar", + Driver: "raw_exec", + Config: map[string]interface{}{ + "command": "bash", + "args": []interface{}{"-c", "echo hi"}, + }, + }, + }, + }, + }, + }, + false, + }, + { + "reschedule-job-unlimited.hcl", + &api.Job{ + ID: helper.StringToPtr("foo"), + Name: helper.StringToPtr("foo"), + Type: helper.StringToPtr("batch"), + 
Datacenters: []string{"dc1"}, + Reschedule: &api.ReschedulePolicy{ + DelayFunction: helper.StringToPtr("exponential"), + Delay: helper.TimeToPtr(10 * time.Second), + MaxDelay: helper.TimeToPtr(120 * time.Second), + Unlimited: helper.BoolToPtr(true), }, TaskGroups: []*api.TaskGroup{ { diff --git a/jobspec/test-fixtures/reschedule-job-unlimited.hcl b/jobspec/test-fixtures/reschedule-job-unlimited.hcl new file mode 100644 index 00000000000..c51be506a56 --- /dev/null +++ b/jobspec/test-fixtures/reschedule-job-unlimited.hcl @@ -0,0 +1,20 @@ +job "foo" { + datacenters = ["dc1"] + type = "batch" + reschedule { + delay = "10s", + delay_function = "exponential" + max_delay="120s" + unlimited = true + } + group "bar" { + count = 3 + task "bar" { + driver = "raw_exec" + config { + command = "bash" + args = ["-c", "echo hi"] + } + } + } +} diff --git a/jobspec/test-fixtures/reschedule-job.hcl b/jobspec/test-fixtures/reschedule-job.hcl index 323fef88267..48fd7167067 100644 --- a/jobspec/test-fixtures/reschedule-job.hcl +++ b/jobspec/test-fixtures/reschedule-job.hcl @@ -4,6 +4,8 @@ job "foo" { reschedule { attempts = 15 interval = "30m" + delay = "10s", + delay_function = "linear" } group "bar" { count = 3 diff --git a/lib/delay_heap.go b/lib/delay_heap.go new file mode 100644 index 00000000000..15558319857 --- /dev/null +++ b/lib/delay_heap.go @@ -0,0 +1,167 @@ +package lib + +import ( + "container/heap" + "fmt" + "time" + + "github.com/hashicorp/nomad/nomad/structs" +) + +// DelayHeap wraps a heap and gives operations other than Push/Pop. +// The inner heap is sorted by the time in the WaitUntil field of delayHeapNode +type DelayHeap struct { + index map[structs.NamespacedID]*delayHeapNode + heap delayedHeapImp +} + +// HeapNode is an interface type implemented by objects stored in the DelayHeap +type HeapNode interface { + Data() interface{} // The data object + ID() string // ID of the object, used in conjunction with namespace for deduplication + Namespace() string // Namespace of the object, can be empty +} + +// delayHeapNode encapsulates the node stored in DelayHeap +// WaitUntil is used as the sorting criteria +type delayHeapNode struct { + // Node is the data object stored in the delay heap + Node HeapNode + // WaitUntil is the time delay associated with the node + // Objects in the heap are sorted by WaitUntil + WaitUntil time.Time + + index int +} + +type delayedHeapImp []*delayHeapNode + +func (h delayedHeapImp) Len() int { + return len(h) +} + +func (h delayedHeapImp) Less(i, j int) bool { + // Two zero times should return false. + // Otherwise, zero is "greater" than any other time. + // (To sort it at the end of the list.) + // Sort such that zero times are at the end of the list. 
+ iZero, jZero := h[i].WaitUntil.IsZero(), h[j].WaitUntil.IsZero() + if iZero && jZero { + return false + } else if iZero { + return false + } else if jZero { + return true + } + + return h[i].WaitUntil.Before(h[j].WaitUntil) +} + +func (h delayedHeapImp) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index = i + h[j].index = j +} + +func (h *delayedHeapImp) Push(x interface{}) { + node := x.(*delayHeapNode) + n := len(*h) + node.index = n + *h = append(*h, node) +} + +func (h *delayedHeapImp) Pop() interface{} { + old := *h + n := len(old) + node := old[n-1] + node.index = -1 // for safety + *h = old[0 : n-1] + return node +} + +func NewDelayHeap() *DelayHeap { + return &DelayHeap{ + index: make(map[structs.NamespacedID]*delayHeapNode), + heap: make(delayedHeapImp, 0), + } +} + +func (p *DelayHeap) Push(dataNode HeapNode, next time.Time) error { + tuple := structs.NamespacedID{ + ID: dataNode.ID(), + Namespace: dataNode.Namespace(), + } + if _, ok := p.index[tuple]; ok { + return fmt.Errorf("node %q (%s) already exists", dataNode.ID(), dataNode.Namespace()) + } + + delayHeapNode := &delayHeapNode{dataNode, next, 0} + p.index[tuple] = delayHeapNode + heap.Push(&p.heap, delayHeapNode) + return nil +} + +func (p *DelayHeap) Pop() *delayHeapNode { + if len(p.heap) == 0 { + return nil + } + + delayHeapNode := heap.Pop(&p.heap).(*delayHeapNode) + tuple := structs.NamespacedID{ + ID: delayHeapNode.Node.ID(), + Namespace: delayHeapNode.Node.Namespace(), + } + delete(p.index, tuple) + return delayHeapNode +} + +func (p *DelayHeap) Peek() *delayHeapNode { + if len(p.heap) == 0 { + return nil + } + + return p.heap[0] +} + +func (p *DelayHeap) Contains(heapNode HeapNode) bool { + tuple := structs.NamespacedID{ + ID: heapNode.ID(), + Namespace: heapNode.Namespace(), + } + _, ok := p.index[tuple] + return ok +} + +func (p *DelayHeap) Update(heapNode HeapNode, waitUntil time.Time) error { + tuple := structs.NamespacedID{ + ID: heapNode.ID(), + Namespace: heapNode.Namespace(), + } + if existingHeapNode, ok := p.index[tuple]; ok { + // Need to update the job as well because its spec can change. 
+ existingHeapNode.Node = heapNode + existingHeapNode.WaitUntil = waitUntil + heap.Fix(&p.heap, existingHeapNode.index) + return nil + } + + return fmt.Errorf("heap doesn't contain object with ID %q (%s)", heapNode.ID(), heapNode.Namespace()) +} + +func (p *DelayHeap) Remove(heapNode HeapNode) error { + tuple := structs.NamespacedID{ + ID: heapNode.ID(), + Namespace: heapNode.Namespace(), + } + if node, ok := p.index[tuple]; ok { + heap.Remove(&p.heap, node.index) + delete(p.index, tuple) + return nil + } + + return fmt.Errorf("heap doesn't contain object with ID %q (%s)", heapNode.ID(), heapNode.Namespace()) +} + +func (p *DelayHeap) Length() int { + return len(p.heap) +} diff --git a/lib/delay_heap_test.go b/lib/delay_heap_test.go new file mode 100644 index 00000000000..ddf64067732 --- /dev/null +++ b/lib/delay_heap_test.go @@ -0,0 +1,115 @@ +package lib + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// HeapNodeImpl satisfies the HeapNode interface +type heapNodeImpl struct { + dataObject interface{} + id string + namespace string +} + +func (d *heapNodeImpl) Data() interface{} { + return d.dataObject +} + +func (d *heapNodeImpl) ID() string { + return d.id +} + +func (d *heapNodeImpl) Namespace() string { + return d.namespace +} + +func TestDelayHeap_PushPop(t *testing.T) { + delayHeap := NewDelayHeap() + now := time.Now() + require := require.New(t) + // a dummy type to use as the inner object in the heap + type myObj struct { + a int + b string + } + dataNode1 := &heapNodeImpl{ + dataObject: &myObj{a: 0, b: "hey"}, + id: "101", + namespace: "default", + } + delayHeap.Push(dataNode1, now.Add(-10*time.Minute)) + + dataNode2 := &heapNodeImpl{ + dataObject: &myObj{a: 0, b: "hey"}, + id: "102", + namespace: "default", + } + delayHeap.Push(dataNode2, now.Add(10*time.Minute)) + + dataNode3 := &heapNodeImpl{ + dataObject: &myObj{a: 0, b: "hey"}, + id: "103", + namespace: "default", + } + delayHeap.Push(dataNode3, now.Add(-15*time.Second)) + + dataNode4 := &heapNodeImpl{ + dataObject: &myObj{a: 0, b: "hey"}, + id: "101", + namespace: "test-namespace", + } + delayHeap.Push(dataNode4, now.Add(2*time.Hour)) + + expectedWaitTimes := []time.Time{now.Add(-10 * time.Minute), now.Add(-15 * time.Second), now.Add(10 * time.Minute), now.Add(2 * time.Hour)} + entries := getHeapEntries(delayHeap, now) + for i, entry := range entries { + require.Equal(expectedWaitTimes[i], entry.WaitUntil) + } + +} + +func TestDelayHeap_Update(t *testing.T) { + delayHeap := NewDelayHeap() + now := time.Now() + require := require.New(t) + // a dummy type to use as the inner object in the heap + type myObj struct { + a int + b string + } + dataNode1 := &heapNodeImpl{ + dataObject: &myObj{a: 0, b: "hey"}, + id: "101", + namespace: "default", + } + delayHeap.Push(dataNode1, now.Add(-10*time.Minute)) + + dataNode2 := &heapNodeImpl{ + dataObject: &myObj{a: 0, b: "hey"}, + id: "102", + namespace: "default", + } + delayHeap.Push(dataNode2, now.Add(10*time.Minute)) + delayHeap.Update(dataNode1, now.Add(20*time.Minute)) + + expectedWaitTimes := []time.Time{now.Add(10 * time.Minute), now.Add(20 * time.Minute)} + expectedIdOrder := []string{"102", "101"} + entries := getHeapEntries(delayHeap, now) + for i, entry := range entries { + require.Equal(expectedWaitTimes[i], entry.WaitUntil) + require.Equal(expectedIdOrder[i], entry.Node.ID()) + } + +} + +func getHeapEntries(delayHeap *DelayHeap, now time.Time) []*delayHeapNode { + var entries []*delayHeapNode + for node := delayHeap.Pop(); node != 
nil; { + entries = append(entries, node) + node = delayHeap.Pop() + } + return entries +} diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index f8ea14ff917..0217e966dc4 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -8,8 +8,11 @@ import ( "sync" "time" + "context" + "github.com/armon/go-metrics" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/lib" "github.com/hashicorp/nomad/nomad/structs" ) @@ -77,7 +80,19 @@ type EvalBroker struct { // timeWait has evaluations that are waiting for time to elapse timeWait map[string]*time.Timer - // initialNackDelay is the delay applied before reenqueuing a + // delayedEvalCancelFunc is used to stop the long running go routine + // that processes delayed evaluations + delayedEvalCancelFunc context.CancelFunc + + // delayHeap is a heap used to track incoming evaluations that are + // not eligible to enqueue until their WaitTime + delayHeap *lib.DelayHeap + + // delayedEvalsUpdateCh is used to trigger notifications for updates + // to the delayHeap + delayedEvalsUpdateCh chan struct{} + + // initialNackDelay is the delay applied before re-enqueuing a // Nacked evaluation for the first time. initialNackDelay time.Duration @@ -113,22 +128,25 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, return nil, fmt.Errorf("timeout cannot be negative") } b := &EvalBroker{ - nackTimeout: timeout, - deliveryLimit: deliveryLimit, - enabled: false, - stats: new(BrokerStats), - evals: make(map[string]int), - jobEvals: make(map[structs.NamespacedID]string), - blocked: make(map[structs.NamespacedID]PendingEvaluations), - ready: make(map[string]PendingEvaluations), - unack: make(map[string]*unackEval), - waiting: make(map[string]chan struct{}), - requeue: make(map[string]*structs.Evaluation), - timeWait: make(map[string]*time.Timer), - initialNackDelay: initialNackDelay, - subsequentNackDelay: subsequentNackDelay, + nackTimeout: timeout, + deliveryLimit: deliveryLimit, + enabled: false, + stats: new(BrokerStats), + evals: make(map[string]int), + jobEvals: make(map[structs.NamespacedID]string), + blocked: make(map[structs.NamespacedID]PendingEvaluations), + ready: make(map[string]PendingEvaluations), + unack: make(map[string]*unackEval), + waiting: make(map[string]chan struct{}), + requeue: make(map[string]*structs.Evaluation), + timeWait: make(map[string]*time.Timer), + initialNackDelay: initialNackDelay, + subsequentNackDelay: subsequentNackDelay, + delayHeap: lib.NewDelayHeap(), + delayedEvalsUpdateCh: make(chan struct{}, 1), } b.stats.ByScheduler = make(map[string]*SchedulerStats) + return b, nil } @@ -143,10 +161,17 @@ func (b *EvalBroker) Enabled() bool { // should only be enabled on the active leader. func (b *EvalBroker) SetEnabled(enabled bool) { b.l.Lock() + prevEnabled := b.enabled b.enabled = enabled + if !prevEnabled && enabled { + // start the go routine for delayed evals + ctx, cancel := context.WithCancel(context.Background()) + b.delayedEvalCancelFunc = cancel + go b.runDelayedEvalsWatcher(ctx) + } b.l.Unlock() if !enabled { - b.Flush() + b.flush() } } @@ -206,6 +231,17 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) { return } + if !eval.WaitUntil.IsZero() { + b.delayHeap.Push(&evalWrapper{eval}, eval.WaitUntil) + b.stats.TotalWaiting += 1 + // Signal an update. 
+ select { + case b.delayedEvalsUpdateCh <- struct{}{}: + default: + } + return + } + b.enqueueLocked(eval, eval.Type) } @@ -643,7 +679,7 @@ func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error { } // Flush is used to clear the state of the broker -func (b *EvalBroker) Flush() { +func (b *EvalBroker) flush() { b.l.Lock() defer b.l.Unlock() @@ -663,6 +699,14 @@ func (b *EvalBroker) Flush() { wait.Stop() } + // Cancel the delayed evaluations goroutine + if b.delayedEvalCancelFunc != nil { + b.delayedEvalCancelFunc() + } + + // Clear out the update channel for delayed evaluations + b.delayedEvalsUpdateCh = make(chan struct{}, 1) + // Reset the broker b.stats.TotalReady = 0 b.stats.TotalUnacked = 0 @@ -675,6 +719,75 @@ func (b *EvalBroker) Flush() { b.ready = make(map[string]PendingEvaluations) b.unack = make(map[string]*unackEval) b.timeWait = make(map[string]*time.Timer) + b.delayHeap = lib.NewDelayHeap() +} + +// evalWrapper satisfies the HeapNode interface +type evalWrapper struct { + eval *structs.Evaluation +} + +func (d *evalWrapper) Data() interface{} { + return d.eval +} + +func (d *evalWrapper) ID() string { + return d.eval.ID +} + +func (d *evalWrapper) Namespace() string { + return d.eval.Namespace +} + +// runDelayedEvalsWatcher is a long-lived function that waits till a time deadline is met for +// pending evaluations before enqueuing them +func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context) { + var timerChannel <-chan time.Time + var delayTimer *time.Timer + for { + eval, waitUntil := b.nextDelayedEval() + if waitUntil.IsZero() { + timerChannel = nil + } else { + launchDur := waitUntil.Sub(time.Now().UTC()) + if delayTimer == nil { + delayTimer = time.NewTimer(launchDur) + } else { + delayTimer.Reset(launchDur) + } + timerChannel = delayTimer.C + } + + select { + case <-ctx.Done(): + return + case <-timerChannel: + // remove from the heap since we can enqueue it now + b.delayHeap.Remove(&evalWrapper{eval}) + b.l.Lock() + b.stats.TotalWaiting -= 1 + b.enqueueLocked(eval, eval.Type) + b.l.Unlock() + case <-b.delayedEvalsUpdateCh: + continue + } + } +} + +// nextDelayedEval returns the next delayed eval to launch and when it should be enqueued. +// This peeks at the heap to return the top. If the heap is empty, this returns nil and zero time. +func (b *EvalBroker) nextDelayedEval() (*structs.Evaluation, time.Time) { + // If there is nothing wait for an update. + if b.delayHeap.Length() == 0 { + return nil, time.Time{} + } + nextEval := b.delayHeap.Peek() + + if nextEval == nil { + return nil, time.Time{} + } + eval := nextEval.Node.Data().(*structs.Evaluation) + return eval, nextEval.WaitUntil } // Stats is used to query the state of the broker diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 5275ed999f4..8f7f71510a9 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -1141,7 +1141,53 @@ func TestEvalBroker_Wait(t *testing.T) { } } -// Ensure that priority is taken into account when enqueuing many evaluations. 
+// Ensure that delayed evaluations work as expected +func TestEvalBroker_WaitUntil(t *testing.T) { + t.Parallel() + require := require.New(t) + b := testBroker(t, 0) + b.SetEnabled(true) + + now := time.Now() + // Create a few of evals with WaitUntil set + eval1 := mock.Eval() + eval1.WaitUntil = now.Add(1 * time.Second) + eval1.CreateIndex = 1 + b.Enqueue(eval1) + + eval2 := mock.Eval() + eval2.WaitUntil = now.Add(100 * time.Millisecond) + // set CreateIndex to use as a tie breaker when eval2 + // and eval3 are both in the pending evals heap + eval2.CreateIndex = 2 + b.Enqueue(eval2) + + eval3 := mock.Eval() + eval3.WaitUntil = now.Add(20 * time.Millisecond) + eval3.CreateIndex = 1 + b.Enqueue(eval3) + require.Equal(3, b.stats.TotalWaiting) + // sleep enough for two evals to be ready + time.Sleep(200 * time.Millisecond) + + // first dequeue should return eval3 + out, _, err := b.Dequeue(defaultSched, time.Second) + require.Nil(err) + require.Equal(eval3, out) + + // second dequeue should return eval2 + out, _, err = b.Dequeue(defaultSched, time.Second) + require.Nil(err) + require.Equal(eval2, out) + + // third dequeue should return eval1 + out, _, err = b.Dequeue(defaultSched, 2*time.Second) + require.Nil(err) + require.Equal(eval1, out) + require.Equal(0, b.stats.TotalWaiting) +} + +// Ensure that priority is taken into account when enqueueing many evaluations. func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) { t.Parallel() b := testBroker(t, 0) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 7de4987a25b..3a8588b9cba 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -92,8 +92,10 @@ func Job() *structs.Job { Mode: structs.RestartPolicyModeDelay, }, ReschedulePolicy: &structs.ReschedulePolicy{ - Attempts: 2, - Interval: 10 * time.Minute, + Attempts: 2, + Interval: 10 * time.Minute, + Delay: 5 * time.Second, + DelayFunction: "linear", }, Tasks: []*structs.Task{ { diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 10bce23b5c2..09453b38246 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -1499,8 +1499,12 @@ func TestTaskGroupDiff(t *testing.T) { Old: &TaskGroup{}, New: &TaskGroup{ ReschedulePolicy: &ReschedulePolicy{ - Attempts: 1, - Interval: 15 * time.Second, + Attempts: 1, + Interval: 15 * time.Second, + Delay: 5 * time.Second, + MaxDelay: 20 * time.Second, + DelayFunction: "exponential", + Unlimited: false, }, }, Expected: &TaskGroupDiff{ @@ -1516,12 +1520,36 @@ func TestTaskGroupDiff(t *testing.T) { Old: "", New: "1", }, + { + Type: DiffTypeAdded, + Name: "Delay", + Old: "", + New: "5000000000", + }, + { + Type: DiffTypeAdded, + Name: "DelayFunction", + Old: "", + New: "exponential", + }, { Type: DiffTypeAdded, Name: "Interval", Old: "", New: "15000000000", }, + { + Type: DiffTypeAdded, + Name: "MaxDelay", + Old: "", + New: "20000000000", + }, + { + Type: DiffTypeAdded, + Name: "Unlimited", + Old: "", + New: "false", + }, }, }, }, @@ -1531,8 +1559,12 @@ func TestTaskGroupDiff(t *testing.T) { // ReschedulePolicy deleted Old: &TaskGroup{ ReschedulePolicy: &ReschedulePolicy{ - Attempts: 1, - Interval: 15 * time.Second, + Attempts: 1, + Interval: 15 * time.Second, + Delay: 5 * time.Second, + MaxDelay: 20 * time.Second, + DelayFunction: "exponential", + Unlimited: false, }, }, New: &TaskGroup{}, @@ -1549,12 +1581,36 @@ func TestTaskGroupDiff(t *testing.T) { Old: "1", New: "", }, + { + Type: DiffTypeDeleted, + Name: "Delay", + Old: "5000000000", + New: "", + }, + { + Type: DiffTypeDeleted, + Name: 
"DelayFunction", + Old: "exponential", + New: "", + }, { Type: DiffTypeDeleted, Name: "Interval", Old: "15000000000", New: "", }, + { + Type: DiffTypeDeleted, + Name: "MaxDelay", + Old: "20000000000", + New: "", + }, + { + Type: DiffTypeDeleted, + Name: "Unlimited", + Old: "false", + New: "", + }, }, }, }, @@ -1564,14 +1620,22 @@ func TestTaskGroupDiff(t *testing.T) { // ReschedulePolicy edited Old: &TaskGroup{ ReschedulePolicy: &ReschedulePolicy{ - Attempts: 1, - Interval: 1 * time.Second, + Attempts: 1, + Interval: 1 * time.Second, + DelayFunction: "exponential", + Delay: 20 * time.Second, + MaxDelay: 1 * time.Minute, + Unlimited: false, }, }, New: &TaskGroup{ ReschedulePolicy: &ReschedulePolicy{ - Attempts: 2, - Interval: 2 * time.Second, + Attempts: 2, + Interval: 2 * time.Second, + DelayFunction: "linear", + Delay: 30 * time.Second, + MaxDelay: 1 * time.Minute, + Unlimited: true, }, }, Expected: &TaskGroupDiff{ @@ -1587,12 +1651,30 @@ func TestTaskGroupDiff(t *testing.T) { Old: "1", New: "2", }, + { + Type: DiffTypeEdited, + Name: "Delay", + Old: "20000000000", + New: "30000000000", + }, + { + Type: DiffTypeEdited, + Name: "DelayFunction", + Old: "exponential", + New: "linear", + }, { Type: DiffTypeEdited, Name: "Interval", Old: "1000000000", New: "2000000000", }, + { + Type: DiffTypeEdited, + Name: "Unlimited", + Old: "false", + New: "true", + }, }, }, }, @@ -1625,12 +1707,36 @@ func TestTaskGroupDiff(t *testing.T) { Old: "1", New: "1", }, + { + Type: DiffTypeNone, + Name: "Delay", + Old: "0", + New: "0", + }, + { + Type: DiffTypeNone, + Name: "DelayFunction", + Old: "", + New: "", + }, { Type: DiffTypeEdited, Name: "Interval", Old: "1000000000", New: "2000000000", }, + { + Type: DiffTypeNone, + Name: "MaxDelay", + Old: "0", + New: "0", + }, + { + Type: DiffTypeNone, + Name: "Unlimited", + Old: "false", + New: "false", + }, }, }, }, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 402b5701fb9..2999026b618 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -35,6 +35,8 @@ import ( "github.com/mitchellh/copystructure" "github.com/ugorji/go/codec" + "math" + hcodec "github.com/hashicorp/go-msgpack/codec" ) @@ -2656,12 +2658,16 @@ var ( var ( DefaultServiceJobReschedulePolicy = ReschedulePolicy{ - Attempts: 2, - Interval: 1 * time.Hour, + Delay: 30 * time.Second, + DelayFunction: "exponential", + MaxDelay: 1 * time.Hour, + Unlimited: true, } DefaultBatchJobReschedulePolicy = ReschedulePolicy{ - Attempts: 1, - Interval: 24 * time.Hour, + Attempts: 1, + Interval: 24 * time.Hour, + Delay: 5 * time.Second, + DelayFunction: "linear", } ) @@ -2744,6 +2750,9 @@ func NewRestartPolicy(jobType string) *RestartPolicy { } const ReschedulePolicyMinInterval = 15 * time.Second +const ReschedulePolicyMinDelay = 5 * time.Second + +var RescheduleDelayFunctions = [...]string{"linear", "exponential", "fibonacci"} // ReschedulePolicy configures how Tasks are rescheduled when they crash or fail. type ReschedulePolicy struct { @@ -2753,7 +2762,20 @@ type ReschedulePolicy struct { // Interval is a duration in which we can limit the number of reschedule attempts. Interval time.Duration - //TODO delay + // Delay is a minimum duration to wait between reschedule attempts. + // The delay function determines how much subsequent reschedule attempts are delayed by. + Delay time.Duration + + // DelayFunction determines how the delay progressively changes on subsequent reschedule + // attempts. Valid values are "exponential", "linear", and "fibonacci". 
+ DelayFunction string + + // MaxDelay is an upper bound on the delay. + MaxDelay time.Duration + + // Unlimited allows infinite rescheduling attempts. Only allowed when delay is set + // between reschedule attempts. + Unlimited bool } func (r *ReschedulePolicy) Copy() *ReschedulePolicy { @@ -2765,17 +2787,151 @@ func (r *ReschedulePolicy) Copy() *ReschedulePolicy { return nrp } +// Validate uses different criteria to validate the reschedule policy +// Delay must be a minimum of 5 seconds +// Delay Ceiling is ignored if Delay Function is "linear" +// Number of possible attempts is validated, given the interval, delay and delay function func (r *ReschedulePolicy) Validate() error { - if r != nil && r.Attempts > 0 { - var mErr multierror.Error - // Check for ambiguous/confusing settings + enabled := r != nil && (r.Attempts > 0 || r.Unlimited) + if !enabled { + return nil + } + var mErr multierror.Error + // Check for ambiguous/confusing settings + + delayPreCheck := true + // Delay must be at least the minimum allowed + if r.Delay.Nanoseconds() < ReschedulePolicyMinDelay.Nanoseconds() { + multierror.Append(&mErr, fmt.Errorf("Delay cannot be less than %v (got %v)", ReschedulePolicyMinDelay, r.Delay)) + delayPreCheck = false + } + + // Must use a valid delay function + if !isValidDelayFunction(r.DelayFunction) { + multierror.Append(&mErr, fmt.Errorf("Invalid delay function %q, must be one of %q", r.DelayFunction, RescheduleDelayFunctions)) + delayPreCheck = false + } + + // Validate MaxDelay if not using linear delay progression + if r.DelayFunction != "linear" { + if r.MaxDelay.Nanoseconds() < ReschedulePolicyMinDelay.Nanoseconds() { + multierror.Append(&mErr, fmt.Errorf("Delay Ceiling cannot be less than %v (got %v)", ReschedulePolicyMinDelay, r.MaxDelay)) + delayPreCheck = false + } + if r.MaxDelay < r.Delay { + multierror.Append(&mErr, fmt.Errorf("Delay Ceiling cannot be less than Delay %v (got %v)", r.Delay, r.MaxDelay)) + delayPreCheck = false + } + + } + + // Validate Interval and other delay parameters if attempts are limited + if !r.Unlimited { if r.Interval.Nanoseconds() < ReschedulePolicyMinInterval.Nanoseconds() { - multierror.Append(&mErr, fmt.Errorf("Interval cannot be less than %v (got %v)", RestartPolicyMinInterval, r.Interval)) + multierror.Append(&mErr, fmt.Errorf("Interval cannot be less than %v (got %v)", ReschedulePolicyMinInterval, r.Interval)) + } + if !delayPreCheck { + // We can't cross validate the rest of the delay params if delayPreCheck fails, so return early + return mErr.ErrorOrNil() } + crossValidationErr := r.validateDelayParams() + if crossValidationErr != nil { + multierror.Append(&mErr, crossValidationErr) + } + } + return mErr.ErrorOrNil() +} + +func isValidDelayFunction(delayFunc string) bool { + for _, value := range RescheduleDelayFunctions { + if value == delayFunc { + return true + } + } + return false +} - return mErr.ErrorOrNil() +func (r *ReschedulePolicy) validateDelayParams() error { + ok, possibleAttempts, recommendedInterval := r.viableAttempts() + if ok { + return nil } - return nil + var mErr multierror.Error + if r.DelayFunction == "linear" { + multierror.Append(&mErr, fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v and "+ + "delay function %q", possibleAttempts, r.Interval, r.Delay, r.DelayFunction)) + } else { + multierror.Append(&mErr, fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v, "+ + "delay function %q, and delay ceiling %v", possibleAttempts, r.Interval, r.Delay, r.DelayFunction, r.MaxDelay))
+ } + multierror.Append(&mErr, fmt.Errorf("Set the interval to at least %v to accommodate %v attempts", recommendedInterval.Round(time.Minute), r.Attempts)) + return mErr.ErrorOrNil() +} + +func (r *ReschedulePolicy) viableAttempts() (bool, int, time.Duration) { + var possibleAttempts int + var recommendedInterval time.Duration + valid := true + switch r.DelayFunction { + case "linear": + recommendedInterval = time.Duration(r.Attempts) * r.Delay + if r.Interval < recommendedInterval { + possibleAttempts = int(r.Interval / r.Delay) + valid = false + } + case "exponential": + for i := 0; i < r.Attempts; i++ { + nextDelay := time.Duration(math.Pow(2, float64(i))) * r.Delay + if nextDelay > r.MaxDelay { + nextDelay = r.MaxDelay + recommendedInterval += nextDelay + } else { + recommendedInterval = nextDelay + } + if recommendedInterval < r.Interval { + possibleAttempts++ + } + } + if possibleAttempts < r.Attempts { + valid = false + } + case "fibonacci": + var slots []time.Duration + slots = append(slots, r.Delay) + slots = append(slots, r.Delay) + reachedCeiling := false + for i := 2; i < r.Attempts; i++ { + var nextDelay time.Duration + if reachedCeiling { + //switch to linear + nextDelay = slots[i-1] + r.MaxDelay + } else { + nextDelay = slots[i-1] + slots[i-2] + if nextDelay > r.MaxDelay { + nextDelay = r.MaxDelay + reachedCeiling = true + } + } + slots = append(slots, nextDelay) + } + recommendedInterval = slots[len(slots)-1] + if r.Interval < recommendedInterval { + valid = false + // calculate possible attempts + for i := 0; i < len(slots); i++ { + if slots[i] > r.Interval { + possibleAttempts = i + break + } + } + } + default: + return false, 0, 0 + } + if possibleAttempts < 0 { // can happen if delay is bigger than interval + possibleAttempts = 0 + } + return valid, possibleAttempts, recommendedInterval } func NewReschedulePolicy(jobType string) *ReschedulePolicy { @@ -2920,12 +3076,14 @@ func (tg *TaskGroup) Validate(j *Job) error { mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name)) } - if tg.ReschedulePolicy != nil { - if err := tg.ReschedulePolicy.Validate(); err != nil { - mErr.Errors = append(mErr.Errors, err) + if j.Type != JobTypeSystem { + if tg.ReschedulePolicy != nil { + if err := tg.ReschedulePolicy.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } else { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a reschedule policy", tg.Name)) } - } else { - mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a reschedule policy", tg.Name)) } if tg.EphemeralDisk != nil { @@ -5080,12 +5238,16 @@ type RescheduleEvent struct { // PrevNodeID is the node ID of the previous allocation PrevNodeID string + + // Delay is the reschedule delay associated with the attempt + Delay time.Duration } -func NewRescheduleEvent(rescheduleTime int64, prevAllocID string, prevNodeID string) *RescheduleEvent { +func NewRescheduleEvent(rescheduleTime int64, prevAllocID string, prevNodeID string, delay time.Duration) *RescheduleEvent { return &RescheduleEvent{RescheduleTime: rescheduleTime, PrevAllocID: prevAllocID, - PrevNodeID: prevNodeID} + PrevNodeID: prevNodeID, + Delay: delay} } func (re *RescheduleEvent) Copy() *RescheduleEvent { @@ -5181,6 +5343,13 @@ type Allocation struct { // given deployment DeploymentStatus *AllocDeploymentStatus + // RescheduleTrackers captures details of previous reschedule attempts of the allocation + RescheduleTracker *RescheduleTracker + + // FollowupEvalID 
captures a follow up evaluation created to handle a failed allocation + // that can be rescheduled in the future + FollowupEvalID string + // Raft Indexes CreateIndex uint64 ModifyIndex uint64 @@ -5195,9 +5364,6 @@ type Allocation struct { // ModifyTime is the time the allocation was last updated. ModifyTime int64 - - // RescheduleTrackers captures details of previous reschedule attempts of the allocation - RescheduleTracker *RescheduleTracker } // Index returns the index of the allocation. If the allocation is from a task @@ -5304,11 +5470,11 @@ func (a *Allocation) RescheduleEligible(reschedulePolicy *ReschedulePolicy, fail } attempts := reschedulePolicy.Attempts interval := reschedulePolicy.Interval - - if attempts == 0 { + enabled := attempts > 0 || reschedulePolicy.Unlimited + if !enabled { return false } - if (a.RescheduleTracker == nil || len(a.RescheduleTracker.Events) == 0) && attempts > 0 { + if (a.RescheduleTracker == nil || len(a.RescheduleTracker.Events) == 0) && attempts > 0 || reschedulePolicy.Unlimited { return true } attempted := 0 @@ -5322,6 +5488,98 @@ func (a *Allocation) RescheduleEligible(reschedulePolicy *ReschedulePolicy, fail return attempted < attempts } +// LastEventTime is the time of the last task event in the allocation. +// It is used to determine allocation failure time. +func (a *Allocation) LastEventTime() time.Time { + var lastEventTime time.Time + if a.TaskStates != nil { + for _, e := range a.TaskStates { + if lastEventTime.IsZero() || e.FinishedAt.After(lastEventTime) { + lastEventTime = e.FinishedAt + } + } + } + return lastEventTime +} + +// ReschedulePolicy returns the reschedule policy based on the task group +func (a *Allocation) ReschedulePolicy() *ReschedulePolicy { + tg := a.Job.LookupTaskGroup(a.TaskGroup) + if tg == nil { + return nil + } + return tg.ReschedulePolicy +} + +// NextRescheduleTime returns a time on or after which the allocation is eligible to be rescheduled, +// and whether the next reschedule time is within policy's interval if the policy doesn't allow unlimited reschedules +func (a *Allocation) NextRescheduleTime() (time.Time, bool) { + failTime := a.LastEventTime() + reschedulePolicy := a.ReschedulePolicy() + if a.ClientStatus != AllocClientStatusFailed || failTime.IsZero() || reschedulePolicy == nil { + return time.Time{}, false + } + + nextDelay := a.NextDelay() + nextRescheduleTime := failTime.Add(nextDelay) + rescheduleEligible := reschedulePolicy.Unlimited || (reschedulePolicy.Attempts > 0 && a.RescheduleTracker == nil) + if reschedulePolicy.Attempts > 0 && a.RescheduleTracker != nil && a.RescheduleTracker.Events != nil { + // Check for eligibility based on the interval if max attempts is set + attempted := 0 + for j := len(a.RescheduleTracker.Events) - 1; j >= 0; j-- { + lastAttempt := a.RescheduleTracker.Events[j].RescheduleTime + timeDiff := failTime.UTC().UnixNano() - lastAttempt + if timeDiff < reschedulePolicy.Interval.Nanoseconds() { + attempted += 1 + } + } + rescheduleEligible = attempted < reschedulePolicy.Attempts && nextDelay < reschedulePolicy.Interval + } + return nextRescheduleTime, rescheduleEligible +} + +// NextDelay returns a duration after which the allocation can be rescheduled. +// It is calculated according to the delay function and previous reschedule attempts. 
+func (a *Allocation) NextDelay() time.Duration { + policy := a.ReschedulePolicy() + delayDur := policy.Delay + if a.RescheduleTracker == nil || a.RescheduleTracker.Events == nil || len(a.RescheduleTracker.Events) == 0 { + return delayDur + } + events := a.RescheduleTracker.Events + switch policy.DelayFunction { + case "exponential": + delayDur = a.RescheduleTracker.Events[len(a.RescheduleTracker.Events)-1].Delay * 2 + case "fibonacci": + if len(events) >= 2 { + fibN1Delay := events[len(events)-1].Delay + fibN2Delay := events[len(events)-2].Delay + // Handle reset of delay ceiling which should cause + // a new series to start + if fibN2Delay == policy.MaxDelay && fibN1Delay == policy.Delay { + delayDur = fibN1Delay + } else { + delayDur = fibN1Delay + fibN2Delay + } + } + default: + return delayDur + } + if policy.MaxDelay > 0 && delayDur > policy.MaxDelay { + delayDur = policy.MaxDelay + // check if delay needs to be reset + + lastRescheduleEvent := a.RescheduleTracker.Events[len(a.RescheduleTracker.Events)-1] + timeDiff := a.LastEventTime().UTC().UnixNano() - lastRescheduleEvent.RescheduleTime + if timeDiff > delayDur.Nanoseconds() { + delayDur = policy.Delay + } + + } + + return delayDur +} + // Terminated returns if the allocation is in a terminal state on a client. func (a *Allocation) Terminated() bool { if a.ClientStatus == AllocClientStatusFailed || @@ -5426,6 +5684,7 @@ type AllocListStub struct { ClientDescription string TaskStates map[string]*TaskState DeploymentStatus *AllocDeploymentStatus + FollowupEvalID string CreateIndex uint64 ModifyIndex uint64 CreateTime int64 @@ -5711,9 +5970,14 @@ type Evaluation struct { StatusDescription string // Wait is a minimum wait time for running the eval. This is used to - // support a rolling upgrade. + // support a rolling upgrade in versions prior to 0.7.0 + // Deprecated Wait time.Duration + // WaitUntil is the time when this eval should be run. This is used to + // support delayed rescheduling of failed allocations + WaitUntil time.Time + // NextEval is the evaluation ID for the eval created to do a followup. // This is used to support rolling upgrades, where we need a chain of evaluations.
NextEval string diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 3a54b2f4282..d249d30c590 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -570,8 +570,10 @@ func testJob() *Job { Delay: 1 * time.Minute, }, ReschedulePolicy: &ReschedulePolicy{ - Interval: 5 * time.Minute, - Attempts: 10, + Interval: 5 * time.Minute, + Attempts: 10, + Delay: 5 * time.Second, + DelayFunction: "linear", }, Tasks: []*Task{ { @@ -930,6 +932,7 @@ func TestTaskGroup_Validate(t *testing.T) { ReschedulePolicy: &ReschedulePolicy{ Interval: 5 * time.Minute, Attempts: 5, + Delay: 5 * time.Second, }, } err := tg.Validate(j) @@ -1012,8 +1015,10 @@ func TestTaskGroup_Validate(t *testing.T) { Mode: RestartPolicyModeDelay, }, ReschedulePolicy: &ReschedulePolicy{ - Interval: 5 * time.Minute, - Attempts: 10, + Interval: 5 * time.Minute, + Attempts: 10, + Delay: 5 * time.Second, + DelayFunction: "linear", }, } @@ -2424,45 +2429,180 @@ func TestRestartPolicy_Validate(t *testing.T) { func TestReschedulePolicy_Validate(t *testing.T) { type testCase struct { + desc string ReschedulePolicy *ReschedulePolicy - err error + errors []error } testCases := []testCase{ { + desc: "Nil", + }, + { + desc: "Disabled", ReschedulePolicy: &ReschedulePolicy{ Attempts: 0, Interval: 0 * time.Second}, - err: nil, }, { + desc: "Disabled", ReschedulePolicy: &ReschedulePolicy{ - Attempts: 1, + Attempts: -1, Interval: 5 * time.Minute}, - err: nil, }, { + desc: "Valid Linear Delay", ReschedulePolicy: &ReschedulePolicy{ - Attempts: -1, - Interval: 5 * time.Minute}, - err: nil, + Attempts: 1, + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + DelayFunction: "linear"}, + }, + { + desc: "Valid Exponential Delay", + ReschedulePolicy: &ReschedulePolicy{ + Attempts: 5, + Interval: 1 * time.Hour, + Delay: 30 * time.Second, + MaxDelay: 5 * time.Minute, + DelayFunction: "exponential"}, + }, + { + desc: "Valid Fibonacci Delay", + ReschedulePolicy: &ReschedulePolicy{ + Attempts: 5, + Interval: 15 * time.Minute, + Delay: 10 * time.Second, + MaxDelay: 5 * time.Minute, + DelayFunction: "fibonacci"}, + }, + { + desc: "Invalid delay function", + ReschedulePolicy: &ReschedulePolicy{ + Attempts: 1, + Interval: 1 * time.Second, + DelayFunction: "blah"}, + errors: []error{ + fmt.Errorf("Interval cannot be less than %v (got %v)", ReschedulePolicyMinInterval, time.Second), + fmt.Errorf("Delay cannot be less than %v (got %v)", ReschedulePolicyMinDelay, 0*time.Second), + fmt.Errorf("Invalid delay function %q, must be one of %q", "blah", RescheduleDelayFunctions), + }, + }, + { + desc: "Invalid delay ceiling", + ReschedulePolicy: &ReschedulePolicy{ + Attempts: 1, + Interval: 8 * time.Second, + DelayFunction: "exponential", + Delay: 15 * time.Second, + MaxDelay: 5 * time.Second}, + errors: []error{ + fmt.Errorf("Delay Ceiling cannot be less than Delay %v (got %v)", + 15*time.Second, 5*time.Second), + }, }, { + desc: "Invalid delay and interval", ReschedulePolicy: &ReschedulePolicy{ - Attempts: 1, - Interval: 1 * time.Second}, - err: fmt.Errorf("Interval cannot be less than %v (got %v)", RestartPolicyMinInterval, time.Second), + Attempts: 1, + Interval: 1 * time.Second, + DelayFunction: "linear"}, + errors: []error{ + fmt.Errorf("Interval cannot be less than %v (got %v)", ReschedulePolicyMinInterval, time.Second), + fmt.Errorf("Delay cannot be less than %v (got %v)", ReschedulePolicyMinDelay, 0*time.Second), + }, + }, { + // Should suggest 2h40m as the interval + desc: "Invalid Attempts - linear delay", + 
ReschedulePolicy: &ReschedulePolicy{ + Attempts: 10, + Interval: 1 * time.Hour, + Delay: 20 * time.Minute, + DelayFunction: "linear", + }, + errors: []error{ + fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v and"+ + " delay function %q", 3, time.Hour, 20*time.Minute, "linear"), + fmt.Errorf("Set the interval to at least %v to accommodate %v attempts", + 200*time.Minute, 10), + }, + }, + { + // Should suggest 4h40m as the interval + // Delay progression in minutes {5, 10, 20, 40, 40, 40, 40, 40, 40, 40} + desc: "Invalid Attempts - exponential delay", + ReschedulePolicy: &ReschedulePolicy{ + Attempts: 10, + Interval: 30 * time.Minute, + Delay: 5 * time.Minute, + MaxDelay: 40 * time.Minute, + DelayFunction: "exponential", + }, + errors: []error{ + fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v, "+ + "delay function %q, and delay ceiling %v", 3, 30*time.Minute, 5*time.Minute, + "exponential", 40*time.Minute), + fmt.Errorf("Set the interval to at least %v to accommodate %v attempts", + 280*time.Minute, 10), + }, + }, + { + // Should suggest 8h as the interval + // Delay progression in minutes {20, 20, 40, 60, 80, 80, 80, 80, 80, 80} + desc: "Invalid Attempts - fibonacci delay", + ReschedulePolicy: &ReschedulePolicy{ + Attempts: 10, + Interval: 1 * time.Hour, + Delay: 20 * time.Minute, + MaxDelay: 80 * time.Minute, + DelayFunction: "fibonacci", + }, + errors: []error{ + fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v, "+ + "delay function %q, and delay ceiling %v", 4, 1*time.Hour, 20*time.Minute, + "fibonacci", 80*time.Minute), + fmt.Errorf("Set the interval to at least %v to accommodate %v attempts", + 480*time.Minute, 10), + }, + }, + { + desc: "Valid Unlimited config", + ReschedulePolicy: &ReschedulePolicy{ + Attempts: 1, + Unlimited: true, + DelayFunction: "exponential", + Delay: 5 * time.Minute, + MaxDelay: 1 * time.Hour, + }, + }, + { + desc: "Invalid Unlimited config", + ReschedulePolicy: &ReschedulePolicy{ + Attempts: 1, + Interval: 1 * time.Second, + Unlimited: true, + DelayFunction: "exponential", + }, + errors: []error{ + fmt.Errorf("Delay cannot be less than %v (got %v)", ReschedulePolicyMinDelay, 0*time.Second), + fmt.Errorf("Delay Ceiling cannot be less than %v (got %v)", ReschedulePolicyMinDelay, 0*time.Second), + }, }, } - assert := assert.New(t) - for _, tc := range testCases { - if tc.err != nil { - assert.Contains(tc.ReschedulePolicy.Validate().Error(), tc.err.Error()) - } else { - assert.Nil(tc.err) - } + t.Run(tc.desc, func(t *testing.T) { + require := require.New(t) + gotErr := tc.ReschedulePolicy.Validate() + if tc.errors != nil { + // Validate all errors + for _, err := range tc.errors { + require.Contains(gotErr.Error(), err.Error()) + } + } else { + require.Nil(gotErr) + } + }) } } @@ -2719,7 +2859,7 @@ func TestAllocation_ShouldReschedule(t *testing.T) { ClientStatus: AllocClientStatusFailed, DesiredStatus: AllocDesiredStatusRun, FailTime: fail, - ReschedulePolicy: &ReschedulePolicy{0, 1 * time.Minute}, + ReschedulePolicy: &ReschedulePolicy{Attempts: 0, Interval: 1 * time.Minute}, ShouldReschedule: false, }, { @@ -2751,7 +2891,7 @@ func TestAllocation_ShouldReschedule(t *testing.T) { ClientStatus: AllocClientStatusComplete, DesiredStatus: AllocDesiredStatusRun, FailTime: fail, - ReschedulePolicy: &ReschedulePolicy{1, 1 * time.Minute}, + ReschedulePolicy: &ReschedulePolicy{Attempts: 1, Interval: 1 * time.Minute}, ShouldReschedule: false, }, { @@ -2759,14 +2899,14 @@ func 
TestAllocation_ShouldReschedule(t *testing.T) { ClientStatus: AllocClientStatusFailed, DesiredStatus: AllocDesiredStatusRun, FailTime: fail, - ReschedulePolicy: &ReschedulePolicy{1, 1 * time.Minute}, + ReschedulePolicy: &ReschedulePolicy{Attempts: 1, Interval: 1 * time.Minute}, ShouldReschedule: true, }, { Desc: "Reschedule with leftover attempts", ClientStatus: AllocClientStatusFailed, DesiredStatus: AllocDesiredStatusRun, - ReschedulePolicy: &ReschedulePolicy{2, 5 * time.Minute}, + ReschedulePolicy: &ReschedulePolicy{Attempts: 2, Interval: 5 * time.Minute}, FailTime: fail, RescheduleTrackers: []*RescheduleEvent{ { @@ -2780,7 +2920,7 @@ func TestAllocation_ShouldReschedule(t *testing.T) { ClientStatus: AllocClientStatusFailed, DesiredStatus: AllocDesiredStatusRun, FailTime: fail, - ReschedulePolicy: &ReschedulePolicy{1, 5 * time.Minute}, + ReschedulePolicy: &ReschedulePolicy{Attempts: 1, Interval: 5 * time.Minute}, RescheduleTrackers: []*RescheduleEvent{ { RescheduleTime: fail.Add(-6 * time.Minute).UTC().UnixNano(), @@ -2793,7 +2933,7 @@ func TestAllocation_ShouldReschedule(t *testing.T) { ClientStatus: AllocClientStatusFailed, DesiredStatus: AllocDesiredStatusRun, FailTime: fail, - ReschedulePolicy: &ReschedulePolicy{2, 5 * time.Minute}, + ReschedulePolicy: &ReschedulePolicy{Attempts: 2, Interval: 5 * time.Minute}, RescheduleTrackers: []*RescheduleEvent{ { RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(), @@ -2821,6 +2961,533 @@ func TestAllocation_ShouldReschedule(t *testing.T) { } } +func TestAllocation_LastEventTime(t *testing.T) { + type testCase struct { + desc string + taskState map[string]*TaskState + expectedLastEventTime time.Time + } + var timeZero time.Time + + t1 := time.Now() + + testCases := []testCase{ + { + desc: "nil task state", + expectedLastEventTime: timeZero, + }, + { + desc: "empty task state", + taskState: make(map[string]*TaskState), + expectedLastEventTime: timeZero, + }, + { + desc: "Finished At not set", + taskState: map[string]*TaskState{"foo": {State: "start", + StartedAt: t1.Add(-2 * time.Hour)}}, + expectedLastEventTime: timeZero, + }, + { + desc: "One finished event", + taskState: map[string]*TaskState{"foo": {State: "start", + StartedAt: t1.Add(-2 * time.Hour), + FinishedAt: t1.Add(-1 * time.Hour)}}, + expectedLastEventTime: t1.Add(-1 * time.Hour), + }, + { + desc: "Multiple events", + taskState: map[string]*TaskState{"foo": {State: "start", + StartedAt: t1.Add(-2 * time.Hour), + FinishedAt: t1.Add(-1 * time.Hour)}, + "bar": {State: "start", + StartedAt: t1.Add(-2 * time.Hour), + FinishedAt: t1.Add(-40 * time.Minute)}}, + expectedLastEventTime: t1.Add(-40 * time.Minute), + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + alloc := &Allocation{} + alloc.TaskStates = tc.taskState + require.Equal(t, tc.expectedLastEventTime, alloc.LastEventTime()) + }) + } +} + +func TestAllocation_NextDelay(t *testing.T) { + type testCase struct { + desc string + reschedulePolicy *ReschedulePolicy + alloc *Allocation + expectedRescheduleTime time.Time + expectedRescheduleEligible bool + } + now := time.Now() + testCases := []testCase{ + { + desc: "Allocation hasn't failed yet", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "linear", + Delay: 5 * time.Second, + }, + alloc: &Allocation{}, + expectedRescheduleTime: time.Time{}, + expectedRescheduleEligible: false, + }, + { + desc: "Allocation lacks task state", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "linear", + Delay: 5 * time.Second, + }, + alloc: 
&Allocation{ClientStatus: AllocClientStatusFailed}, + expectedRescheduleTime: time.Time{}, + expectedRescheduleEligible: false, + }, + { + desc: "linear delay, unlimited restarts, no reschedule tracker", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "linear", + Delay: 5 * time.Second, + Unlimited: true, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-2 * time.Second)}}, + }, + expectedRescheduleTime: now.Add(-2 * time.Second).Add(5 * time.Second), + expectedRescheduleEligible: true, + }, + { + desc: "linear delay with reschedule tracker", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "linear", + Delay: 5 * time.Second, + Interval: 10 * time.Minute, + Attempts: 2, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-2 * time.Second)}}, + RescheduleTracker: &RescheduleTracker{ + Events: []*RescheduleEvent{{ + RescheduleTime: now.Add(-2 * time.Minute).UTC().UnixNano(), + Delay: 5 * time.Second, + }}, + }}, + expectedRescheduleTime: now.Add(-2 * time.Second).Add(5 * time.Second), + expectedRescheduleEligible: true, + }, + { + desc: "linear delay with reschedule tracker, attempts exhausted", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "linear", + Delay: 5 * time.Second, + Interval: 10 * time.Minute, + Attempts: 2, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-2 * time.Second)}}, + RescheduleTracker: &RescheduleTracker{ + Events: []*RescheduleEvent{ + { + RescheduleTime: now.Add(-3 * time.Minute).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-2 * time.Minute).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + }, + }}, + expectedRescheduleTime: now.Add(-2 * time.Second).Add(5 * time.Second), + expectedRescheduleEligible: false, + }, + { + desc: "exponential delay - no reschedule tracker", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "exponential", + Delay: 5 * time.Second, + MaxDelay: 90 * time.Second, + Unlimited: true, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-2 * time.Second)}}, + }, + expectedRescheduleTime: now.Add(-2 * time.Second).Add(5 * time.Second), + expectedRescheduleEligible: true, + }, + { + desc: "exponential delay with reschedule tracker", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "exponential", + Delay: 5 * time.Second, + MaxDelay: 90 * time.Second, + Unlimited: true, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-2 * time.Second)}}, + RescheduleTracker: &RescheduleTracker{ + Events: []*RescheduleEvent{ + { + RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 10 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 20 * time.Second, + }, + }, + }}, + expectedRescheduleTime: now.Add(-2 * time.Second).Add(40 * time.Second), + 
expectedRescheduleEligible: true, + }, + { + desc: "exponential delay with delay ceiling reached", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "exponential", + Delay: 5 * time.Second, + MaxDelay: 90 * time.Second, + Unlimited: true, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-15 * time.Second)}}, + RescheduleTracker: &RescheduleTracker{ + Events: []*RescheduleEvent{ + { + RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 10 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 20 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 40 * time.Second, + }, + { + RescheduleTime: now.Add(-40 * time.Second).UTC().UnixNano(), + Delay: 80 * time.Second, + }, + }, + }}, + expectedRescheduleTime: now.Add(-15 * time.Second).Add(90 * time.Second), + expectedRescheduleEligible: true, + }, + { + // Test case where most recent reschedule ran longer than delay ceiling + desc: "exponential delay, delay ceiling reset condition met", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "exponential", + Delay: 5 * time.Second, + MaxDelay: 90 * time.Second, + Unlimited: true, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-15 * time.Minute)}}, + RescheduleTracker: &RescheduleTracker{ + Events: []*RescheduleEvent{ + { + RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 10 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 20 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 40 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 80 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 90 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 90 * time.Second, + }, + }, + }}, + expectedRescheduleTime: now.Add(-15 * time.Minute).Add(5 * time.Second), + expectedRescheduleEligible: true, + }, + { + desc: "fibonacci delay - no reschedule tracker", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "fibonacci", + Delay: 5 * time.Second, + MaxDelay: 90 * time.Second, + Unlimited: true, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-2 * time.Second)}}}, + expectedRescheduleTime: now.Add(-2 * time.Second).Add(5 * time.Second), + expectedRescheduleEligible: true, + }, + { + desc: "fibonacci delay with reschedule tracker", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "fibonacci", + Delay: 5 * time.Second, + MaxDelay: 90 * time.Second, + Unlimited: true, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-2 * time.Second)}}, + RescheduleTracker: &RescheduleTracker{ + Events: []*RescheduleEvent{ + { + RescheduleTime: now.Add(-2 * 
time.Hour).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-5 * time.Second).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + }, + }}, + expectedRescheduleTime: now.Add(-2 * time.Second).Add(10 * time.Second), + expectedRescheduleEligible: true, + }, + { + desc: "fibonacci delay with more events", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "fibonacci", + Delay: 5 * time.Second, + MaxDelay: 90 * time.Second, + Unlimited: true, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-2 * time.Second)}}, + RescheduleTracker: &RescheduleTracker{ + Events: []*RescheduleEvent{ + { + RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 10 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 15 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 25 * time.Second, + }, + }, + }}, + expectedRescheduleTime: now.Add(-2 * time.Second).Add(40 * time.Second), + expectedRescheduleEligible: true, + }, + { + desc: "fibonacci delay with delay ceiling reached", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "fibonacci", + Delay: 5 * time.Second, + MaxDelay: 50 * time.Second, + Unlimited: true, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-15 * time.Second)}}, + RescheduleTracker: &RescheduleTracker{ + Events: []*RescheduleEvent{ + { + RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 10 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 15 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 25 * time.Second, + }, + { + RescheduleTime: now.Add(-40 * time.Second).UTC().UnixNano(), + Delay: 40 * time.Second, + }, + }, + }}, + expectedRescheduleTime: now.Add(-15 * time.Second).Add(50 * time.Second), + expectedRescheduleEligible: true, + }, + { + desc: "fibonacci delay with delay reset condition met", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "fibonacci", + Delay: 5 * time.Second, + MaxDelay: 50 * time.Second, + Unlimited: true, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-5 * time.Minute)}}, + RescheduleTracker: &RescheduleTracker{ + Events: []*RescheduleEvent{ + { + RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 10 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 15 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 25 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * 
time.Hour).UTC().UnixNano(), + Delay: 40 * time.Second, + }, + }, + }}, + expectedRescheduleTime: now.Add(-5 * time.Minute).Add(5 * time.Second), + expectedRescheduleEligible: true, + }, + { + desc: "fibonacci delay with the most recent event that reset delay value", + reschedulePolicy: &ReschedulePolicy{ + DelayFunction: "fibonacci", + Delay: 5 * time.Second, + MaxDelay: 50 * time.Second, + Unlimited: true, + }, + alloc: &Allocation{ + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{"foo": {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-5 * time.Second)}}, + RescheduleTracker: &RescheduleTracker{ + Events: []*RescheduleEvent{ + { + RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 10 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 15 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 25 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 40 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + Delay: 50 * time.Second, + }, + { + RescheduleTime: now.Add(-1 * time.Minute).UTC().UnixNano(), + Delay: 5 * time.Second, + }, + }, + }}, + expectedRescheduleTime: now.Add(-5 * time.Second).Add(5 * time.Second), + expectedRescheduleEligible: true, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + require := require.New(t) + j := testJob() + j.TaskGroups[0].ReschedulePolicy = tc.reschedulePolicy + tc.alloc.Job = j + tc.alloc.TaskGroup = j.TaskGroups[0].Name + reschedTime, allowed := tc.alloc.NextRescheduleTime() + require.Equal(tc.expectedRescheduleEligible, allowed) + require.Equal(tc.expectedRescheduleTime, reschedTime) + }) + } + +} + func TestRescheduleTracker_Copy(t *testing.T) { type testCase struct { original *RescheduleTracker @@ -2830,9 +3497,15 @@ func TestRescheduleTracker_Copy(t *testing.T) { cases := []testCase{ {nil, nil}, {&RescheduleTracker{Events: []*RescheduleEvent{ - {2, "12", "12"}, + {RescheduleTime: 2, + PrevAllocID: "12", + PrevNodeID: "12", + Delay: 30 * time.Second}, }}, &RescheduleTracker{Events: []*RescheduleEvent{ - {2, "12", "12"}, + {RescheduleTime: 2, + PrevAllocID: "12", + PrevNodeID: "12", + Delay: 30 * time.Second}, }}}, } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 5830c5d11cd..94dbc8a4b60 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -42,6 +42,10 @@ const ( // blockedEvalFailedPlacements is the description used for blocked evals // that are a result of failing to place all allocations. 
blockedEvalFailedPlacements = "created to place remaining allocations" + + // maxPastRescheduleEvents is the maximum number of past reschedule event + // that we track when unlimited rescheduling is enabled + maxPastRescheduleEvents = 5 ) // SetStatusError is used to set the status of the evaluation to the given error @@ -72,8 +76,10 @@ type GenericScheduler struct { ctx *EvalContext stack *GenericStack + // Deprecated, was used in pre Nomad 0.7 rolling update stanza and in node draining prior to Nomad 0.8 followupEvalWait time.Duration nextEval *structs.Evaluation + followUpEvals []*structs.Evaluation deployment *structs.Deployment @@ -204,6 +210,7 @@ func (s *GenericScheduler) process() (bool, error) { numTaskGroups = len(s.job.TaskGroups) } s.queuedAllocs = make(map[string]int, numTaskGroups) + s.followUpEvals = nil // Create a plan s.plan = s.eval.MakePlan(s.job) @@ -261,6 +268,19 @@ func (s *GenericScheduler) process() (bool, error) { s.logger.Printf("[DEBUG] sched: %#v: rolling migration limit reached, next eval '%s' created", s.eval, s.nextEval.ID) } + // Create follow up evals for any delayed reschedule eligible allocations + if len(s.followUpEvals) > 0 { + for _, eval := range s.followUpEvals { + eval.PreviousEval = s.eval.ID + // TODO(preetha) this should be batching evals before inserting them + if err := s.planner.CreateEval(eval); err != nil { + s.logger.Printf("[ERR] sched: %#v failed to make next eval for rescheduling: %v", s.eval, err) + return false, err + } + s.logger.Printf("[DEBUG] sched: %#v: found reschedulable allocs, next eval '%s' created", s.eval, eval.ID) + } + } + // Submit the plan and store the results. result, newState, err := s.planner.SubmitPlan(s.plan) s.planResult = result @@ -336,6 +356,12 @@ func (s *GenericScheduler) computeJobAllocs() error { // follow up eval to handle node draining. s.followupEvalWait = results.followupEvalWait + // Store all the follow up evaluations from rescheduled allocations + if len(results.desiredFollowupEvals) > 0 { + for _, evals := range results.desiredFollowupEvals { + s.followUpEvals = append(s.followUpEvals, evals...) + } + } // Update the stored deployment if results.deployment != nil { s.deployment = results.deployment @@ -403,6 +429,9 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul // Update the set of placement nodes s.stack.SetNodes(nodes) + // Capture current time to use as the start time for any rescheduled allocations + now := time.Now() + // Have to handle destructive changes first as we need to discount their // resources. To understand this imagine the resources were reduced and the // count was scaled up. 
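For reference, here is a standalone sketch (not part of this patch) of how the delay progression behaves for the three delay functions consumed by Allocation.NextDelay above: linear repeats the base delay, exponential doubles the previous delay, fibonacci sums the previous two, and the result is clamped to the configured ceiling. The patch additionally resets the delay to the base value when the previous allocation ran longer than the current ceiling; that reset is omitted here for brevity, and all names below are illustrative only.

package main

import (
	"fmt"
	"time"
)

// nextDelay is an illustrative re-statement of the progression: given the
// delays used so far, the base delay, and an optional ceiling, return the
// delay to use for the next reschedule attempt.
func nextDelay(fn string, base, ceiling time.Duration, history []time.Duration) time.Duration {
	if len(history) == 0 {
		return base
	}
	next := base
	switch fn {
	case "exponential":
		// Double the most recent delay.
		next = 2 * history[len(history)-1]
	case "fibonacci":
		// Sum the two most recent delays; with a single past event the base delay is kept.
		if len(history) >= 2 {
			next = history[len(history)-1] + history[len(history)-2]
		}
	default:
		// "linear" (and unknown functions) keep the base delay between attempts.
		next = base
	}
	if ceiling > 0 && next > ceiling {
		next = ceiling
	}
	return next
}

func main() {
	var history []time.Duration
	for i := 0; i < 6; i++ {
		d := nextDelay("exponential", 5*time.Second, 60*time.Second, history)
		history = append(history, d)
	}
	// Prints [5s 10s 20s 40s 1m0s 1m0s]: doubling until the ceiling is hit.
	fmt.Println(history)
}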
@@ -467,7 +496,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul if prevAllocation != nil { alloc.PreviousAllocation = prevAllocation.ID if missing.IsRescheduling() { - updateRescheduleTracker(alloc, prevAllocation) + updateRescheduleTracker(alloc, prevAllocation, now) } } @@ -524,14 +553,38 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs } // updateRescheduleTracker carries over previous restart attempts and adds the most recent restart -func updateRescheduleTracker(alloc *structs.Allocation, prev *structs.Allocation) { +func updateRescheduleTracker(alloc *structs.Allocation, prev *structs.Allocation, now time.Time) { + reschedPolicy := prev.ReschedulePolicy() var rescheduleEvents []*structs.RescheduleEvent if prev.RescheduleTracker != nil { - for _, reschedEvent := range prev.RescheduleTracker.Events { - rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy()) + var interval time.Duration + if reschedPolicy != nil { + interval = reschedPolicy.Interval + } + // If attempts is set copy all events in the interval range + if reschedPolicy.Attempts > 0 { + for _, reschedEvent := range prev.RescheduleTracker.Events { + timeDiff := now.UnixNano() - reschedEvent.RescheduleTime + // Only copy over events that are within restart interval + // This keeps the list of events small in cases where there's a long chain of old restart events + if interval > 0 && timeDiff <= interval.Nanoseconds() { + rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy()) + } + } + } else { + // Only copy the last n if unlimited is set + start := 0 + if len(prev.RescheduleTracker.Events) > maxPastRescheduleEvents { + start = len(prev.RescheduleTracker.Events) - maxPastRescheduleEvents + } + for i := start; i < len(prev.RescheduleTracker.Events); i++ { + reschedEvent := prev.RescheduleTracker.Events[i] + rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy()) + } } } - rescheduleEvent := structs.NewRescheduleEvent(time.Now().UTC().UnixNano(), prev.ID, prev.NodeID) + nextDelay := prev.NextDelay() + rescheduleEvent := structs.NewRescheduleEvent(now.UnixNano(), prev.ID, prev.NodeID, nextDelay) rescheduleEvents = append(rescheduleEvents, rescheduleEvent) alloc.RescheduleTracker = &structs.RescheduleTracker{Events: rescheduleEvents} } diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 1443a231415..5b21034eb9c 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -2715,7 +2715,7 @@ func TestServiceSched_RetryLimit(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusFailed) } -func TestServiceSched_Reschedule_Once(t *testing.T) { +func TestServiceSched_Reschedule_OnceNow(t *testing.T) { h := NewHarness(t) // Create some nodes @@ -2730,9 +2730,15 @@ func TestServiceSched_Reschedule_Once(t *testing.T) { job := mock.Job() job.TaskGroups[0].Count = 2 job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ - Attempts: 1, - Interval: 15 * time.Minute, + Attempts: 1, + Interval: 15 * time.Minute, + Delay: 5 * time.Second, + MaxDelay: 1 * time.Minute, + DelayFunction: "linear", } + tgName := job.TaskGroups[0].Name + now := time.Now() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) var allocs []*structs.Allocation @@ -2746,6 +2752,9 @@ func TestServiceSched_Reschedule_Once(t *testing.T) { } // Mark one of the allocations as failed allocs[1].ClientStatus = structs.AllocClientStatusFailed + allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: 
"dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} failedAllocID := allocs[1].ID successAllocID := allocs[0].ID @@ -2817,7 +2826,96 @@ func TestServiceSched_Reschedule_Once(t *testing.T) { } -func TestServiceSched_Reschedule_Multiple(t *testing.T) { +// Tests that alloc reschedulable at a future time creates a follow up eval +func TestServiceSched_Reschedule_Later(t *testing.T) { + h := NewHarness(t) + require := require.New(t) + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations and an update policy. + job := mock.Job() + job.TaskGroups[0].Count = 2 + delayDuration := 15 * time.Second + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ + Attempts: 1, + Interval: 15 * time.Minute, + Delay: delayDuration, + MaxDelay: 1 * time.Minute, + DelayFunction: "linear", + } + tgName := job.TaskGroups[0].Name + now := time.Now() + + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for i := 0; i < 2; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = nodes[i].ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + allocs = append(allocs, alloc) + } + // Mark one of the allocations as failed + allocs[1].ClientStatus = structs.AllocClientStatusFailed + allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now}} + failedAllocID := allocs[1].ID + + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure multiple plans + if len(h.Plans) == 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + noErr(t, err) + + // Verify no new allocs were created + require.Equal(2, len(out)) + + // Verify follow up eval was created for the failed alloc + alloc, err := h.State.AllocByID(ws, failedAllocID) + require.Nil(err) + require.NotEmpty(alloc.FollowupEvalID) + + // Ensure there is a follow up eval. 
+ if len(h.CreateEvals) != 1 || h.CreateEvals[0].Status != structs.EvalStatusPending { + t.Fatalf("bad: %#v", h.CreateEvals) + } + followupEval := h.CreateEvals[0] + require.Equal(now.Add(delayDuration), followupEval.WaitUntil) +} + +func TestServiceSched_Reschedule_MultipleNow(t *testing.T) { h := NewHarness(t) // Create some nodes @@ -2833,9 +2931,14 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) { job := mock.Job() job.TaskGroups[0].Count = 2 job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ - Attempts: maxRestartAttempts, - Interval: 30 * time.Minute, + Attempts: maxRestartAttempts, + Interval: 30 * time.Minute, + Delay: 5 * time.Second, + DelayFunction: "linear", } + tgName := job.TaskGroups[0].Name + now := time.Now() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) var allocs []*structs.Allocation @@ -2850,6 +2953,9 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) { } // Mark one of the allocations as failed allocs[1].ClientStatus = structs.AllocClientStatusFailed + allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -2915,6 +3021,9 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) { // Mark this alloc as failed again newAlloc.ClientStatus = structs.AllocClientStatusFailed + newAlloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead", + StartedAt: now.Add(-12 * time.Second), + FinishedAt: now.Add(-10 * time.Second)}} failedAllocId = newAlloc.ID failedNodeID = newAlloc.NodeID @@ -2946,6 +3055,136 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) { assert.Equal(5, len(out)) // 2 original, plus 3 reschedule attempts } +// Tests that old reschedule attempts are pruned +func TestServiceSched_Reschedule_PruneEvents(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations and an update policy. 
+ job := mock.Job() + job.TaskGroups[0].Count = 2 + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ + DelayFunction: "exponential", + MaxDelay: 1 * time.Hour, + Delay: 5 * time.Second, + Unlimited: true, + } + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for i := 0; i < 2; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = nodes[i].ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + allocs = append(allocs, alloc) + } + now := time.Now() + // Mark allocations as failed with restart info + allocs[1].TaskStates = map[string]*structs.TaskState{job.TaskGroups[0].Name: {State: "dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-15 * time.Minute)}} + allocs[1].ClientStatus = structs.AllocClientStatusFailed + + allocs[1].RescheduleTracker = &structs.RescheduleTracker{ + Events: []*structs.RescheduleEvent{ + {RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + PrevAllocID: uuid.Generate(), + PrevNodeID: uuid.Generate(), + Delay: 5 * time.Second, + }, + {RescheduleTime: now.Add(-40 * time.Minute).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + Delay: 10 * time.Second, + }, + {RescheduleTime: now.Add(-30 * time.Minute).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + Delay: 20 * time.Second, + }, + {RescheduleTime: now.Add(-20 * time.Minute).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + Delay: 40 * time.Second, + }, + {RescheduleTime: now.Add(-10 * time.Minute).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + Delay: 80 * time.Second, + }, + {RescheduleTime: now.Add(-3 * time.Minute).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + Delay: 160 * time.Second, + }, + }, + } + expectedFirstRescheduleEvent := allocs[1].RescheduleTracker.Events[1] + expectedDelay := 320 * time.Second + failedAllocID := allocs[1].ID + successAllocID := allocs[0].ID + + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure multiple plans + if len(h.Plans) == 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + noErr(t, err) + + // Verify that one new allocation got created with its restart tracker info + assert := assert.New(t) + assert.Equal(3, len(out)) + var newAlloc *structs.Allocation + for _, alloc := range out { + if alloc.ID != successAllocID && alloc.ID != failedAllocID { + newAlloc = alloc + } + } + + assert.Equal(failedAllocID, newAlloc.PreviousAllocation) + // Verify that the new alloc copied the last 5 reschedule attempts + assert.Equal(6, len(newAlloc.RescheduleTracker.Events)) + assert.Equal(expectedFirstRescheduleEvent, newAlloc.RescheduleTracker.Events[0]) + + mostRecentRescheduleEvent := newAlloc.RescheduleTracker.Events[5] + // Verify that the failed alloc ID is in the most recent reschedule event + assert.Equal(failedAllocID, 
mostRecentRescheduleEvent.PrevAllocID) + // Verify that the delay value was captured correctly + assert.Equal(expectedDelay, mostRecentRescheduleEvent.Delay) + +} + // Tests that deployments with failed allocs don't result in placements func TestDeployment_FailedAllocs_NoReschedule(t *testing.T) { h := NewHarness(t) @@ -3079,6 +3318,9 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) { job.TaskGroups[0].Count = 1 noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + tgName := job.TaskGroups[0].Name + now := time.Now() + // Create a failed alloc alloc := mock.Alloc() alloc.Job = job @@ -3086,6 +3328,9 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) { alloc.NodeID = node.ID alloc.Name = "my-job.web[0]" alloc.ClientStatus = structs.AllocClientStatusFailed + alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) // Create a mock evaluation to register the job @@ -3231,6 +3476,9 @@ func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) { job.TaskGroups[0].Count = 1 noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + tgName := job.TaskGroups[0].Name + now := time.Now() + // Create a failed alloc alloc := mock.Alloc() alloc.Job = job @@ -3238,6 +3486,9 @@ func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) { alloc.NodeID = node.ID alloc.Name = "my-job.web[0]" alloc.ClientStatus = structs.AllocClientStatusFailed + alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) // Create a mock evaluation to register the job @@ -3963,3 +4214,233 @@ func TestServiceSched_CancelDeployment_NewerJob(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } + +// Various table driven tests for carry forward +// of past reschedule events +func Test_updateRescheduleTracker(t *testing.T) { + + t1 := time.Now().UTC() + alloc := mock.Alloc() + prevAlloc := mock.Alloc() + + type testCase struct { + desc string + prevAllocEvents []*structs.RescheduleEvent + reschedPolicy *structs.ReschedulePolicy + expectedRescheduleEvents []*structs.RescheduleEvent + reschedTime time.Time + } + + testCases := []testCase{ + { + desc: "No past events", + prevAllocEvents: nil, + reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 24 * time.Hour, Attempts: 2, Delay: 5 * time.Second}, + reschedTime: t1, + expectedRescheduleEvents: []*structs.RescheduleEvent{{t1.UnixNano(), prevAlloc.ID, prevAlloc.NodeID, 5 * time.Second}}, + }, + { + desc: "one past event, linear delay", + prevAllocEvents: []*structs.RescheduleEvent{ + {RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second}}, + reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 24 * time.Hour, Attempts: 2, Delay: 5 * time.Second}, + reschedTime: t1, + expectedRescheduleEvents: []*structs.RescheduleEvent{ + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + { + RescheduleTime: t1.UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + }, + }, + { + desc: "one past event, fibonacci delay", + prevAllocEvents: []*structs.RescheduleEvent{ + 
{RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second}}, + reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 24 * time.Hour, Attempts: 2, Delay: 5 * time.Second, DelayFunction: "fibonacci", MaxDelay: 60 * time.Second}, + reschedTime: t1, + expectedRescheduleEvents: []*structs.RescheduleEvent{ + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + { + RescheduleTime: t1.UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + }, + }, + { + desc: "eight past events, fibonacci delay, unlimited", + prevAllocEvents: []*structs.RescheduleEvent{ + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 10 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 15 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 25 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 40 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 65 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 105 * time.Second, + }, + }, + reschedPolicy: &structs.ReschedulePolicy{Unlimited: true, Delay: 5 * time.Second, DelayFunction: "fibonacci", MaxDelay: 240 * time.Second}, + reschedTime: t1, + expectedRescheduleEvents: []*structs.RescheduleEvent{ + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 15 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 25 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 40 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 65 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 105 * time.Second, + }, + { + RescheduleTime: t1.UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 170 * time.Second, + }, + }, + }, + { + desc: " old attempts past interval, exponential delay, limited", + prevAllocEvents: []*structs.RescheduleEvent{ + { + RescheduleTime: t1.Add(-2 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + { + RescheduleTime: t1.Add(-70 * time.Minute).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 10 * time.Second, + }, + { + RescheduleTime: 
t1.Add(-30 * time.Minute).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 20 * time.Second, + }, + { + RescheduleTime: t1.Add(-10 * time.Minute).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 40 * time.Second, + }, + }, + reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 1 * time.Hour, Attempts: 5, Delay: 5 * time.Second, DelayFunction: "exponential", MaxDelay: 240 * time.Second}, + reschedTime: t1, + expectedRescheduleEvents: []*structs.RescheduleEvent{ + { + RescheduleTime: t1.Add(-30 * time.Minute).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 20 * time.Second, + }, + { + RescheduleTime: t1.Add(-10 * time.Minute).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 40 * time.Second, + }, + { + RescheduleTime: t1.UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 80 * time.Second, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + require := require.New(t) + prevAlloc.RescheduleTracker = &structs.RescheduleTracker{Events: tc.prevAllocEvents} + prevAlloc.Job.LookupTaskGroup(prevAlloc.TaskGroup).ReschedulePolicy = tc.reschedPolicy + updateRescheduleTracker(alloc, prevAlloc, tc.reschedTime) + require.Equal(tc.expectedRescheduleEvents, alloc.RescheduleTracker.Events) + }) + } + +} diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index 35eebdcb12a..3bfd1a89e14 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -5,10 +5,19 @@ import ( "log" "time" + "sort" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // batchedFailedAllocWindowSize is the window size used + // to batch up failed allocations before creating an eval + batchedFailedAllocWindowSize = 5 * time.Second +) + // allocUpdateType takes an existing allocation and a new job definition and // returns whether the allocation can ignore the change, requires a destructive // update, or can be inplace updated. If it can be inplace updated, an updated @@ -92,7 +101,23 @@ type reconcileResults struct { // followupEvalWait is set if there should be a followup eval run after the // given duration + // Deprecated, the delay strategy that sets this is not available after nomad 0.7.0 followupEvalWait time.Duration + + // desiredFollowupEvals is the map of follow up evaluations to create per task group + // This is used to create a delayed evaluation for rescheduling failed allocations. + desiredFollowupEvals map[string][]*structs.Evaluation +} + +// delayedRescheduleInfo contains the allocation id and a time when its eligible to be rescheduled. 
+// this is used to create follow up evaluations +type delayedRescheduleInfo struct { + + // allocID is the ID of the allocation eligible to be rescheduled + allocID string + + // rescheduleTime is the time to use in the delayed evaluation + rescheduleTime time.Time } func (r *reconcileResults) GoString() string { @@ -136,7 +161,8 @@ func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch existingAllocs: existingAllocs, taintedNodes: taintedNodes, result: &reconcileResults{ - desiredTGUpdates: make(map[string]*structs.DesiredUpdates), + desiredTGUpdates: make(map[string]*structs.DesiredUpdates), + desiredFollowupEvals: make(map[string][]*structs.Evaluation), }, } } @@ -318,11 +344,17 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { untainted, migrate, lost := all.filterByTainted(a.taintedNodes) // Determine what set of terminal allocations need to be rescheduled - untainted, reschedule := untainted.filterByReschedulable(a.batch, tg.ReschedulePolicy) + untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch) + + // Create batched follow up evaluations for allocations that are reschedulable later + var rescheduleLaterAllocs map[string]*structs.Allocation + if len(rescheduleLater) > 0 { + rescheduleLaterAllocs = a.handleDelayedReschedules(rescheduleLater, all, tg.Name) + } // Create a structure for choosing names. Seed with the taken names which is // the union of untainted and migrating nodes (includes canaries) - nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, reschedule)) + nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, rescheduleNow)) // Stop any unneeded allocations and update the untainted set to not // included stopped allocations. @@ -341,7 +373,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // Do inplace upgrades where possible and capture the set of upgrades that // need to be done destructively. - ignore, inplace, destructive := a.computeUpdates(tg, untainted) + ignore, inplace, destructive := a.computeUpdates(tg, untainted, rescheduleLaterAllocs) desiredChanges.Ignore += uint64(len(ignore)) desiredChanges.InPlaceUpdate += uint64(len(inplace)) if !existingDeployment { @@ -379,7 +411,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // * The deployment is not paused or failed // * Not placing any canaries // * If there are any canaries that they have been promoted - place := a.computePlacements(tg, nameIndex, untainted, migrate, reschedule) + place := a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow) if !existingDeployment { dstate.DesiredTotal += len(place) } @@ -774,7 +806,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc // 2. Those that can be upgraded in-place. These are added to the results // automatically since the function contains the correct state to do so, // 3. 
Those that require destructive updates -func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) { +func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted, rescheduleLaterAllocs allocSet) (ignore, inplace, destructive allocSet) { // Determine the set of allocations that need to be updated ignore = make(map[string]*structs.Allocation) inplace = make(map[string]*structs.Allocation) @@ -782,7 +814,13 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all for _, alloc := range untainted { ignoreChange, destructiveChange, inplaceAlloc := a.allocUpdateFn(alloc, a.job, group) - if ignoreChange { + // Also check if the alloc is marked for later rescheduling. + // If so it should be in the inplace list + reschedLaterAlloc, isRescheduleLater := rescheduleLaterAllocs[alloc.ID] + if isRescheduleLater { + inplace[alloc.ID] = alloc + a.result.inplaceUpdate = append(a.result.inplaceUpdate, reschedLaterAlloc) + } else if ignoreChange { ignore[alloc.ID] = alloc } else if destructiveChange { destructive[alloc.ID] = alloc @@ -796,3 +834,64 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all return } + +// handleDelayedReschedules creates batched followup evaluations with the WaitUntil field set +// for allocations that are eligible to be rescheduled later +func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) allocSet { + // Sort by time + sort.Slice(rescheduleLater, func(i, j int) bool { + return rescheduleLater[i].rescheduleTime.Before(rescheduleLater[j].rescheduleTime) + }) + + var evals []*structs.Evaluation + nextReschedTime := rescheduleLater[0].rescheduleTime + allocIDToFollowupEvalID := make(map[string]string, len(rescheduleLater)) + // Create a new eval for the first batch + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: a.job.Namespace, + Priority: a.job.Priority, + Type: a.job.Type, + TriggeredBy: structs.EvalTriggerRetryFailedAlloc, + JobID: a.job.ID, + JobModifyIndex: a.job.ModifyIndex, + Status: structs.EvalStatusPending, + WaitUntil: nextReschedTime, + } + evals = append(evals, eval) + for _, allocReschedInfo := range rescheduleLater { + if allocReschedInfo.rescheduleTime.Sub(nextReschedTime) < batchedFailedAllocWindowSize { + allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID + } else { + // Start a new batch + nextReschedTime = allocReschedInfo.rescheduleTime + // Create a new eval for the new batch + eval = &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: a.job.Namespace, + Priority: a.job.Priority, + Type: a.job.Type, + TriggeredBy: structs.EvalTriggerRetryFailedAlloc, + JobID: a.job.ID, + JobModifyIndex: a.job.ModifyIndex, + Status: structs.EvalStatusPending, + WaitUntil: nextReschedTime, + } + evals = append(evals, eval) + // Set the evalID for the first alloc in this new batch + allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID + } + } + + a.result.desiredFollowupEvals[tgName] = evals + + // Create in-place updates for every alloc ID that needs to be updated with its follow up eval ID + rescheduleLaterAllocs := make(map[string]*structs.Allocation) + for allocID, evalID := range allocIDToFollowupEvalID { + existingAlloc := all[allocID] + updatedAlloc := existingAlloc.Copy() + updatedAlloc.FollowupEvalID = evalID + rescheduleLaterAllocs[allocID] = updatedAlloc + } + return rescheduleLaterAllocs +} diff --git 
a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index 910039d038d..34f6eddbfa0 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/kr/pretty" + "github.com/stretchr/testify/require" ) /* @@ -38,9 +39,12 @@ Basic Tests: √ Handle task group being removed √ Handle job being stopped both as .Stopped and nil √ Place more that one group -√ Handle rescheduling failed allocs for batch jobs -√ Handle rescheduling failed allocs for service jobs +√ Handle delayed rescheduling failed allocs for batch jobs +√ Handle delayed rescheduling failed allocs for service jobs +√ Handle eligible now rescheduling failed allocs for batch jobs +√ Handle eligible now rescheduling failed allocs for service jobs √ Previously rescheduled allocs should not be rescheduled again +√ Aggregated evaluations for allocations that fail close together Update stanza Tests: √ Stopped job cancels any active deployment @@ -1203,15 +1207,177 @@ func TestReconciler_MultiTG(t *testing.T) { assertNamesHaveIndexes(t, intRange(2, 9, 0, 9), placeResultsToNames(r.place)) } -// Tests rescheduling failed batch allocations -func TestReconciler_Reschedule_Batch(t *testing.T) { +// Tests delayed rescheduling of failed batch allocations +func TestReconciler_RescheduleLater_Batch(t *testing.T) { + require := require.New(t) // Set desired 4 job := mock.Job() job.TaskGroups[0].Count = 4 + now := time.Now() + // Set up reschedule policy + delayDur := 15 * time.Second + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: delayDur, DelayFunction: "linear"} + tgName := job.TaskGroups[0].Name + // Create 6 existing allocations - 2 running, 1 complete and 3 failed + var allocs []*structs.Allocation + for i := 0; i < 6; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + alloc.ClientStatus = structs.AllocClientStatusRunning + } + // Mark 3 as failed with restart tracking info + allocs[0].ClientStatus = structs.AllocClientStatusFailed + allocs[0].NextAllocation = allocs[1].ID + allocs[1].ClientStatus = structs.AllocClientStatusFailed + allocs[1].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ + {RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + }, + }} + allocs[1].NextAllocation = allocs[2].ID + allocs[2].ClientStatus = structs.AllocClientStatusFailed + allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now}} + allocs[2].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ + {RescheduleTime: time.Now().Add(-2 * time.Hour).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + }, + {RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(), + PrevAllocID: allocs[1].ID, + PrevNodeID: uuid.Generate(), + }, + }} + // Mark one as complete + allocs[5].ClientStatus = structs.AllocClientStatusComplete + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Two reschedule attempts were already made, one more can be made at a future 
time + + // Verify that the follow up eval has the expected waitUntil time + evals := r.desiredFollowupEvals[tgName] + require.NotNil(evals) + require.Equal(1, len(evals)) + require.Equal(now.Add(delayDur), evals[0].WaitUntil) + + // Alloc 5 should not be replaced because it is terminal + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 0, + inplace: 1, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 0, + InPlaceUpdate: 1, + Ignore: 3, + }, + }, + }) + assertNamesHaveIndexes(t, intRange(2, 2), allocsToNames(r.inplaceUpdate)) + // verify that the followup evalID field is set correctly + r.inplaceUpdate[0].EvalID = evals[0].ID +} + +// Tests delayed rescheduling of failed batch allocations and batching of allocs +// with fail times that are close together +func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) { + require := require.New(t) + // Set desired 4 + job := mock.Job() + job.TaskGroups[0].Count = 10 + now := time.Now() // Set up reschedule policy - job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour} + delayDur := 15 * time.Second + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: delayDur, DelayFunction: "linear"} + tgName := job.TaskGroups[0].Name + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + alloc.ClientStatus = structs.AllocClientStatusRunning + } + // Mark 5 as failed with fail times very close together + for i := 0; i < 5; i++ { + allocs[i].ClientStatus = structs.AllocClientStatusFailed + allocs[i].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(time.Duration(50*i) * time.Millisecond)}} + } + + // Mark two more as failed several seconds later + for i := 5; i < 7; i++ { + allocs[i].ClientStatus = structs.AllocClientStatusFailed + allocs[i].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(10 * time.Second)}} + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Verify that two follow up evals were created + evals := r.desiredFollowupEvals[tgName] + require.NotNil(evals) + require.Equal(2, len(evals)) + + // Verify expected WaitUntil values for both batched evals + require.Equal(now.Add(delayDur), evals[0].WaitUntil) + secondBatchDuration := delayDur + 10*time.Second + require.Equal(now.Add(secondBatchDuration), evals[1].WaitUntil) + // Alloc 5 should not be replaced because it is terminal + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 0, + inplace: 7, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 0, + InPlaceUpdate: 7, + Ignore: 3, + }, + }, + }) + assertNamesHaveIndexes(t, intRange(0, 6), allocsToNames(r.inplaceUpdate)) + // verify that the followup evalID field is set correctly + for _, alloc := range r.inplaceUpdate { + if allocNameToIndex(alloc.Name) < 5 { + require.Equal(evals[0].ID, alloc.FollowupEvalID) + } else if 
allocNameToIndex(alloc.Name) < 7 { + require.Equal(evals[1].ID, alloc.FollowupEvalID) + } else { + t.Fatalf("Unexpected alloc name in Inplace results %v", alloc.Name) + } + } +} + +// Tests rescheduling failed batch allocations +func TestReconciler_RescheduleNow_Batch(t *testing.T) { + require := require.New(t) + // Set desired 4 + job := mock.Job() + job.TaskGroups[0].Count = 4 + now := time.Now() + // Set up reschedule policy + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: 5 * time.Second, DelayFunction: "linear"} + tgName := job.TaskGroups[0].Name // Create 6 existing allocations - 2 running, 1 complete and 3 failed var allocs []*structs.Allocation for i := 0; i < 6; i++ { @@ -1235,6 +1401,9 @@ func TestReconciler_Reschedule_Batch(t *testing.T) { }} allocs[1].NextAllocation = allocs[2].ID allocs[2].ClientStatus = structs.AllocClientStatusFailed + allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} allocs[2].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ {RescheduleTime: time.Now().Add(-2 * time.Hour).UTC().UnixNano(), PrevAllocID: allocs[0].ID, @@ -1251,7 +1420,11 @@ func TestReconciler_Reschedule_Batch(t *testing.T) { reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil) r := reconciler.Compute() - // Two reschedule attempts were made, one more can be made + // Verify that no follow up evals were created + evals := r.desiredFollowupEvals[tgName] + require.Nil(evals) + + // Two reschedule attempts were made, one more can be made now // Alloc 5 should not be replaced because it is terminal assertResults(t, r, &resultExpectation{ createDeployment: nil, @@ -1266,19 +1439,24 @@ func TestReconciler_Reschedule_Batch(t *testing.T) { }, }, }) + assertNamesHaveIndexes(t, intRange(2, 2), placeResultsToNames(r.place)) assertPlaceResultsHavePreviousAllocs(t, 1, r.place) assertPlacementsAreRescheduled(t, 1, r.place) + } // Tests rescheduling failed service allocations with desired state stop -func TestReconciler_Reschedule_Service(t *testing.T) { +func TestReconciler_RescheduleLater_Service(t *testing.T) { + require := require.New(t) // Set desired 5 job := mock.Job() job.TaskGroups[0].Count = 5 - + tgName := job.TaskGroups[0].Name + now := time.Now() // Set up reschedule policy - job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour} + delayDur := 15 * time.Second + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour, Delay: delayDur, MaxDelay: 1 * time.Hour} // Create 5 existing allocations var allocs []*structs.Allocation @@ -1293,15 +1471,87 @@ func TestReconciler_Reschedule_Service(t *testing.T) { } // Mark two as failed allocs[0].ClientStatus = structs.AllocClientStatusFailed + // Mark one of them as already rescheduled once + allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ + {RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(), + PrevAllocID: uuid.Generate(), + PrevNodeID: uuid.Generate(), + }, + }} + allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now}} allocs[1].ClientStatus = structs.AllocClientStatusFailed + // Mark one as desired state stop + allocs[4].DesiredStatus = structs.AllocDesiredStatusStop + + 
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Should place a new placement and create a follow up eval for the delayed reschedule + // Verify that the follow up eval has the expected waitUntil time + evals := r.desiredFollowupEvals[tgName] + require.NotNil(evals) + require.Equal(1, len(evals)) + require.Equal(now.Add(delayDur), evals[0].WaitUntil) + + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 1, + inplace: 1, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 1, + InPlaceUpdate: 1, + Ignore: 3, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(4, 4), placeResultsToNames(r.place)) + assertNamesHaveIndexes(t, intRange(1, 1), allocsToNames(r.inplaceUpdate)) + // verify that the followup evalID field is set correctly + r.inplaceUpdate[0].EvalID = evals[0].ID +} + +// Tests rescheduling failed service allocations with desired state stop +func TestReconciler_RescheduleNow_Service(t *testing.T) { + require := require.New(t) + // Set desired 5 + job := mock.Job() + job.TaskGroups[0].Count = 5 + tgName := job.TaskGroups[0].Name + now := time.Now() + // Set up reschedule policy + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour, Delay: 5 * time.Second, MaxDelay: 1 * time.Hour} + + // Create 5 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 5; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + alloc.ClientStatus = structs.AllocClientStatusRunning + } + // Mark two as failed + allocs[0].ClientStatus = structs.AllocClientStatusFailed // Mark one of them as already rescheduled once - allocs[1].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ + allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ {RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(), PrevAllocID: uuid.Generate(), PrevNodeID: uuid.Generate(), }, }} + allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} + allocs[1].ClientStatus = structs.AllocClientStatusFailed // Mark one as desired state stop allocs[4].DesiredStatus = structs.AllocDesiredStatusStop @@ -1309,7 +1559,11 @@ func TestReconciler_Reschedule_Service(t *testing.T) { reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) r := reconciler.Compute() - // Should place 2, one is rescheduled, one is past its reschedule limit and one is a new placement + // Verify that no follow up evals were created + evals := r.desiredFollowupEvals[tgName] + require.Nil(evals) + + // Verify that one rescheduled alloc and one replacement for terminal alloc were placed assertResults(t, r, &resultExpectation{ createDeployment: nil, deploymentUpdates: nil, @@ -1324,8 +1578,8 @@ func TestReconciler_Reschedule_Service(t *testing.T) { }, }) - assertNamesHaveIndexes(t, intRange(0, 0, 4, 4), placeResultsToNames(r.place)) - // 2 rescheduled allocs should have previous allocs + assertNamesHaveIndexes(t, intRange(1, 1, 4, 4), placeResultsToNames(r.place)) + // Rescheduled allocs should have previous allocs 
assertPlaceResultsHavePreviousAllocs(t, 1, r.place) assertPlacementsAreRescheduled(t, 1, r.place) } @@ -3374,6 +3628,8 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) { job := mock.Job() job.TaskGroups[0].Update = noCanaryUpdate + tgName := job.TaskGroups[0].Name + now := time.Now() // Create an existing failed deployment that has some placed allocs d := structs.NewDeployment(job) d.Status = structs.DeploymentStatusFailed @@ -3394,8 +3650,17 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) { alloc.TaskGroup = job.TaskGroups[0].Name allocs = append(allocs, alloc) } + + //create some allocations that are reschedulable now allocs[2].ClientStatus = structs.AllocClientStatusFailed + allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} + allocs[3].ClientStatus = structs.AllocClientStatusFailed + allocs[3].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil) r := reconciler.Compute() @@ -3417,6 +3682,8 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) { func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) { job := mock.Job() job.TaskGroups[0].Update = noCanaryUpdate + tgName := job.TaskGroups[0].Name + now := time.Now() // Mock deployment with failed allocs, but deployment watcher hasn't marked it as failed yet d := structs.NewDeployment(job) @@ -3439,8 +3706,17 @@ func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) { alloc.DeploymentID = d.ID allocs = append(allocs, alloc) } + + // Create allocs that are reschedulable now allocs[2].ClientStatus = structs.AllocClientStatusFailed + allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} + allocs[3].ClientStatus = structs.AllocClientStatusFailed + allocs[3].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil) r := reconciler.Compute() diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index cff5c264812..db3a5ff1e3d 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -227,14 +227,17 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi return } -// filterByReschedulable filters the allocation set to return the set of allocations that are either -// terminal or running, and a set of allocations that must be rescheduled -func (a allocSet) filterByReschedulable(isBatch bool, reschedulePolicy *structs.ReschedulePolicy) (untainted, reschedule allocSet) { +// filterByRescheduleable filters the allocation set to return the set of allocations that are either +// terminal or running, and a set of allocations that must be rescheduled now. 
Allocations that can be rescheduled +// at a future time are also returned so that we can create follow up evaluations for them +func (a allocSet) filterByRescheduleable(isBatch bool) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) { untainted = make(map[string]*structs.Allocation) - reschedule = make(map[string]*structs.Allocation) + rescheduleNow = make(map[string]*structs.Allocation) now := time.Now() for _, alloc := range a { + var isUntainted, eligibleNow, eligibleLater bool + var rescheduleTime time.Time if isBatch { // Allocs from batch jobs should be filtered when the desired status // is terminal and the client did not finish or when the client @@ -249,26 +252,47 @@ func (a allocSet) filterByReschedulable(isBatch bool, reschedulePolicy *structs. default: } if alloc.NextAllocation == "" { - if alloc.ShouldReschedule(reschedulePolicy, now) { - reschedule[alloc.ID] = alloc - } else { - untainted[alloc.ID] = alloc - } + // Ignore allocs that have already been rescheduled + isUntainted, eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, true) } } else { - //ignore allocs that have already been rescheduled + // Ignore allocs that have already been rescheduled if alloc.NextAllocation == "" { - // ignore allocs whose desired state is stop/evict - // everything else is either reschedulable or untainted - if alloc.ShouldReschedule(reschedulePolicy, now) { - reschedule[alloc.ID] = alloc - } else if alloc.DesiredStatus != structs.AllocDesiredStatusStop && alloc.DesiredStatus != structs.AllocDesiredStatusEvict { - untainted[alloc.ID] = alloc - } + isUntainted, eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, false) } } + if isUntainted { + untainted[alloc.ID] = alloc + } + if eligibleNow { + rescheduleNow[alloc.ID] = alloc + } else if eligibleLater { + rescheduleLater = append(rescheduleLater, &delayedRescheduleInfo{alloc.ID, rescheduleTime}) + } } + return +} +// updateByReschedulable is a helper method that encapsulates logic for whether a failed allocation +// should be rescheduled now, later or left in the untainted set +func updateByReschedulable(alloc *structs.Allocation, now time.Time, batch bool) (untainted, rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) { + shouldAllow := true + if !batch { + // For service type jobs we ignore allocs whose desired state is stop/evict + // everything else is either rescheduleable or untainted + shouldAllow = alloc.DesiredStatus != structs.AllocDesiredStatusStop && alloc.DesiredStatus != structs.AllocDesiredStatusEvict + } + rescheduleTime, eligible := alloc.NextRescheduleTime() + // We consider a time difference of less than 5 seconds to be eligible + // because we collapse allocations that failed within 5 seconds into a single evaluation + if eligible && now.After(rescheduleTime) { + rescheduleNow = true + } else if shouldAllow { + untainted = true + if eligible && alloc.FollowupEvalID == "" { + rescheduleLater = true + } + } return }
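Note on the batching behavior exercised above: the reconciler comment states that failures landing within a 5-second window are collapsed into a single follow-up evaluation, and TestReconciler_RescheduleLaterWithBatchedEvals_Batch expects two evaluations, one waiting the 15s delay for the first five failures and one waiting the delay plus 10s for the two later failures. The standalone Go sketch below illustrates that grouping under those assumptions; delayedReschedule, followupEval, batchWindow and batchDelayedReschedules are illustrative stand-ins, not Nomad's actual types or functions.

package main

import (
	"fmt"
	"sort"
	"time"
)

// batchWindow mirrors the 5-second collapse window described in the
// reconciler comments; the real constant lives in the scheduler package.
const batchWindow = 5 * time.Second

// delayedReschedule is a stand-in for delayedRescheduleInfo: an allocation
// that becomes eligible for rescheduling at rescheduleTime.
type delayedReschedule struct {
	allocID        string
	rescheduleTime time.Time
}

// followupEval is a stand-in for a follow-up evaluation: it should not be
// processed before WaitUntil and covers the listed allocations.
type followupEval struct {
	WaitUntil time.Time
	AllocIDs  []string
}

// batchDelayedReschedules groups delayed reschedules whose times fall within
// batchWindow of the first entry in the current batch, producing one
// follow-up eval per batch. WaitUntil is the earliest reschedule time in the
// batch, matching what the tests above assert (now+delay for the first
// batch, now+delay+10s for the later failures).
func batchDelayedReschedules(in []*delayedReschedule) []*followupEval {
	if len(in) == 0 {
		return nil
	}
	sort.Slice(in, func(i, j int) bool {
		return in[i].rescheduleTime.Before(in[j].rescheduleTime)
	})

	var evals []*followupEval
	cur := &followupEval{WaitUntil: in[0].rescheduleTime}
	for _, d := range in {
		if d.rescheduleTime.Sub(cur.WaitUntil) > batchWindow {
			evals = append(evals, cur)
			cur = &followupEval{WaitUntil: d.rescheduleTime}
		}
		cur.AllocIDs = append(cur.AllocIDs, d.allocID)
	}
	return append(evals, cur)
}

func main() {
	now := time.Now()
	delay := 15 * time.Second

	// Mirror the batched test: five allocs fail within 200ms of each other,
	// two more fail ten seconds later; all use a 15s reschedule delay.
	var delayed []*delayedReschedule
	for i := 0; i < 5; i++ {
		delayed = append(delayed, &delayedReschedule{
			allocID:        fmt.Sprintf("alloc-%d", i),
			rescheduleTime: now.Add(time.Duration(50*i)*time.Millisecond + delay),
		})
	}
	for i := 5; i < 7; i++ {
		delayed = append(delayed, &delayedReschedule{
			allocID:        fmt.Sprintf("alloc-%d", i),
			rescheduleTime: now.Add(10*time.Second + delay),
		})
	}

	for i, eval := range batchDelayedReschedules(delayed) {
		fmt.Printf("eval %d: wait %v, allocs %v\n",
			i, eval.WaitUntil.Sub(now).Round(time.Millisecond), eval.AllocIDs)
	}
	// Two evals are produced: one waiting ~15s for alloc-0..alloc-4,
	// and one waiting ~25s for alloc-5 and alloc-6.
}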
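For context on what the WaitUntil field buys: allocations deferred to a later reschedule are tagged with the FollowupEvalID of a follow-up evaluation, and that evaluation is not meant to be processed before its WaitUntil. Below is a minimal per-evaluation sketch of that delay, assuming a hypothetical pendingEval type and a single blocking consumer; a real scheduler would multiplex many delayed evaluations with a heap and one timer rather than sleeping per eval.

package main

import (
	"fmt"
	"time"
)

// pendingEval is an illustrative stand-in for an evaluation that carries a
// WaitUntil timestamp.
type pendingEval struct {
	ID        string
	WaitUntil time.Time
}

// processWhenDue blocks until the eval's WaitUntil has passed, then hands it
// to process.
func processWhenDue(e pendingEval, process func(pendingEval)) {
	if wait := time.Until(e.WaitUntil); wait > 0 {
		time.Sleep(wait)
	}
	process(e)
}

func main() {
	e := pendingEval{ID: "followup-eval", WaitUntil: time.Now().Add(2 * time.Second)}
	processWhenDue(e, func(ev pendingEval) {
		fmt.Println("processing", ev.ID, "at", time.Now().Format(time.RFC3339))
	})
}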