From d33880332ee15f514ad4f405669416183bb60b12 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Sat, 23 Sep 2023 14:11:23 -0500 Subject: [PATCH] shutdown timeout, job context --- executor.go | 16 +++++++++------- job.go | 36 ++++++++++++++++++------------------ scheduler.go | 29 ++++++++++++++++++++++------- time_helper.go | 33 --------------------------------- util.go | 16 ++++++++++++++++ 5 files changed, 65 insertions(+), 65 deletions(-) delete mode 100644 time_helper.go diff --git a/executor.go b/executor.go index eb9e1452..71c3ea34 100644 --- a/executor.go +++ b/executor.go @@ -4,17 +4,19 @@ import ( "context" "log" "sync" + "time" "github.com/google/uuid" ) type executor struct { - ctx context.Context - cancel context.CancelFunc - schCtx context.Context - jobsIDsIn chan uuid.UUID - jobIDsOut chan uuid.UUID - jobOutRequest chan jobOutRequest + ctx context.Context + cancel context.CancelFunc + schCtx context.Context + jobsIDsIn chan uuid.UUID + jobIDsOut chan uuid.UUID + jobOutRequest chan jobOutRequest + shutdownTimeout time.Duration } func (e *executor) start() { @@ -34,7 +36,7 @@ func (e *executor) start() { }() case <-e.schCtx.Done(): - wg.Wait() + waitTimeout(&wg, e.shutdownTimeout) e.cancel() return } diff --git a/job.go b/job.go index 193c6eb3..5740d07d 100644 --- a/job.go +++ b/job.go @@ -12,11 +12,6 @@ import ( "github.com/robfig/cron/v3" ) -var _ Job = (*job)(nil) - -type Job interface { -} - type job struct { ctx context.Context cancel context.CancelFunc @@ -41,7 +36,7 @@ type Task struct { type JobDefinition interface { options() []JobOption - setup(job, *time.Location) (job, error) + setup(*job, *time.Location) error task() Task } @@ -62,7 +57,7 @@ func (c cronJobDefinition) task() Task { return c.tas } -func (c cronJobDefinition) setup(j job, location *time.Location) (job, error) { +func (c cronJobDefinition) setup(j *job, location *time.Location) error { var withLocation string if strings.HasPrefix(c.crontab, "TZ=") || strings.HasPrefix(c.crontab, "CRON_TZ=") { withLocation = c.crontab @@ -84,11 +79,11 @@ func (c cronJobDefinition) setup(j job, location *time.Location) (job, error) { cronSchedule, err = cron.ParseStandard(withLocation) } if err != nil { - return j, fmt.Errorf("gocron: crontab pare failure: %w", err) + return fmt.Errorf("gocron: crontab pare failure: %w", err) } j.jobSchedule = &cronJob{cronSchedule: cronSchedule} - return j, nil + return nil } func CronJob(crontab string, withSeconds bool, task Task, options ...JobOption) JobDefinition { @@ -112,14 +107,14 @@ func (d durationJobDefinition) options() []JobOption { return d.opts } -func (d durationJobDefinition) setup(j job, location *time.Location) (job, error) { +func (d durationJobDefinition) setup(j *job, location *time.Location) error { dur, err := time.ParseDuration(d.duration) if err != nil { - return j, fmt.Errorf("gocron: failed to parse duration: %w", err) + return fmt.Errorf("gocron: failed to parse duration: %w", err) } j.jobSchedule = &durationJob{duration: dur} - return j, nil + return nil } func (d durationJobDefinition) task() Task { @@ -182,8 +177,13 @@ func SingletonMode() JobOption { } } -func WithContext(ctx context.Context) JobOption { +func WithContext(ctx context.Context, cancel context.CancelFunc) JobOption { return func(j *job) error { + if ctx == nil || cancel == nil { + return fmt.Errorf("gocron: context and cancel cannot be nil") + } + j.ctx = ctx + j.cancel = cancel return nil } } @@ -212,28 +212,28 @@ func WithTags(tags ...string) JobOption { // ----------------------------------------------- // ----------------------------------------------- -type EventListener func(Job) error +type EventListener func(*job) error func AfterJobRuns(eventListenerFunc func()) EventListener { - return func(j Job) error { + return func(j *job) error { return nil } } func BeforeJobRuns(eventListenerFunc func()) EventListener { - return func(j Job) error { + return func(j *job) error { return nil } } func WhenJobReturnsError(eventListenerFunc func()) EventListener { - return func(j Job) error { + return func(j *job) error { return nil } } func WhenJobReturnsNoError(eventListenerFunc func()) EventListener { - return func(j Job) error { + return func(j *job) error { return nil } } diff --git a/scheduler.go b/scheduler.go index b0ed8ed9..024cde45 100644 --- a/scheduler.go +++ b/scheduler.go @@ -37,6 +37,16 @@ func WithLocation(location *time.Location) SchedulerOption { } } +func WithShutdownTimeout(timeout time.Duration) SchedulerOption { + return func(s *scheduler) error { + if timeout == 0 { + return fmt.Errorf("gocron: shutdown timeout cannot be zero") + } + s.exec.shutdownTimeout = timeout + return nil + } +} + // ----------------------------------------------- // ----------------------------------------------- // ----------------- Scheduler ------------------- @@ -69,12 +79,13 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { jobOutRequestChan := make(chan jobOutRequest) exec := executor{ - ctx: execCtx, - cancel: execCancel, - schCtx: ctx, - jobsIDsIn: make(chan uuid.UUID), - jobIDsOut: make(chan uuid.UUID), - jobOutRequest: jobOutRequestChan, + ctx: execCtx, + cancel: execCancel, + schCtx: ctx, + jobsIDsIn: make(chan uuid.UUID), + jobIDsOut: make(chan uuid.UUID), + jobOutRequest: jobOutRequestChan, + shutdownTimeout: time.Second * 10, } s := &scheduler{ @@ -141,6 +152,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { s.jobs[id] = j } case <-s.ctx.Done(): + for _, j := range s.jobs { + j.cancel() + } return } } @@ -153,6 +167,7 @@ func (s *scheduler) NewJob(definition JobDefinition) (uuid.UUID, error) { j := job{ id: uuid.New(), } + j.ctx, j.cancel = context.WithCancel(context.Background()) task := definition.task() taskFunc := reflect.ValueOf(task.Function) @@ -174,7 +189,7 @@ func (s *scheduler) NewJob(definition JobDefinition) (uuid.UUID, error) { } } - j, err := definition.setup(j, s.location) + err := definition.setup(&j, s.location) if err != nil { return uuid.Nil, err } diff --git a/time_helper.go b/time_helper.go deleted file mode 100644 index 487a7a2a..00000000 --- a/time_helper.go +++ /dev/null @@ -1,33 +0,0 @@ -package gocron - -import "time" - -var _ TimeWrapper = (*trueTime)(nil) - -// TimeWrapper is an interface that wraps the Now, Sleep, and Unix methods of the time package. -// This allows the library and users to mock the time package for testing. -type TimeWrapper interface { - Now(*time.Location) time.Time - Unix(int64, int64) time.Time - Sleep(time.Duration) -} - -type trueTime struct{} - -func (t *trueTime) Now(location *time.Location) time.Time { - return time.Now().In(location) -} - -func (t *trueTime) Unix(sec int64, nsec int64) time.Time { - return time.Unix(sec, nsec) -} - -func (t *trueTime) Sleep(d time.Duration) { - time.Sleep(d) -} - -// afterFunc proxies the time.AfterFunc function. -// This allows it to be mocked for testing. -func afterFunc(d time.Duration, f func()) *time.Timer { - return time.AfterFunc(d, f) -} diff --git a/util.go b/util.go index 4169e212..85b72582 100644 --- a/util.go +++ b/util.go @@ -2,6 +2,8 @@ package gocron import ( "reflect" + "sync" + "time" "github.com/google/uuid" ) @@ -10,6 +12,20 @@ func strPtr(in string) *string { return &in } +func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + return + case <-time.After(timeout): + return + } +} + //func callJobFunc(jobFunc interface{}) { // if jobFunc == nil { // return