Skip to content

Commit

Permalink
Merge pull request #3981 from hashicorp/f-delayed-scheduling
Browse files Browse the repository at this point in the history
Delayed rescheduling
  • Loading branch information
Preetha authored Mar 14, 2018
2 parents dc183ad + b1fd173 commit 3ece61c
Show file tree
Hide file tree
Showing 28 changed files with 2,810 additions and 184 deletions.
2 changes: 2 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type Allocation struct {
TaskStates map[string]*TaskState
DeploymentID string
DeploymentStatus *AllocDeploymentStatus
FollowupEvalID string
PreviousAllocation string
NextAllocation string
RescheduleTracker *RescheduleTracker
Expand Down Expand Up @@ -129,6 +130,7 @@ type AllocationListStub struct {
TaskStates map[string]*TaskState
DeploymentStatus *AllocDeploymentStatus
RescheduleTracker *RescheduleTracker
FollowupEvalID string
CreateIndex uint64
ModifyIndex uint64
CreateTime int64
Expand Down
1 change: 1 addition & 0 deletions api/evaluations.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Evaluation struct {
Status string
StatusDescription string
Wait time.Duration
WaitUntil time.Time
NextEval string
PreviousEval string
BlockedEval string
Expand Down
40 changes: 30 additions & 10 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
Tasks: []*Task{
{
Expand Down Expand Up @@ -202,8 +206,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
Tasks: []*Task{
{
Expand Down Expand Up @@ -335,8 +343,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("delay"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
EphemeralDisk: &EphemeralDisk{
Sticky: helper.BoolToPtr(false),
Expand Down Expand Up @@ -550,8 +562,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: helper.TimeToPtr(2 * time.Second),
Expand Down Expand Up @@ -586,8 +602,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: helper.TimeToPtr(1 * time.Second),
Expand Down
15 changes: 8 additions & 7 deletions api/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,29 @@ import (
"testing"

"github.com/hashicorp/nomad/api/contexts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSearch_List(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()

job := testJob()
_, _, err := c.Jobs().Register(job, nil)
assert.Nil(err)
require.Nil(err)

id := *job.ID
prefix := id[:len(id)-2]
resp, qm, err := c.Search().PrefixSearch(prefix, contexts.Jobs, nil)

assert.Nil(err)
assert.NotNil(qm)
require.Nil(err)
require.NotNil(qm)
require.NotNil(qm)

jobMatches := resp.Matches[contexts.Jobs]
assert.Equal(1, len(jobMatches))
assert.Equal(id, jobMatches[0])
require.Equal(1, len(jobMatches))
require.Equal(id, jobMatches[0])
}
42 changes: 38 additions & 4 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ type ReschedulePolicy struct {

// Interval is a duration in which we can limit the number of reschedule attempts.
Interval *time.Duration `mapstructure:"interval"`

// Delay is a minimum duration to wait between reschedule attempts.
// The delay function determines how much subsequent reschedule attempts are delayed by.
Delay *time.Duration `mapstructure:"delay"`

// DelayFunction determines how the delay progressively changes on subsequent reschedule
// attempts. Valid values are "exponential", "linear", and "fibonacci".
DelayFunction *string `mapstructure:"delay_function"`

// MaxDelay is an upper bound on the delay.
MaxDelay *time.Duration `mapstructure:"max_delay"`

// Unlimited allows rescheduling attempts until they succeed
Unlimited *bool `mapstructure:"unlimited"`
}

func (r *ReschedulePolicy) Merge(rp *ReschedulePolicy) {
Expand All @@ -95,6 +109,18 @@ func (r *ReschedulePolicy) Merge(rp *ReschedulePolicy) {
if rp.Attempts != nil {
r.Attempts = rp.Attempts
}
if rp.Delay != nil {
r.Delay = rp.Delay
}
if rp.DelayFunction != nil {
r.DelayFunction = rp.DelayFunction
}
if rp.MaxDelay != nil {
r.MaxDelay = rp.MaxDelay
}
if rp.Unlimited != nil {
r.Unlimited = rp.Unlimited
}
}

func (r *ReschedulePolicy) Copy() *ReschedulePolicy {
Expand Down Expand Up @@ -316,13 +342,21 @@ func (g *TaskGroup) Canonicalize(job *Job) {
switch *job.Type {
case "service":
defaultReschedulePolicy = &ReschedulePolicy{
Attempts: helper.IntToPtr(structs.DefaultServiceJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(structs.DefaultServiceJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Delay),
DelayFunction: helper.StringToPtr(structs.DefaultServiceJobReschedulePolicy.DelayFunction),
MaxDelay: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.MaxDelay),
Unlimited: helper.BoolToPtr(structs.DefaultServiceJobReschedulePolicy.Unlimited),
}
case "batch":
defaultReschedulePolicy = &ReschedulePolicy{
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Delay),
DelayFunction: helper.StringToPtr(structs.DefaultBatchJobReschedulePolicy.DelayFunction),
MaxDelay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.MaxDelay),
Unlimited: helper.BoolToPtr(structs.DefaultBatchJobReschedulePolicy.Unlimited),
}
default:
defaultReschedulePolicy = &ReschedulePolicy{
Expand Down
93 changes: 71 additions & 22 deletions api/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,70 +284,115 @@ func TestTaskGroup_Canonicalize_ReschedulePolicy(t *testing.T) {
jobReschedulePolicy: nil,
taskReschedulePolicy: nil,
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Delay),
DelayFunction: helper.StringToPtr(structs.DefaultBatchJobReschedulePolicy.DelayFunction),
MaxDelay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.MaxDelay),
Unlimited: helper.BoolToPtr(structs.DefaultBatchJobReschedulePolicy.Unlimited),
},
},
{
desc: "Empty job reschedule policy",
jobReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
Delay: helper.TimeToPtr(0),
MaxDelay: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr(""),
Unlimited: helper.BoolToPtr(false),
},
taskReschedulePolicy: nil,
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
Delay: helper.TimeToPtr(0),
MaxDelay: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr(""),
Unlimited: helper.BoolToPtr(false),
},
},
{
desc: "Inherit from job",
jobReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(20 * time.Second),
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(20 * time.Second),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
taskReschedulePolicy: nil,
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(20 * time.Second),
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(20 * time.Second),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
},
{
desc: "Set in task",
jobReschedulePolicy: nil,
taskReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(2 * time.Minute),
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(2 * time.Minute),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(2 * time.Minute),
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(2 * time.Minute),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
},
{
desc: "Merge from job",
jobReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
},
taskReschedulePolicy: &ReschedulePolicy{
Interval: helper.TimeToPtr(5 * time.Minute),
Interval: helper.TimeToPtr(5 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(5 * time.Minute),
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(5 * time.Minute),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
},
{
desc: "Override from group",
jobReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
MaxDelay: helper.TimeToPtr(10 * time.Second),
},
taskReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(5),
Attempts: helper.IntToPtr(5),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(20 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(20 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
},
{
Expand All @@ -357,8 +402,12 @@ func TestTaskGroup_Canonicalize_ReschedulePolicy(t *testing.T) {
},
taskReschedulePolicy: nil,
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Delay),
DelayFunction: helper.StringToPtr(structs.DefaultBatchJobReschedulePolicy.DelayFunction),
MaxDelay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.MaxDelay),
Unlimited: helper.BoolToPtr(structs.DefaultBatchJobReschedulePolicy.Unlimited),
},
},
}
Expand Down
8 changes: 6 additions & 2 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,12 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
}

tg.ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: *taskGroup.ReschedulePolicy.Attempts,
Interval: *taskGroup.ReschedulePolicy.Interval,
Attempts: *taskGroup.ReschedulePolicy.Attempts,
Interval: *taskGroup.ReschedulePolicy.Interval,
Delay: *taskGroup.ReschedulePolicy.Delay,
DelayFunction: *taskGroup.ReschedulePolicy.DelayFunction,
MaxDelay: *taskGroup.ReschedulePolicy.MaxDelay,
Unlimited: *taskGroup.ReschedulePolicy.Unlimited,
}

tg.EphemeralDisk = &structs.EphemeralDisk{
Expand Down
16 changes: 12 additions & 4 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1172,8 +1172,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Mode: helper.StringToPtr("delay"),
},
ReschedulePolicy: &api.ReschedulePolicy{
Interval: helper.TimeToPtr(12 * time.Hour),
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(12 * time.Hour),
Attempts: helper.IntToPtr(5),
DelayFunction: helper.StringToPtr("linear"),
Delay: helper.TimeToPtr(30 * time.Second),
Unlimited: helper.BoolToPtr(true),
MaxDelay: helper.TimeToPtr(20 * time.Minute),
},
EphemeralDisk: &api.EphemeralDisk{
SizeMB: helper.IntToPtr(100),
Expand Down Expand Up @@ -1384,8 +1388,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Mode: "delay",
},
ReschedulePolicy: &structs.ReschedulePolicy{
Interval: 12 * time.Hour,
Attempts: 5,
Interval: 12 * time.Hour,
Attempts: 5,
DelayFunction: "linear",
Delay: 30 * time.Second,
Unlimited: true,
MaxDelay: 20 * time.Minute,
},
EphemeralDisk: &structs.EphemeralDisk{
SizeMB: 100,
Expand Down
Loading

0 comments on commit 3ece61c

Please sign in to comment.