Skip to content

Commit

Permalink
issue-740: expand oneTimeJob to support multiple times (go-co-op#741)
Browse files Browse the repository at this point in the history
  • Loading branch information
rbroggi authored Jun 21, 2024
1 parent 7c391d4 commit 64d6e48
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 15 deletions.
11 changes: 11 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,17 @@ func ExampleOneTimeJob() {
func() {},
),
)
// run job twice - once in 10 seconds and once in 55 minutes
n := time.Now()
_, _ = s.NewJob(
OneTimeJob(
OneTimeJobStartDateTimes(
n.Add(10*time.Second),
n.Add(55*time.Minute),
),
),
NewTask(func() {}),
)

s.Start()
}
Expand Down
78 changes: 63 additions & 15 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/rand"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -446,35 +447,47 @@ type oneTimeJobDefinition struct {
}

func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location, now time.Time) error {
j.jobSchedule = oneTimeJob{}
if err := o.startAt(j); err != nil {
return err
sortedTimes := o.startAt(j)
sort.Slice(sortedTimes, func(i, j int) bool {
return sortedTimes[i].Before(sortedTimes[j])
})
// keep only schedules that are in the future
idx, found := slices.BinarySearchFunc(sortedTimes, now, timeCmp())
if found {
idx++
}
// in case we are not in the `startImmediately` case, our start-date must be in
// the future according to the scheduler clock
if !j.startImmediately && (j.startTime.IsZero() || j.startTime.Before(now)) {
sortedTimes = sortedTimes[idx:]
if !j.startImmediately && len(sortedTimes) == 0 {
return ErrOneTimeJobStartDateTimePast
}
j.jobSchedule = oneTimeJob{sortedTimes: sortedTimes}
return nil
}

// OneTimeJobStartAtOption defines when the one time job is run
type OneTimeJobStartAtOption func(*internalJob) error
type OneTimeJobStartAtOption func(*internalJob) []time.Time

// OneTimeJobStartImmediately tells the scheduler to run the one time job immediately.
func OneTimeJobStartImmediately() OneTimeJobStartAtOption {
return func(j *internalJob) error {
return func(j *internalJob) []time.Time {
j.startImmediately = true
return nil
return []time.Time{}
}
}

// OneTimeJobStartDateTime sets the date & time at which the job should run.
// This datetime must be in the future (according to the scheduler clock).
func OneTimeJobStartDateTime(start time.Time) OneTimeJobStartAtOption {
return func(j *internalJob) error {
j.startTime = start
return nil
return func(j *internalJob) []time.Time {
return []time.Time{start}
}
}

// OneTimeJobStartDateTimes sets the date & times at which the job should run.
// At least one of the date/times must be in the future (according to the scheduler clock).
func OneTimeJobStartDateTimes(times ...time.Time) OneTimeJobStartAtOption {
return func(j *internalJob) []time.Time {
return times
}
}

Expand All @@ -486,6 +499,18 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition {
}
}

func timeCmp() func(element time.Time, target time.Time) int {
return func(element time.Time, target time.Time) int {
if element.Equal(target) {
return 0
}
if element.Before(target) {
return -1
}
return 1
}
}

// -----------------------------------------------
// -----------------------------------------------
// ----------------- Job Options -----------------
Expand Down Expand Up @@ -876,10 +901,33 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass

var _ jobSchedule = (*oneTimeJob)(nil)

type oneTimeJob struct{}
type oneTimeJob struct {
sortedTimes []time.Time
}

func (o oneTimeJob) next(_ time.Time) time.Time {
return time.Time{}
// next finds the next item in a sorted list of times using binary-search.
//
// example: sortedTimes: [2, 4, 6, 8]
//
// lastRun: 1 => [idx=0,found=false] => next is 2 - sorted[idx] idx=0
// lastRun: 2 => [idx=0,found=true] => next is 4 - sorted[idx+1] idx=1
// lastRun: 3 => [idx=1,found=false] => next is 4 - sorted[idx] idx=1
// lastRun: 4 => [idx=1,found=true] => next is 6 - sorted[idx+1] idx=2
// lastRun: 7 => [idx=3,found=false] => next is 8 - sorted[idx] idx=3
// lastRun: 8 => [idx=3,found=found] => next is none
// lastRun: 9 => [idx=3,found=found] => next is none
func (o oneTimeJob) next(lastRun time.Time) time.Time {
idx, found := slices.BinarySearchFunc(o.sortedTimes, lastRun, timeCmp())
// if found, the next run is the following index
if found {
idx++
}
// exhausted runs
if idx >= len(o.sortedTimes) {
return time.Time{}
}

return o.sortedTimes[idx]
}

// -----------------------------------------------
Expand Down
182 changes: 182 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2089,6 +2089,188 @@ func TestScheduler_OneTimeJob(t *testing.T) {
}
}

func TestScheduler_AtTimesJob(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

n := time.Now().UTC()

tests := []struct {
name string
atTimes []time.Time
fakeClock clockwork.FakeClock
assertErr require.ErrorAssertionFunc
// asserts things about schedules, advance time and perform new assertions
advanceAndAsserts []func(
t *testing.T,
j Job,
clock clockwork.FakeClock,
runs *atomic.Uint32,
)
}{
{
name: "no at times",
atTimes: []time.Time{},
fakeClock: clockwork.NewFakeClock(),
assertErr: func(t require.TestingT, err error, i ...interface{}) {
require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast)
},
},
{
name: "all in the past",
atTimes: []time.Time{n.Add(-1 * time.Second)},
fakeClock: clockwork.NewFakeClockAt(n),
assertErr: func(t require.TestingT, err error, i ...interface{}) {
require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast)
},
},
{
name: "one run 1 millisecond in the future",
atTimes: []time.Time{n.Add(1 * time.Millisecond)},
fakeClock: clockwork.NewFakeClockAt(n),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
require.Equal(t, uint32(0), runs.Load())

// last not initialized
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, lastRunAt)

// next is now
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)

// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(1), runs.Load())
}, 3*time.Second, 100*time.Millisecond)

// last was run
lastRunAt, err = j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)

nextRunAt, err = j.NextRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, nextRunAt)
},
},
},
{
name: "one run in the past and one in the future",
atTimes: []time.Time{n.Add(-1 * time.Millisecond), n.Add(1 * time.Millisecond)},
fakeClock: clockwork.NewFakeClockAt(n),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
require.Equal(t, uint32(0), runs.Load())

// last not initialized
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, lastRunAt)

// next is now
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)

// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(1), runs.Load())
}, 3*time.Second, 100*time.Millisecond)

// last was run
lastRunAt, err = j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)
},
},
},
{
name: "two runs in the future",
atTimes: []time.Time{n.Add(1 * time.Millisecond), n.Add(3 * time.Millisecond)},
fakeClock: clockwork.NewFakeClockAt(n),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
require.Equal(t, uint32(0), runs.Load())

// last not initialized
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, lastRunAt)

// next is now
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)

// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(1), runs.Load())
}, 3*time.Second, 100*time.Millisecond)

// last was run
lastRunAt, err = j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)

nextRunAt, err = j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(3*time.Millisecond), nextRunAt)
},

func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(2), runs.Load())
}, 3*time.Second, 100*time.Millisecond)

// last was run
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(3*time.Millisecond), lastRunAt, 1*time.Millisecond)

nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, nextRunAt)
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t, WithClock(tt.fakeClock))
t.Cleanup(func() {
require.NoError(t, s.Shutdown())
})

runs := atomic.Uint32{}
j, err := s.NewJob(
OneTimeJob(OneTimeJobStartDateTimes(tt.atTimes...)),
NewTask(func() {
runs.Add(1)
}),
)
if tt.assertErr != nil {
tt.assertErr(t, err)
} else {
require.NoError(t, err)
s.Start()

for _, advanceAndAssert := range tt.advanceAndAsserts {
advanceAndAssert(t, j, tt.fakeClock, &runs)
}
}
})
}
}

func TestScheduler_WithLimitedRuns(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

Expand Down

0 comments on commit 64d6e48

Please sign in to comment.