Skip to content

Commit

Permalink
Failure Policy: Adds FailurePolicy API field to Job
Browse files Browse the repository at this point in the history
Based on dapr/proposals#66

Adds a `FailurePolicy` option to the `Job` API to allow re-triggering
jobs which are marked as failed by the caller. Adds two types of policy:
`Drop` and `Constant`. `Drop` has no retry policy; `Constant` will
retry the job trigger at a configurable delay, up to a
configurable maximum number of retries (which could be infinite).

Note that the failure policy retry cadence has no effect on the actual
Job schedule, meaning if a job was to be retried and eventually
succeeded, the Job would continue to trigger at the originally configured
schedule.

By default, all Jobs will have a `Constant` policy with a delay of 1s.

Signed-off-by: joshvanl <[email protected]>
  • Loading branch information
JoshVanL committed Oct 15, 2024
1 parent 05f7c29 commit 4e1dd87
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ A Job itself is made up of the following fields:
Optional.
- `FailurePolicy` Controls whether the Job should be retried if the trigger
function returns false. `Drop` doesn't retry the job, `Constant` will
constantly retry the job trigger for a configurable internal, up to a configurable
constantly retry the job trigger for a configurable interval, up to a configurable
maximum number of retries (which could be infinite). By default, Jobs have a
`Constant` policy, with a 1s interval and 3 maximum retries.

Expand Down
175 changes: 175 additions & 0 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,181 @@ func Test_FailurePolicy(t *testing.T) {
})
}

// Test_FailurePolicy exercises the Job FailurePolicy option: how many times
// the trigger function is invoked when it reports failure (returns false)
// under the default, Drop and Constant policies, and that the job is removed
// once it has run to completion.
//
// Each subtest follows the same shape: add a job, wait for the expected number
// of trigger notifications on gotCh (checking the job still exists before each
// one), then assert that Get eventually returns a nil job.
func Test_FailurePolicy(t *testing.T) {
	t.Parallel()

	t.Run("default policy should retry 3 times with a 1sec delay", func(t *testing.T) {
		t.Parallel()

		gotCh := make(chan *api.TriggerRequest, 1)
		var got atomic.Uint32
		cron := testCronWithOptions(t, testCronOptions{
			total:  1,
			client: tests.EmbeddedETCDBareClient(t),
			triggerFn: func(*api.TriggerRequest) bool {
				// 2 repeats, each expected to be attempted 4 times (1 initial
				// attempt + 3 retries), gives at most 8 calls.
				assert.GreaterOrEqual(t, uint32(8), got.Add(1))
				return false
			},
			gotCh: gotCh,
		})

		// No FailurePolicy is set, so the default policy applies.
		require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{
			DueTime:  ptr.Of(time.Now().Format(time.RFC3339)),
			Schedule: ptr.Of("@every 1s"),
			Repeats:  ptr.Of(uint32(2)),
		}))

		for range 8 {
			resp, err := cron.api.Get(context.Background(), "test")
			require.NoError(t, err)
			assert.NotNil(t, resp)
			select {
			case <-gotCh:
			case <-time.After(time.Second * 3):
				// Abort on the first missed trigger rather than waiting out a
				// fresh timeout on every remaining iteration.
				require.Fail(t, "timeout waiting for trigger")
			}
		}

		// Once all attempts are exhausted the job should no longer exist.
		assert.EventuallyWithT(t, func(c *assert.CollectT) {
			resp, err := cron.api.Get(context.Background(), "test")
			assert.NoError(c, err)
			assert.Nil(c, resp)
		}, time.Second*5, time.Millisecond*10)
	})

	t.Run("drop policy should not retry triggering", func(t *testing.T) {
		t.Parallel()

		gotCh := make(chan *api.TriggerRequest, 1)
		var got atomic.Uint32
		cron := testCronWithOptions(t, testCronOptions{
			total:  1,
			client: tests.EmbeddedETCDBareClient(t),
			triggerFn: func(*api.TriggerRequest) bool {
				// With no retries, 2 repeats means at most 2 calls.
				assert.GreaterOrEqual(t, uint32(2), got.Add(1))
				return false
			},
			gotCh: gotCh,
		})

		require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{
			DueTime:  ptr.Of(time.Now().Format(time.RFC3339)),
			Schedule: ptr.Of("@every 1s"),
			Repeats:  ptr.Of(uint32(2)),
			FailurePolicy: &api.FailurePolicy{
				Policy: new(api.FailurePolicy_Drop),
			},
		}))

		for range 2 {
			resp, err := cron.api.Get(context.Background(), "test")
			require.NoError(t, err)
			assert.NotNil(t, resp)
			select {
			case <-gotCh:
			case <-time.After(time.Second * 3):
				require.Fail(t, "timeout waiting for trigger")
			}
		}

		assert.EventuallyWithT(t, func(c *assert.CollectT) {
			resp, err := cron.api.Get(context.Background(), "test")
			assert.NoError(c, err)
			assert.Nil(c, resp)
		}, time.Second*5, time.Millisecond*10)
	})

	t.Run("constant policy should only retry when it fails ", func(t *testing.T) {
		t.Parallel()

		gotCh := make(chan *api.TriggerRequest, 1)
		var got atomic.Uint32
		cron := testCronWithOptions(t, testCronOptions{
			total:  1,
			client: tests.EmbeddedETCDBareClient(t),
			triggerFn: func(*api.TriggerRequest) bool {
				// Use the value returned by Add for both the bound check and the
				// success decision, so they refer to this call's own count.
				n := got.Add(1)
				// Expected sequence with MaxRetries=1 over 3 repeats:
				// fail, retry-fail | success | fail, retry-fail = 5 calls.
				assert.GreaterOrEqual(t, uint32(5), n)
				return n == 3
			},
			gotCh: gotCh,
		})

		require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{
			DueTime:  ptr.Of(time.Now().Format(time.RFC3339)),
			Schedule: ptr.Of("@every 1s"),
			Repeats:  ptr.Of(uint32(3)),
			FailurePolicy: &api.FailurePolicy{
				Policy: &api.FailurePolicy_Constant{
					Constant: &api.FailurePolicyConstant{
						Delay:      durationpb.New(time.Millisecond),
						MaxRetries: ptr.Of(uint32(1)),
					},
				},
			},
		}))

		for range 5 {
			resp, err := cron.api.Get(context.Background(), "test")
			require.NoError(t, err)
			assert.NotNil(t, resp)
			select {
			case <-gotCh:
			case <-time.After(time.Second * 3):
				require.Fail(t, "timeout waiting for trigger")
			}
		}

		assert.EventuallyWithT(t, func(c *assert.CollectT) {
			resp, err := cron.api.Get(context.Background(), "test")
			assert.NoError(c, err)
			assert.Nil(c, resp)
		}, time.Second*5, time.Millisecond*10)
	})

	t.Run("constant policy can retry forever until it succeeds", func(t *testing.T) {
		t.Parallel()

		gotCh := make(chan *api.TriggerRequest, 1)
		var got atomic.Uint32
		cron := testCronWithOptions(t, testCronOptions{
			total:  1,
			client: tests.EmbeddedETCDBareClient(t),
			triggerFn: func(*api.TriggerRequest) bool {
				// Fail the first 99 calls and succeed on the 100th; the count
				// comes from Add itself, not from a separate Load.
				n := got.Add(1)
				assert.GreaterOrEqual(t, uint32(100), n)
				return n == 100
			},
			gotCh: gotCh,
		})

		// MaxRetries is left unset; this subtest expects that to allow
		// retrying until the trigger succeeds.
		require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{
			DueTime: ptr.Of(time.Now().Format(time.RFC3339)),
			FailurePolicy: &api.FailurePolicy{
				Policy: &api.FailurePolicy_Constant{
					Constant: &api.FailurePolicyConstant{
						Delay: durationpb.New(time.Millisecond),
					},
				},
			},
		}))

		for range 100 {
			resp, err := cron.api.Get(context.Background(), "test")
			require.NoError(t, err)
			assert.NotNil(t, resp)
			select {
			case <-gotCh:
			case <-time.After(time.Second * 3):
				require.Fail(t, "timeout waiting for trigger")
			}
		}

		assert.EventuallyWithT(t, func(c *assert.CollectT) {
			resp, err := cron.api.Get(context.Background(), "test")
			assert.NoError(c, err)
			assert.Nil(c, resp)
		}, time.Second*5, time.Millisecond*10)
	})
}

type testCronOptions struct {
total uint32
gotCh chan *api.TriggerRequest
Expand Down

0 comments on commit 4e1dd87

Please sign in to comment.