Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delayed rescheduling #3981

Merged
merged 25 commits into from
Mar 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3dd8cd1
New delayed rescheduling options, validation function and unit tests
Feb 22, 2018
e65d870
Add new reschedule options to API layer and unit tests
Feb 23, 2018
0274117
Formatting and linting fixes
Feb 23, 2018
aecf3f7
Fix formatting
Feb 26, 2018
9b609c7
Add new reschedule stanza fields to list of valid fields
Mar 2, 2018
447b5a8
Added FollowupEvalID field and helper methods to calculate reschedule…
Mar 2, 2018
9628454
Scheduler and Reconciler changes to support delayed rescheduling
Mar 2, 2018
f83ddd4
Added a delay heap to track evals with WaitUntil set, and use in eval…
Mar 2, 2018
3eebacb
Remove unnecessary check against 5 second window for determining imme…
Mar 2, 2018
c841dc0
Address some code review comments
Mar 6, 2018
733c40e
Add parsing test cases
Mar 7, 2018
f57f586
Move delayheap to lib package
Mar 7, 2018
94e252e
Extra comments, remove unnecessary if condition
Mar 8, 2018
e5e2ec5
More small review feedback
Mar 8, 2018
e80d0d8
Cleaner handling of batched evals
Mar 8, 2018
8736b9f
Update comment about WaitTime
Mar 8, 2018
abeab12
Get reschedule policy from the alloc directly
Mar 8, 2018
9bccc53
Avoids unnecessary timer object creation
Mar 8, 2018
854ae91
Address more code review feedback
Mar 12, 2018
73783e0
Update comment
Mar 12, 2018
d4056c4
Rename DelayCeiling to MaxDelay
Mar 13, 2018
74bd789
Fix formatting
Mar 13, 2018
69c4a58
Show reschedule eligibility time in alloc-status when followup evalid…
Mar 2, 2018
2ab03a1
fix method comment
Mar 13, 2018
b1fd173
Fix linting warning
Mar 14, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type Allocation struct {
TaskStates map[string]*TaskState
DeploymentID string
DeploymentStatus *AllocDeploymentStatus
FollowupEvalID string
PreviousAllocation string
NextAllocation string
RescheduleTracker *RescheduleTracker
Expand Down Expand Up @@ -129,6 +130,7 @@ type AllocationListStub struct {
TaskStates map[string]*TaskState
DeploymentStatus *AllocDeploymentStatus
RescheduleTracker *RescheduleTracker
FollowupEvalID string
CreateIndex uint64
ModifyIndex uint64
CreateTime int64
Expand Down
1 change: 1 addition & 0 deletions api/evaluations.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Evaluation struct {
Status string
StatusDescription string
Wait time.Duration
WaitUntil time.Time
NextEval string
PreviousEval string
BlockedEval string
Expand Down
40 changes: 30 additions & 10 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
Tasks: []*Task{
{
Expand Down Expand Up @@ -202,8 +206,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
Tasks: []*Task{
{
Expand Down Expand Up @@ -335,8 +343,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("delay"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
EphemeralDisk: &EphemeralDisk{
Sticky: helper.BoolToPtr(false),
Expand Down Expand Up @@ -550,8 +562,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: helper.TimeToPtr(2 * time.Second),
Expand Down Expand Up @@ -586,8 +602,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: helper.TimeToPtr(1 * time.Second),
Expand Down
15 changes: 8 additions & 7 deletions api/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,29 @@ import (
"testing"

"github.com/hashicorp/nomad/api/contexts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSearch_List(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()

job := testJob()
_, _, err := c.Jobs().Register(job, nil)
assert.Nil(err)
require.Nil(err)

id := *job.ID
prefix := id[:len(id)-2]
resp, qm, err := c.Search().PrefixSearch(prefix, contexts.Jobs, nil)

assert.Nil(err)
assert.NotNil(qm)
require.Nil(err)
require.NotNil(qm)
require.NotNil(qm)

jobMatches := resp.Matches[contexts.Jobs]
assert.Equal(1, len(jobMatches))
assert.Equal(id, jobMatches[0])
require.Equal(1, len(jobMatches))
require.Equal(id, jobMatches[0])
}
42 changes: 38 additions & 4 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ type ReschedulePolicy struct {

// Interval is a duration in which we can limit the number of reschedule attempts.
Interval *time.Duration `mapstructure:"interval"`

// Delay is a minimum duration to wait between reschedule attempts.
// The delay function determines how much subsequent reschedule attempts are delayed by.
Delay *time.Duration `mapstructure:"delay"`

// DelayFunction determines how the delay progressively changes on subsequent reschedule
// attempts. Valid values are "exponential", "linear", and "fibonacci".
DelayFunction *string `mapstructure:"delay_function"`

// MaxDelay is an upper bound on the delay.
MaxDelay *time.Duration `mapstructure:"max_delay"`

// Unlimited allows rescheduling attempts until they succeed
Unlimited *bool `mapstructure:"unlimited"`
}

func (r *ReschedulePolicy) Merge(rp *ReschedulePolicy) {
Expand All @@ -95,6 +109,18 @@ func (r *ReschedulePolicy) Merge(rp *ReschedulePolicy) {
if rp.Attempts != nil {
r.Attempts = rp.Attempts
}
if rp.Delay != nil {
r.Delay = rp.Delay
}
if rp.DelayFunction != nil {
r.DelayFunction = rp.DelayFunction
}
if rp.MaxDelay != nil {
r.MaxDelay = rp.MaxDelay
}
if rp.Unlimited != nil {
r.Unlimited = rp.Unlimited
}
}

func (r *ReschedulePolicy) Copy() *ReschedulePolicy {
Expand Down Expand Up @@ -316,13 +342,21 @@ func (g *TaskGroup) Canonicalize(job *Job) {
switch *job.Type {
case "service":
defaultReschedulePolicy = &ReschedulePolicy{
Attempts: helper.IntToPtr(structs.DefaultServiceJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(structs.DefaultServiceJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Delay),
DelayFunction: helper.StringToPtr(structs.DefaultServiceJobReschedulePolicy.DelayFunction),
MaxDelay: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.MaxDelay),
Unlimited: helper.BoolToPtr(structs.DefaultServiceJobReschedulePolicy.Unlimited),
}
case "batch":
defaultReschedulePolicy = &ReschedulePolicy{
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Delay),
DelayFunction: helper.StringToPtr(structs.DefaultBatchJobReschedulePolicy.DelayFunction),
MaxDelay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.MaxDelay),
Unlimited: helper.BoolToPtr(structs.DefaultBatchJobReschedulePolicy.Unlimited),
}
default:
defaultReschedulePolicy = &ReschedulePolicy{
Expand Down
93 changes: 71 additions & 22 deletions api/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,70 +284,115 @@ func TestTaskGroup_Canonicalize_ReschedulePolicy(t *testing.T) {
jobReschedulePolicy: nil,
taskReschedulePolicy: nil,
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Delay),
DelayFunction: helper.StringToPtr(structs.DefaultBatchJobReschedulePolicy.DelayFunction),
MaxDelay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.MaxDelay),
Unlimited: helper.BoolToPtr(structs.DefaultBatchJobReschedulePolicy.Unlimited),
},
},
{
desc: "Empty job reschedule policy",
jobReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
Delay: helper.TimeToPtr(0),
MaxDelay: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr(""),
Unlimited: helper.BoolToPtr(false),
},
taskReschedulePolicy: nil,
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
Delay: helper.TimeToPtr(0),
MaxDelay: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr(""),
Unlimited: helper.BoolToPtr(false),
},
},
{
desc: "Inherit from job",
jobReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(20 * time.Second),
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(20 * time.Second),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
taskReschedulePolicy: nil,
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(20 * time.Second),
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(20 * time.Second),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
},
{
desc: "Set in task",
jobReschedulePolicy: nil,
taskReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(2 * time.Minute),
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(2 * time.Minute),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(2 * time.Minute),
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(2 * time.Minute),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
},
{
desc: "Merge from job",
jobReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
},
taskReschedulePolicy: &ReschedulePolicy{
Interval: helper.TimeToPtr(5 * time.Minute),
Interval: helper.TimeToPtr(5 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(5 * time.Minute),
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(5 * time.Minute),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
},
{
desc: "Override from group",
jobReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
MaxDelay: helper.TimeToPtr(10 * time.Second),
},
taskReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(5),
Attempts: helper.IntToPtr(5),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(20 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(20 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
},
{
Expand All @@ -357,8 +402,12 @@ func TestTaskGroup_Canonicalize_ReschedulePolicy(t *testing.T) {
},
taskReschedulePolicy: nil,
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Delay),
DelayFunction: helper.StringToPtr(structs.DefaultBatchJobReschedulePolicy.DelayFunction),
MaxDelay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.MaxDelay),
Unlimited: helper.BoolToPtr(structs.DefaultBatchJobReschedulePolicy.Unlimited),
},
},
}
Expand Down
8 changes: 6 additions & 2 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,12 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
}

tg.ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: *taskGroup.ReschedulePolicy.Attempts,
Interval: *taskGroup.ReschedulePolicy.Interval,
Attempts: *taskGroup.ReschedulePolicy.Attempts,
Interval: *taskGroup.ReschedulePolicy.Interval,
Delay: *taskGroup.ReschedulePolicy.Delay,
DelayFunction: *taskGroup.ReschedulePolicy.DelayFunction,
MaxDelay: *taskGroup.ReschedulePolicy.MaxDelay,
Unlimited: *taskGroup.ReschedulePolicy.Unlimited,
}

tg.EphemeralDisk = &structs.EphemeralDisk{
Expand Down
16 changes: 12 additions & 4 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1172,8 +1172,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Mode: helper.StringToPtr("delay"),
},
ReschedulePolicy: &api.ReschedulePolicy{
Interval: helper.TimeToPtr(12 * time.Hour),
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(12 * time.Hour),
Attempts: helper.IntToPtr(5),
DelayFunction: helper.StringToPtr("linear"),
Delay: helper.TimeToPtr(30 * time.Second),
Unlimited: helper.BoolToPtr(true),
MaxDelay: helper.TimeToPtr(20 * time.Minute),
},
EphemeralDisk: &api.EphemeralDisk{
SizeMB: helper.IntToPtr(100),
Expand Down Expand Up @@ -1384,8 +1388,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Mode: "delay",
},
ReschedulePolicy: &structs.ReschedulePolicy{
Interval: 12 * time.Hour,
Attempts: 5,
Interval: 12 * time.Hour,
Attempts: 5,
DelayFunction: "linear",
Delay: 30 * time.Second,
Unlimited: true,
MaxDelay: 20 * time.Minute,
},
EphemeralDisk: &structs.EphemeralDisk{
SizeMB: 100,
Expand Down
Loading