Skip to content

Commit

Permalink
working on resolving shutdown races
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler committed Nov 3, 2023
1 parent ebdbe3c commit c2b6aaf
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 68 deletions.
115 changes: 62 additions & 53 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
type executor struct {
ctx context.Context
cancel context.CancelFunc
stopCh chan struct{}
jobsIDsIn chan uuid.UUID
jobIDsOut chan uuid.UUID
jobOutRequest chan jobOutRequest
Expand All @@ -37,85 +38,93 @@ type limitMode struct {
done chan struct{}
}

func (e *executor) start() {
wg := sync.WaitGroup{}
func (e *executor) start(shutdownCtx context.Context) {
e.ctx, e.cancel = context.WithCancel(shutdownCtx)
wg := waitGroupWithMutex{
wg: sync.WaitGroup{},
mu: sync.Mutex{},
}
for {
select {
case id := <-e.jobsIDsIn:
if e.limitMode != nil {
if !e.limitMode.started {
for i := e.limitMode.limit; i > 0; i-- {
go e.limitModeRunner(e.limitMode.in, e.limitMode.done)
}
}
if e.limitMode.mode == LimitModeReschedule {
select {
case e.limitMode.rescheduleLimiter <- struct{}{}:
e.limitMode.in <- id
default:
// all runners are busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
e.jobIDsOut <- id
}
} else {
// TODO when metrics are added, this should increment a wait metric
e.limitMode.in <- id
}
} else {
j := requestJobCtx(e.ctx, id, e.jobOutRequest)
if j == nil {
continue
}
if j.singletonMode {
runner, ok := e.singletonRunners[id]
if !ok {
runner.in = make(chan uuid.UUID, 1000)
runner.done = make(chan struct{})
runner.mode = j.singletonLimitMode
if runner.mode == LimitModeReschedule {
runner.rescheduleLimiter = make(chan struct{}, 1)
go func() {
if e.limitMode != nil {
if !e.limitMode.started {
for i := e.limitMode.limit; i > 0; i-- {
go e.limitModeRunner(e.limitMode.in, e.limitMode.done)
}
e.singletonRunners[id] = runner
go e.singletonRunner(runner)
}

if runner.mode == LimitModeReschedule {
if e.limitMode.mode == LimitModeReschedule {
select {
case runner.rescheduleLimiter <- struct{}{}:
runner.in <- id
case e.limitMode.rescheduleLimiter <- struct{}{}:
e.limitMode.in <- id
default:
// runner is busy, reschedule the work for later
// all runners are busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
e.jobIDsOut <- id
}
} else {
runner.in <- id
// TODO when metrics are added, this should increment a wait metric
e.limitMode.in <- id
}
} else {
wg.Add(1)
go func(j internalJob) {
e.runJob(j)
wg.Done()
}(*j)
}
}
j := requestJobCtx(e.ctx, id, e.jobOutRequest)
if j == nil {
return
}
if j.singletonMode {
runner, ok := e.singletonRunners[id]
if !ok {
runner.in = make(chan uuid.UUID, 1000)
runner.done = make(chan struct{})
runner.mode = j.singletonLimitMode
if runner.mode == LimitModeReschedule {
runner.rescheduleLimiter = make(chan struct{}, 1)
}
e.singletonRunners[id] = runner
go e.singletonRunner(runner)
}

case <-e.ctx.Done():
if runner.mode == LimitModeReschedule {
select {
case runner.rescheduleLimiter <- struct{}{}:
runner.in <- id
default:
// runner is busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
e.jobIDsOut <- id
}
} else {
runner.in <- id
}
} else {
wg.Add(1)
go func(j internalJob) {
e.runJob(j)
wg.Done()
}(*j)
}
}
}()
case <-e.stopCh:
e.cancel()
waitForJobs := make(chan struct{})
waitForSingletons := make(chan struct{})

waiterCtx, waiterCancel := context.WithCancel(context.Background())

go func() {
wg.Wait()
go func() {
wg.Wait()
waitForJobs <- struct{}{}
}()
select {
case <-waiterCtx.Done():
return
default:
}
waitForJobs <- struct{}{}
}()
go func() {
For:
Expand Down
5 changes: 1 addition & 4 deletions job_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gocron

import (
"log"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -323,9 +322,7 @@ func TestJob_LastRun(t *testing.T) {
time.Second,
),
NewTask(
func() {
log.Println("job ran")
},
func() {},
),
WithStartAt(WithStartImmediately()),
)
Expand Down
22 changes: 11 additions & 11 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Scheduler interface {

type scheduler struct {
shutdownCtx context.Context
cancel context.CancelFunc
shutdownCancel context.CancelFunc
exec executor
jobs map[uuid.UUID]internalJob
location *time.Location
Expand Down Expand Up @@ -63,6 +63,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
schCtx, cancel := context.WithCancel(context.Background())

exec := executor{
stopCh: make(chan struct{}),
stopTimeout: time.Second * 10,
singletonRunners: make(map[uuid.UUID]singletonRunner),

Expand All @@ -73,12 +74,12 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
}

s := &scheduler{
shutdownCtx: schCtx,
cancel: cancel,
exec: exec,
jobs: make(map[uuid.UUID]internalJob),
location: time.Local,
clock: clockwork.NewRealClock(),
shutdownCtx: schCtx,
shutdownCancel: cancel,
exec: exec,
jobs: make(map[uuid.UUID]internalJob),
location: time.Local,
clock: clockwork.NewRealClock(),

newJobCh: make(chan internalJob),
removeJobCh: make(chan uuid.UUID),
Expand Down Expand Up @@ -150,7 +151,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
// about jobs.

func (s *scheduler) stopScheduler() {
s.exec.cancel()
s.exec.stopCh <- struct{}{}
s.started = false
for _, j := range s.jobs {
j.stop()
Expand Down Expand Up @@ -261,9 +262,8 @@ func (s *scheduler) selectRemoveJobsByTags(tags []string) {
}

func (s *scheduler) selectStart() {
s.exec.ctx, s.exec.cancel = context.WithCancel(s.shutdownCtx)

go s.exec.start()
go s.exec.start(s.shutdownCtx)

s.started = true
for id, j := range s.jobs {
Expand Down Expand Up @@ -417,7 +417,7 @@ func (s *scheduler) StopJobs() error {
// the Scheduler or Jobs as the Scheduler cannot
// be restarted after calling Shutdown.
func (s *scheduler) Shutdown() error {
s.cancel()
s.shutdownCancel()
select {
case err := <-s.exec.done:
return err
Expand Down
24 changes: 24 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gocron
import (
"context"
"reflect"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -95,3 +96,26 @@ func convertAtTimesToDateTime(atTimes AtTimes, location *time.Location) ([]time.
})
return atTimesDate, nil
}

// waitGroupWithMutex couples a sync.WaitGroup with a sync.Mutex so that
// calls to Add and Wait are serialized against one another.
type waitGroupWithMutex struct {
	wg sync.WaitGroup
	mu sync.Mutex
}

// withLock runs fn while holding the mutex. The unlock is deferred so the
// mutex is released even if fn panics.
func (g *waitGroupWithMutex) withLock(fn func()) {
	g.mu.Lock()
	defer g.mu.Unlock()
	fn()
}

// Add changes the wait group counter by delta while holding the mutex.
// It blocks for as long as another goroutine is inside Wait.
func (g *waitGroupWithMutex) Add(delta int) {
	g.withLock(func() { g.wg.Add(delta) })
}

// Done decrements the wait group counter by one.
//
// It deliberately does not take the mutex: Wait keeps the mutex held for
// the whole time it blocks, so a Done that needed the mutex could never
// run and the counter would never reach zero.
func (g *waitGroupWithMutex) Done() {
	g.wg.Done()
}

// Wait blocks until the wait group counter reaches zero. The mutex is held
// for the entire wait, so concurrent Add and Wait calls are held back until
// this call returns.
func (g *waitGroupWithMutex) Wait() {
	g.withLock(g.wg.Wait)
}

0 comments on commit c2b6aaf

Please sign in to comment.