Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Fix Data race when using RunByTag #356

Merged
merged 2 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gocron.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
ErrInvalidInterval = errors.New(".Every() interval must be greater than 0")
ErrInvalidIntervalType = errors.New(".Every() interval must be int, time.Duration, or string")
ErrInvalidIntervalUnitsSelection = errors.New(".Every(time.Duration) and .Cron() cannot be used with units (e.g. .Seconds())")
ErrInvalidFunctionParameters = errors.New("length of function parameters must match job function parameters")

ErrAtTimeNotSupported = errors.New("the At() method is not supported for this time unit")
ErrWeekdayNotSupported = errors.New("weekday is not supported for time unit")
Expand Down
13 changes: 12 additions & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,6 @@ func (j *Job) SingletonMode() {
defer j.mu.Unlock()
j.runConfig.mode = singletonMode
j.jobFunction.limiter = &singleflight.Group{}

}

// shouldRun evaluates if this job should run again
Expand All @@ -410,10 +409,14 @@ func (j *Job) shouldRun() bool {

// LastRun returns the time the job was run last
func (j *Job) LastRun() time.Time {
j.mu.RLock()
defer j.mu.RUnlock()
return j.lastRun
}

func (j *Job) setLastRun(t time.Time) {
j.mu.Lock()
defer j.mu.Unlock()
j.lastRun = t
}

Expand All @@ -432,9 +435,17 @@ func (j *Job) setNextRun(t time.Time) {

// RunCount returns the number of time the job ran so far
func (j *Job) RunCount() int {
j.mu.RLock()
defer j.mu.RUnlock()
return j.runCount
}

func (j *Job) incrementRunCount() {
j.mu.Lock()
defer j.mu.Unlock()
j.runCount++
}

func (j *Job) stop() {
j.mu.Lock()
defer j.mu.Unlock()
Expand Down
24 changes: 18 additions & 6 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *Scheduler) StartAsync() {
}
}

//start starts the scheduler, scheduling and running jobs
// start starts the scheduler, scheduling and running jobs
func (s *Scheduler) start() {
go s.executor.start()
s.setRunning(true)
Expand Down Expand Up @@ -342,7 +342,6 @@ func (s *Scheduler) calculateTotalDaysDifference(lastRun time.Time, daysToWeekda
}

func (s *Scheduler) calculateDays(job *Job, lastRun time.Time) nextRun {

if job.getInterval() == 1 {
lastRunDayPlusJobAtTime := s.roundToMidnight(lastRun).Add(job.getAtTime(lastRun))

Expand Down Expand Up @@ -533,6 +532,21 @@ func (s *Scheduler) run(job *Job) {
return
}

job = s.addJobDetails(job)
if job.error != nil {
// delete the job from the scheduler as this job
// cannot be executed
s.RemoveByReference(job)
return
// return job.error
}

s.executor.jobFunctions <- job.jobFunction.copy()
job.setLastRun(s.now())
job.incrementRunCount()
}

func (s *Scheduler) addJobDetails(job *Job) *Job {
job.mu.Lock()
defer job.mu.Unlock()

Expand All @@ -544,13 +558,11 @@ func (s *Scheduler) run(job *Job) {
job.parameters[job.parametersLen] = job.copy()
default:
// something is really wrong and we should never get here
return
job.error = wrapOrError(job.error, ErrInvalidFunctionParameters)
}
}

s.executor.jobFunctions <- job.jobFunction.copy()
job.setLastRun(s.now())
job.runCount++
return job
}

func (s *Scheduler) runContinuous(job *Job) {
Expand Down
56 changes: 21 additions & 35 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func TestImmediateExecution(t *testing.T) {
case <-semaphore:
// test passed
}

}

func TestScheduler_Every_InvalidInterval(t *testing.T) {
Expand All @@ -75,7 +74,6 @@ func TestScheduler_Every_InvalidInterval(t *testing.T) {
assert.EqualError(t, err, tc.expectedError)
})
}

}

func TestScheduler_EveryRandom(t *testing.T) {
Expand Down Expand Up @@ -172,7 +170,6 @@ func TestScheduler_Every(t *testing.T) {
s.Stop()
assert.Equal(t, 2, counter)
})

}

func TestExecutionSeconds(t *testing.T) {
Expand Down Expand Up @@ -392,7 +389,6 @@ func TestWeekdayAt(t *testing.T) {
}

func TestScheduler_Remove(t *testing.T) {

t.Run("remove from non-running", func(t *testing.T) {
s := NewScheduler(time.UTC)
s.TagsUnique()
Expand Down Expand Up @@ -784,7 +780,6 @@ func TestClearUnique(t *testing.T) {
}

func TestSetUnit(t *testing.T) {

testCases := []struct {
desc string
timeUnit schedulingUnit
Expand Down Expand Up @@ -1036,7 +1031,7 @@ func _getMinutes(i int) time.Duration {
}

func TestScheduler_Do(t *testing.T) {
var testCases = []struct {
testCases := []struct {
description string
evalFunc func(*Scheduler)
}{
Expand Down Expand Up @@ -1239,7 +1234,6 @@ func TestCalculateMonths(t *testing.T) {
}

func TestScheduler_SingletonMode(t *testing.T) {

testCases := []struct {
description string
removeJob bool
Expand All @@ -1250,7 +1244,6 @@ func TestScheduler_SingletonMode(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {

s := NewScheduler(time.UTC)
var trigger int32

Expand All @@ -1273,11 +1266,9 @@ func TestScheduler_SingletonMode(t *testing.T) {
s.Stop()
})
}

}

func TestScheduler_SingletonModeAll(t *testing.T) {

testCases := []struct {
description string
removeJob bool
Expand All @@ -1288,7 +1279,6 @@ func TestScheduler_SingletonModeAll(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {

s := NewScheduler(time.UTC)
s.SingletonModeAll()

Expand All @@ -1313,7 +1303,6 @@ func TestScheduler_SingletonModeAll(t *testing.T) {
s.Stop()
})
}

}

func TestScheduler_LimitRunsTo(t *testing.T) {
Expand Down Expand Up @@ -1404,7 +1393,8 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
// 1s - job 1 hits the limit and is skipped
// 2s - job 1 & 2 run
// 3s - job 1 hits the limit and is skipped
{"reschedule mode", 2, RescheduleMode, 4, false,
{
"reschedule mode", 2, RescheduleMode, 4, false,
func() {
semaphore <- true
time.Sleep(200 * time.Millisecond)
Expand All @@ -1416,15 +1406,17 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
// 1s - job 1 runs twice, the blocked run and the regularly scheduled run
// 2s - jobs 1 & 3 run
// 3s - jobs 2 & 3 run, job 1 hits the limit and waits
{"wait mode", 2, WaitMode, 8, false,
{
"wait mode", 2, WaitMode, 8, false,
func() {
semaphore <- true
time.Sleep(100 * time.Millisecond)
},
},

// Same as above - this confirms the same behavior when jobs are removed rather than the scheduler being stopped
{"wait mode - with job removal", 2, WaitMode, 8, true,
{
"wait mode - with job removal", 2, WaitMode, 8, true,
func() {
semaphore <- true
time.Sleep(100 * time.Millisecond)
Expand All @@ -1434,7 +1426,6 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {

s := NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(tc.maxConcurrentJobs, tc.mode)

Expand Down Expand Up @@ -1510,7 +1501,6 @@ func TestScheduler_TagsUnique(t *testing.T) {

_, err = s.Every("1s").Tag(bar).Do(func() {})
assert.EqualError(t, err, ErrTagsUnique(bar).Error())

}

func TestScheduler_MultipleTagsChained(t *testing.T) {
Expand Down Expand Up @@ -1671,21 +1661,29 @@ func TestScheduler_Update(t *testing.T) {

func TestScheduler_RunByTag(t *testing.T) {
var (
s = NewScheduler(time.Local)
count = 0
wg sync.WaitGroup
s = NewScheduler(time.Local)
wg sync.WaitGroup
counterMutex sync.RWMutex
count = 0
)

s.Every(1).Day().StartAt(time.Now().Add(time.Hour)).Tag("tag").Do(func() {
counterMutex.Lock()
defer counterMutex.Unlock()
count++
wg.Done()
})
wg.Add(1)
wg.Add(3)
s.StartAsync()

assert.NoError(t, s.RunByTag("tag"))
assert.NoError(t, s.RunByTag("tag"))
assert.NoError(t, s.RunByTag("tag"))

wg.Wait()
assert.Equal(t, 1, count)
counterMutex.RLock()
defer counterMutex.RUnlock()
assert.Equal(t, 3, count)
assert.Error(t, s.RunByTag("wrong-tag"))
}

Expand Down Expand Up @@ -1837,7 +1835,6 @@ func TestScheduler_WaitForSchedules(t *testing.T) {
}

func TestScheduler_LenWeekDays(t *testing.T) {

testCases := []struct {
description string
weekDays []time.Weekday
Expand All @@ -1860,7 +1857,6 @@ func TestScheduler_LenWeekDays(t *testing.T) {
assert.Equal(t, len(j.scheduledWeekdays), tc.finalLen)
})
}

}

func TestScheduler_CallNextWeekDay(t *testing.T) {
Expand All @@ -1869,7 +1865,7 @@ func TestScheduler_CallNextWeekDay(t *testing.T) {
}

const wantTimeUntilNextRun = time.Hour * 24 * 2
var lastRun = januaryFirst2020At(0, 0, 0)
lastRun := januaryFirst2020At(0, 0, 0)

testCases := []struct {
description string
Expand All @@ -1881,7 +1877,6 @@ func TestScheduler_CallNextWeekDay(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {

s := NewScheduler(time.UTC)
s.Every(1)

Expand All @@ -1895,10 +1890,8 @@ func TestScheduler_CallNextWeekDay(t *testing.T) {

got := s.durationToNextRun(lastRun, job).duration
assert.Equal(t, wantTimeUntilNextRun, got)

})
}

}

func TestScheduler_Midday(t *testing.T) {
Expand Down Expand Up @@ -1986,7 +1979,6 @@ func TestScheduler_CheckNextWeekDay(t *testing.T) {
job.lastRun = secondLastRun
gotSecond := s.durationToNextRun(secondLastRun, job).duration
assert.Equal(t, wantTimeUntilNextSecondRun, gotSecond)

})
}

Expand Down Expand Up @@ -2043,18 +2035,14 @@ func TestScheduler_CheckEveryWeekHigherThanOne(t *testing.T) {
} else if tc.caseTest == 3 {
assert.Equal(t, wantTimeUntilNextRunTwoWeeksLessOneDay, got)
}

}
job.runCount++
}

})
}

}

func TestScheduler_StartImmediately(t *testing.T) {

testCases := []struct {
description string
scheduler *Scheduler
Expand Down Expand Up @@ -2110,7 +2098,6 @@ func TestScheduler_CheckCalculateDaysOfMonth(t *testing.T) {
func TestScheduler_CheckSetBehaviourBeforeJobCreated(t *testing.T) {
s := NewScheduler(time.UTC)
s.Month(1, 2).Every(1).Do(func() {})

}

func TestScheduler_MonthLastDayAtTime(t *testing.T) {
Expand All @@ -2124,7 +2111,6 @@ func TestScheduler_MonthLastDayAtTime(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

s := NewScheduler(time.UTC)
got := s.durationToNextRun(tc.job.LastRun(), tc.job).duration
assert.Equalf(t, tc.wantTimeUntilNextRun, got, fmt.Sprintf("expected %s / got %s", tc.wantTimeUntilNextRun.String(), got.String()))
Expand Down