Skip to content

Commit

Permalink
setting up test structure
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler committed Sep 29, 2023
1 parent 865b396 commit 2f0f9c0
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 35 deletions.
107 changes: 80 additions & 27 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,65 @@ type executor struct {
shutdownTimeout time.Duration
done chan error
singletonRunners map[uuid.UUID]singletonRunner
limitMode *limitMode
}

type singletonRunner struct {
in chan uuid.UUID
done chan struct{}
}

type limitMode struct {
started bool
mode LimitMode
limit int
rescheduleLimiter chan struct{}
in chan uuid.UUID
done chan struct{}
}

func (e *executor) start() {
wg := sync.WaitGroup{}
for {
select {
case id := <-e.jobsIDsIn:
j := requestJob(id, e.jobOutRequest)
if j.singletonMode {
runner, ok := e.singletonRunners[id]
if !ok {
runner.in = make(chan uuid.UUID, 1000)
runner.done = make(chan struct{})
e.singletonRunners[id] = runner
go e.singletonRunner(runner.in, runner.done)
if e.limitMode != nil {
if !e.limitMode.started {
for i := e.limitMode.limit; i > 0; i-- {
go e.limitModeRunner(e.limitMode.in, e.limitMode.done)
}
}
if e.limitMode.mode == LimitModeReschedule {
select {
case e.limitMode.rescheduleLimiter <- struct{}{}:
e.limitMode.in <- id
default:
// all runners are busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
}
} else {
// TODO when metrics are added, this should increment a wait metric
e.limitMode.in <- id
}
runner.in <- id
} else {
wg.Add(1)
go func(j job) {
e.runJob(j)
wg.Done()
}(j)
j := requestJob(id, e.jobOutRequest)
if j.singletonMode {
runner, ok := e.singletonRunners[id]
if !ok {
runner.in = make(chan uuid.UUID, 1000)
runner.done = make(chan struct{})
e.singletonRunners[id] = runner
go e.singletonRunner(runner.in, runner.done)
}
runner.in <- id
} else {
wg.Add(1)
go func(j job) {
e.runJob(j)
wg.Done()
}(j)
}
}

case <-e.schCtx.Done():
Expand Down Expand Up @@ -87,22 +118,44 @@ func (e *executor) start() {
}

func (e *executor) singletonRunner(in chan uuid.UUID, done chan struct{}) {
select {
case id := <-in:
j := requestJob(id, e.jobOutRequest)
e.runJob(j)
case <-e.ctx.Done():
done <- struct{}{}
for {
select {
case id := <-in:
j := requestJob(id, e.jobOutRequest)
e.runJob(j)
case <-e.ctx.Done():
done <- struct{}{}
return
}
}
}

func (e *executor) limitModeRunner(in chan uuid.UUID, done chan struct{}) {
for {
select {
case id := <-in:
j := requestJob(id, e.jobOutRequest)
e.runJob(j)
// remove the limiter block to allow another job to be scheduled
<-e.limitMode.rescheduleLimiter
case <-e.ctx.Done():
done <- struct{}{}
return
}
}
}

func (e *executor) runJob(j job) {
_ = callJobFuncWithParams(j.beforeJobRuns, j.id)
err := callJobFuncWithParams(j.function, j.parameters...)
if err != nil {
_ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, err)
} else {
_ = callJobFuncWithParams(j.afterJobRuns, j.id)
select {
case <-j.ctx.Done():
default:
_ = callJobFuncWithParams(j.beforeJobRuns, j.id)
e.jobIDsOut <- j.id
err := callJobFuncWithParams(j.function, j.parameters...)
if err != nil {
_ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, err)
} else {
_ = callJobFuncWithParams(j.afterJobRuns, j.id)
}
}
e.jobIDsOut <- j.id
}
12 changes: 6 additions & 6 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func CronJob(crontab string, withSeconds bool, task Task, options ...JobOption)
var _ JobDefinition = (*durationJobDefinition)(nil)

type durationJobDefinition struct {
duration string
duration time.Duration
opts []JobOption
tas Task
}
Expand All @@ -116,20 +116,19 @@ func (d durationJobDefinition) options() []JobOption {
}

func (d durationJobDefinition) setup(j *job, location *time.Location) error {
dur, err := time.ParseDuration(d.duration)
if err != nil {
return fmt.Errorf("gocron: failed to parse duration: %w", err)
if d.duration <= 0 {
return fmt.Errorf("gocron: duration must be greater than 0")
}

j.jobSchedule = &durationJob{duration: dur}
j.jobSchedule = &durationJob{duration: d.duration}
return nil
}

func (d durationJobDefinition) task() Task {
return d.tas
}

func DurationJob(duration string, task Task, options ...JobOption) JobDefinition {
func DurationJob(duration time.Duration, task Task, options ...JobOption) JobDefinition {
return durationJobDefinition{
duration: duration,
opts: options,
Expand Down Expand Up @@ -181,6 +180,7 @@ func LimitRunsTo(runLimit int) JobOption {

func SingletonMode() JobOption {
return func(j *job) error {
j.singletonMode = true
return nil
}
}
Expand Down
5 changes: 3 additions & 2 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package gocron

import (
"testing"
"time"
)

func TestDurationJob(t *testing.T) {
tests := []struct {
name string
duration string
duration time.Duration
expectedErr *string
}{
{"success", "1s", nil},
{"success", time.Second, nil},
}

for _, tt := range tests {
Expand Down
12 changes: 12 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ const (

func WithLimit(limit int, mode LimitMode) SchedulerOption {
return func(s *scheduler) error {
if limit <= 0 {
return fmt.Errorf("gocron: WithLimit: limit must be greater than 0")
}
s.exec.limitMode = &limitMode{
mode: mode,
limit: limit,
in: make(chan uuid.UUID, 1000),
done: make(chan struct{}),
}
if mode == LimitModeReschedule {
s.exec.limitMode.rescheduleLimiter = make(chan struct{}, limit)
}
return nil
}
}
Expand Down
122 changes: 122 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,125 @@ func TestScheduler_Cron_Start_Stop(t *testing.T) {
err = s.Stop()
assert.NoError(t, err)
}

func TestScheduler(t *testing.T) {
t.Parallel()

cronNoOptionsCh := make(chan struct{})
cronSingletonCh := make(chan struct{})
durationNoOptionsCh := make(chan struct{})

type testJob struct {
name string
ch chan struct{}
jd JobDefinition
}

tests := []struct {
name string
testJobs []testJob
options []SchedulerOption
runCount int
expectedTimeMin time.Duration
expectedTimeMax time.Duration
}{
{
"no scheduler options 1 second jobs",
[]testJob{
{
"cron",
cronNoOptionsCh,
CronJob(
"* * * * * *",
true,
Task{
Function: func() {
cronNoOptionsCh <- struct{}{}
},
},
),
},
{
"duration",
durationNoOptionsCh,
DurationJob(
time.Second,
Task{
Function: func() {
durationNoOptionsCh <- struct{}{}
},
},
),
},
},
nil,

1,
time.Millisecond * 1,
time.Second,
},
{
"cron - singleton mode",
[]testJob{
{
"cron",
cronSingletonCh,
CronJob(
"* * * * * *",
true,
Task{
Function: func() {
time.Sleep(2 * time.Second)
cronSingletonCh <- struct{}{}
},
},
SingletonMode(),
),
},
},
nil,
2,
time.Second * 2,
time.Second * 6,
},
}

for _, tt := range tests {

for _, tj := range tt.testJobs {
t.Run(tt.name+"_"+tj.name, func(t *testing.T) {
t.Parallel()

s, err := NewScheduler(tt.options...)
require.NoError(t, err)

_, err = s.NewJob(tj.jd)
require.NoError(t, err)

s.Start()

startTime := time.Now()
var runCount int
for runCount < tt.runCount {
<-tj.ch
runCount++
}
err = s.Stop()
require.NoError(t, err)
stopTime := time.Now()

select {
case <-tj.ch:
t.Fatal("job ran after scheduler was stopped")
case <-time.After(time.Millisecond * 50):
}

runDuration := stopTime.Sub(startTime)
assert.GreaterOrEqual(t, runDuration, tt.expectedTimeMin)
assert.LessOrEqual(t, runDuration, tt.expectedTimeMax)

})
}
}

}

0 comments on commit 2f0f9c0

Please sign in to comment.