Skip to content

Commit

Permalink
shutdown timeout, job context
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler committed Sep 23, 2023
1 parent d30e83a commit d338803
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 65 deletions.
16 changes: 9 additions & 7 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ import (
"context"
"log"
"sync"
"time"

"github.com/google/uuid"
)

type executor struct {
ctx context.Context
cancel context.CancelFunc
schCtx context.Context
jobsIDsIn chan uuid.UUID
jobIDsOut chan uuid.UUID
jobOutRequest chan jobOutRequest
ctx context.Context
cancel context.CancelFunc
schCtx context.Context
jobsIDsIn chan uuid.UUID
jobIDsOut chan uuid.UUID
jobOutRequest chan jobOutRequest
shutdownTimeout time.Duration
}

func (e *executor) start() {
Expand All @@ -34,7 +36,7 @@ func (e *executor) start() {
}()

case <-e.schCtx.Done():
wg.Wait()
waitTimeout(&wg, e.shutdownTimeout)
e.cancel()
return
}
Expand Down
36 changes: 18 additions & 18 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ import (
"github.com/robfig/cron/v3"
)

var _ Job = (*job)(nil)

type Job interface {
}

type job struct {
ctx context.Context
cancel context.CancelFunc
Expand All @@ -41,7 +36,7 @@ type Task struct {

type JobDefinition interface {
options() []JobOption
setup(job, *time.Location) (job, error)
setup(*job, *time.Location) error
task() Task
}

Expand All @@ -62,7 +57,7 @@ func (c cronJobDefinition) task() Task {
return c.tas
}

func (c cronJobDefinition) setup(j job, location *time.Location) (job, error) {
func (c cronJobDefinition) setup(j *job, location *time.Location) error {
var withLocation string
if strings.HasPrefix(c.crontab, "TZ=") || strings.HasPrefix(c.crontab, "CRON_TZ=") {
withLocation = c.crontab
Expand All @@ -84,11 +79,11 @@ func (c cronJobDefinition) setup(j job, location *time.Location) (job, error) {
cronSchedule, err = cron.ParseStandard(withLocation)
}
if err != nil {
return j, fmt.Errorf("gocron: crontab pare failure: %w", err)
return fmt.Errorf("gocron: crontab pare failure: %w", err)
}

j.jobSchedule = &cronJob{cronSchedule: cronSchedule}
return j, nil
return nil
}

func CronJob(crontab string, withSeconds bool, task Task, options ...JobOption) JobDefinition {
Expand All @@ -112,14 +107,14 @@ func (d durationJobDefinition) options() []JobOption {
return d.opts
}

func (d durationJobDefinition) setup(j job, location *time.Location) (job, error) {
func (d durationJobDefinition) setup(j *job, location *time.Location) error {
dur, err := time.ParseDuration(d.duration)
if err != nil {
return j, fmt.Errorf("gocron: failed to parse duration: %w", err)
return fmt.Errorf("gocron: failed to parse duration: %w", err)
}

j.jobSchedule = &durationJob{duration: dur}
return j, nil
return nil
}

func (d durationJobDefinition) task() Task {
Expand Down Expand Up @@ -182,8 +177,13 @@ func SingletonMode() JobOption {
}
}

func WithContext(ctx context.Context) JobOption {
func WithContext(ctx context.Context, cancel context.CancelFunc) JobOption {
return func(j *job) error {
if ctx == nil || cancel == nil {
return fmt.Errorf("gocron: context and cancel cannot be nil")
}
j.ctx = ctx
j.cancel = cancel
return nil
}
}
Expand Down Expand Up @@ -212,28 +212,28 @@ func WithTags(tags ...string) JobOption {
// -----------------------------------------------
// -----------------------------------------------

type EventListener func(Job) error
type EventListener func(*job) error

func AfterJobRuns(eventListenerFunc func()) EventListener {
return func(j Job) error {
return func(j *job) error {
return nil
}
}

func BeforeJobRuns(eventListenerFunc func()) EventListener {
return func(j Job) error {
return func(j *job) error {
return nil
}
}

func WhenJobReturnsError(eventListenerFunc func()) EventListener {
return func(j Job) error {
return func(j *job) error {
return nil
}
}

func WhenJobReturnsNoError(eventListenerFunc func()) EventListener {
return func(j Job) error {
return func(j *job) error {
return nil
}
}
Expand Down
29 changes: 22 additions & 7 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ func WithLocation(location *time.Location) SchedulerOption {
}
}

func WithShutdownTimeout(timeout time.Duration) SchedulerOption {
return func(s *scheduler) error {
if timeout == 0 {
return fmt.Errorf("gocron: shutdown timeout cannot be zero")
}
s.exec.shutdownTimeout = timeout
return nil
}
}

// -----------------------------------------------
// -----------------------------------------------
// ----------------- Scheduler -------------------
Expand Down Expand Up @@ -69,12 +79,13 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
jobOutRequestChan := make(chan jobOutRequest)

exec := executor{
ctx: execCtx,
cancel: execCancel,
schCtx: ctx,
jobsIDsIn: make(chan uuid.UUID),
jobIDsOut: make(chan uuid.UUID),
jobOutRequest: jobOutRequestChan,
ctx: execCtx,
cancel: execCancel,
schCtx: ctx,
jobsIDsIn: make(chan uuid.UUID),
jobIDsOut: make(chan uuid.UUID),
jobOutRequest: jobOutRequestChan,
shutdownTimeout: time.Second * 10,
}

s := &scheduler{
Expand Down Expand Up @@ -141,6 +152,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
s.jobs[id] = j
}
case <-s.ctx.Done():
for _, j := range s.jobs {
j.cancel()
}
return
}
}
Expand All @@ -153,6 +167,7 @@ func (s *scheduler) NewJob(definition JobDefinition) (uuid.UUID, error) {
j := job{
id: uuid.New(),
}
j.ctx, j.cancel = context.WithCancel(context.Background())

task := definition.task()
taskFunc := reflect.ValueOf(task.Function)
Expand All @@ -174,7 +189,7 @@ func (s *scheduler) NewJob(definition JobDefinition) (uuid.UUID, error) {
}
}

j, err := definition.setup(j, s.location)
err := definition.setup(&j, s.location)
if err != nil {
return uuid.Nil, err
}
Expand Down
33 changes: 0 additions & 33 deletions time_helper.go

This file was deleted.

16 changes: 16 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package gocron

import (
"reflect"
"sync"
"time"

"github.com/google/uuid"
)
Expand All @@ -10,6 +12,20 @@ func strPtr(in string) *string {
return &in
}

func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return
case <-time.After(timeout):
return
}
}

//func callJobFunc(jobFunc interface{}) {
// if jobFunc == nil {
// return
Expand Down

0 comments on commit d338803

Please sign in to comment.