From c2b6aaf162dfed59cb63a1ac32b059af4864eb6f Mon Sep 17 00:00:00 2001 From: John Roesler Date: Fri, 3 Nov 2023 07:55:00 -0500 Subject: [PATCH] working on resolving shutdown races --- executor.go | 115 +++++++++++++++++++++++++++------------------------ job_test.go | 5 +-- scheduler.go | 22 +++++----- util.go | 24 +++++++++++ 4 files changed, 98 insertions(+), 68 deletions(-) diff --git a/executor.go b/executor.go index 74ecb3ca..72c98dc3 100644 --- a/executor.go +++ b/executor.go @@ -11,6 +11,7 @@ import ( type executor struct { ctx context.Context cancel context.CancelFunc + stopCh chan struct{} jobsIDsIn chan uuid.UUID jobIDsOut chan uuid.UUID jobOutRequest chan jobOutRequest @@ -37,85 +38,93 @@ type limitMode struct { done chan struct{} } -func (e *executor) start() { - wg := sync.WaitGroup{} +func (e *executor) start(shutdownCtx context.Context) { + e.ctx, e.cancel = context.WithCancel(shutdownCtx) + wg := waitGroupWithMutex{ + wg: sync.WaitGroup{}, + mu: sync.Mutex{}, + } for { select { case id := <-e.jobsIDsIn: - if e.limitMode != nil { - if !e.limitMode.started { - for i := e.limitMode.limit; i > 0; i-- { - go e.limitModeRunner(e.limitMode.in, e.limitMode.done) - } - } - if e.limitMode.mode == LimitModeReschedule { - select { - case e.limitMode.rescheduleLimiter <- struct{}{}: - e.limitMode.in <- id - default: - // all runners are busy, reschedule the work for later - // which means we just skip it here and do nothing - // TODO when metrics are added, this should increment a rescheduled metric - e.jobIDsOut <- id - } - } else { - // TODO when metrics are added, this should increment a wait metric - e.limitMode.in <- id - } - } else { - j := requestJobCtx(e.ctx, id, e.jobOutRequest) - if j == nil { - continue - } - if j.singletonMode { - runner, ok := e.singletonRunners[id] - if !ok { - runner.in = make(chan uuid.UUID, 1000) - runner.done = make(chan struct{}) - runner.mode = j.singletonLimitMode - if runner.mode == LimitModeReschedule { - runner.rescheduleLimiter = make(chan struct{}, 1) + go func() { + if e.limitMode != nil { + if !e.limitMode.started { + for i := e.limitMode.limit; i > 0; i-- { + go e.limitModeRunner(e.limitMode.in, e.limitMode.done) } - e.singletonRunners[id] = runner - go e.singletonRunner(runner) } - - if runner.mode == LimitModeReschedule { + if e.limitMode.mode == LimitModeReschedule { select { - case runner.rescheduleLimiter <- struct{}{}: - runner.in <- id + case e.limitMode.rescheduleLimiter <- struct{}{}: + e.limitMode.in <- id default: - // runner is busy, reschedule the work for later + // all runners are busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric e.jobIDsOut <- id } } else { - runner.in <- id + // TODO when metrics are added, this should increment a wait metric + e.limitMode.in <- id } } else { - wg.Add(1) - go func(j internalJob) { - e.runJob(j) - wg.Done() - }(*j) - } - } + j := requestJobCtx(e.ctx, id, e.jobOutRequest) + if j == nil { + return + } + if j.singletonMode { + runner, ok := e.singletonRunners[id] + if !ok { + runner.in = make(chan uuid.UUID, 1000) + runner.done = make(chan struct{}) + runner.mode = j.singletonLimitMode + if runner.mode == LimitModeReschedule { + runner.rescheduleLimiter = make(chan struct{}, 1) + } + e.singletonRunners[id] = runner + go e.singletonRunner(runner) + } - case <-e.ctx.Done(): + if runner.mode == LimitModeReschedule { + select { + case runner.rescheduleLimiter <- struct{}{}: + runner.in <- id + default: + // runner is busy, reschedule the work for later + // which means we just skip it here and do nothing + // TODO when metrics are added, this should increment a rescheduled metric + e.jobIDsOut <- id + } + } else { + runner.in <- id + } + } else { + wg.Add(1) + go func(j internalJob) { + e.runJob(j) + wg.Done() + }(*j) + } + } + }() + case <-e.stopCh: + e.cancel() waitForJobs := make(chan struct{}) waitForSingletons := make(chan struct{}) waiterCtx, waiterCancel := context.WithCancel(context.Background()) go func() { - wg.Wait() + go func() { + wg.Wait() + waitForJobs <- struct{}{} + }() select { case <-waiterCtx.Done(): return default: } - waitForJobs <- struct{}{} }() go func() { For: diff --git a/job_test.go b/job_test.go index b2102280..ad902026 100644 --- a/job_test.go +++ b/job_test.go @@ -1,7 +1,6 @@ package gocron import ( - "log" "math/rand" "testing" "time" @@ -323,9 +322,7 @@ func TestJob_LastRun(t *testing.T) { time.Second, ), NewTask( - func() { - log.Println("job ran") - }, + func() {}, ), WithStartAt(WithStartImmediately()), ) diff --git a/scheduler.go b/scheduler.go index b4217dc2..099ba0be 100644 --- a/scheduler.go +++ b/scheduler.go @@ -32,7 +32,7 @@ type Scheduler interface { type scheduler struct { shutdownCtx context.Context - cancel context.CancelFunc + shutdownCancel context.CancelFunc exec executor jobs map[uuid.UUID]internalJob location *time.Location @@ -63,6 +63,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { schCtx, cancel := context.WithCancel(context.Background()) exec := executor{ + stopCh: make(chan struct{}), stopTimeout: time.Second * 10, singletonRunners: make(map[uuid.UUID]singletonRunner), @@ -73,12 +74,12 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { } s := &scheduler{ - shutdownCtx: schCtx, - cancel: cancel, - exec: exec, - jobs: make(map[uuid.UUID]internalJob), - location: time.Local, - clock: clockwork.NewRealClock(), + shutdownCtx: schCtx, + shutdownCancel: cancel, + exec: exec, + jobs: make(map[uuid.UUID]internalJob), + location: time.Local, + clock: clockwork.NewRealClock(), newJobCh: make(chan internalJob), removeJobCh: make(chan uuid.UUID), @@ -150,7 +151,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { // about jobs. func (s *scheduler) stopScheduler() { - s.exec.cancel() + s.exec.stopCh <- struct{}{} s.started = false for _, j := range s.jobs { j.stop() @@ -261,9 +262,8 @@ func (s *scheduler) selectRemoveJobsByTags(tags []string) { } func (s *scheduler) selectStart() { - s.exec.ctx, s.exec.cancel = context.WithCancel(s.shutdownCtx) - go s.exec.start() + go s.exec.start(s.shutdownCtx) s.started = true for id, j := range s.jobs { @@ -417,7 +417,7 @@ func (s *scheduler) StopJobs() error { // the Scheduler or Job's as the Scheduler cannot // be restarted after calling Shutdown. func (s *scheduler) Shutdown() error { - s.cancel() + s.shutdownCancel() select { case err := <-s.exec.done: return err diff --git a/util.go b/util.go index e18eee48..8169a070 100644 --- a/util.go +++ b/util.go @@ -3,6 +3,7 @@ package gocron import ( "context" "reflect" + "sync" "time" "github.com/google/uuid" @@ -95,3 +96,26 @@ func convertAtTimesToDateTime(atTimes AtTimes, location *time.Location) ([]time. }) return atTimesDate, nil } + +type waitGroupWithMutex struct { + wg sync.WaitGroup + mu sync.Mutex +} + +func (w *waitGroupWithMutex) Add(delta int) { + w.mu.Lock() + defer w.mu.Unlock() + w.wg.Add(delta) +} + +func (w *waitGroupWithMutex) Done() { + //w.mu.Lock() + //defer w.mu.Unlock() + w.wg.Done() +} + +func (w *waitGroupWithMutex) Wait() { + w.mu.Lock() + defer w.mu.Unlock() + w.wg.Wait() +}