Skip to content

Commit

Permalink
issue-742: bug in NextRun (go-co-op#743)
Browse files Browse the repository at this point in the history
* issue-742: bug in `NextRun`

* issue-742: bug in `NextRun` correction
  • Loading branch information
rbroggi authored Jun 21, 2024
1 parent fd18ca7 commit 7c391d4
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 16 deletions.
16 changes: 7 additions & 9 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,18 +357,16 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
return
}

// if the job has more than one nextScheduled time,
// if the job has nextScheduled time in the past,
// we need to remove any that are in the past.
if len(j.nextScheduled) > 1 {
var newNextScheduled []time.Time
for _, t := range j.nextScheduled {
if t.Before(s.now()) {
continue
}
newNextScheduled = append(newNextScheduled, t)
var newNextScheduled []time.Time
for _, t := range j.nextScheduled {
if t.Before(s.now()) {
continue
}
j.nextScheduled = newNextScheduled
newNextScheduled = append(newNextScheduled, t)
}
j.nextScheduled = newNextScheduled

// if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this
Expand Down
96 changes: 89 additions & 7 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
Expand Down Expand Up @@ -1185,6 +1187,92 @@ func TestScheduler_LimitModeAndSingleton(t *testing.T) {
}
}

func TestScheduler_OneTimeJob_DoesNotCleanupNext(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

schedulerStartTime := time.Date(2024, time.April, 3, 4, 5, 0, 0, time.UTC)

tests := []struct {
name string
runAt time.Time
fakeClock clockwork.FakeClock
assertErr require.ErrorAssertionFunc
// asserts things about schedules, advance time and perform new assertions
advanceAndAsserts []func(
t *testing.T,
j Job,
clock clockwork.FakeClock,
runs *atomic.Uint32,
)
}{
{
name: "exhausted run do does not cleanup next item",
runAt: time.Date(2024, time.April, 22, 4, 5, 0, 0, time.UTC),
fakeClock: clockwork.NewFakeClockAt(schedulerStartTime),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
require.Equal(t, uint32(0), runs.Load())

// last not initialized
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, lastRunAt)

// next is now
expected := time.Date(2024, time.April, 22, 4, 5, 0, 0, time.UTC)
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, expected, nextRunAt.UTC())

// advance and eventually run
oneSecondAfterNextRun := expected.Add(1 * time.Second)

clock.Advance(oneSecondAfterNextRun.Sub(schedulerStartTime))
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(1), runs.Load())
}, 3*time.Second, 100*time.Millisecond)

// last was run
lastRunAt, err = j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, expected, lastRunAt, 1*time.Second)

nextRunAt, err = j.NextRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, nextRunAt)
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t, WithClock(tt.fakeClock), WithLocation(time.UTC))
t.Cleanup(func() {
require.NoError(t, s.Shutdown())
})

runs := atomic.Uint32{}
j, err := s.NewJob(
OneTimeJob(OneTimeJobStartDateTime(tt.runAt)),
NewTask(func() {
runs.Add(1)
}),
)
if tt.assertErr != nil {
tt.assertErr(t, err)
} else {
require.NoError(t, err)
s.Start()

for _, advanceAndAssert := range tt.advanceAndAsserts {
advanceAndAssert(t, j, tt.fakeClock, &runs)
}
}
})
}
}

var _ Elector = (*testElector)(nil)

type testElector struct {
Expand Down Expand Up @@ -1980,7 +2068,7 @@ func TestScheduler_OneTimeJob(t *testing.T) {

s := newTestScheduler(t)

j, err := s.NewJob(
_, err := s.NewJob(
OneTimeJob(tt.startAt()),
NewTask(func() {
jobRan <- struct{}{}
Expand All @@ -1996,12 +2084,6 @@ func TestScheduler_OneTimeJob(t *testing.T) {
t.Fatal("timed out waiting for job to run")
}

var nextRun time.Time
for ; nextRun.IsZero(); nextRun, err = j.NextRun() { //nolint:revive
}
assert.NoError(t, err)
assert.True(t, nextRun.Before(time.Now()))

assert.NoError(t, s.Shutdown())
})
}
Expand Down

0 comments on commit 7c391d4

Please sign in to comment.