diff --git a/scheduler.go b/scheduler.go index 3468155b..473f0536 100644 --- a/scheduler.go +++ b/scheduler.go @@ -58,15 +58,13 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { ctx, cancel := context.WithCancel(context.Background()) execCtx, execCancel := context.WithCancel(context.Background()) - jobOutRequestChan := make(chan jobOutRequest) - exec := executor{ ctx: execCtx, cancel: execCancel, schCtx: ctx, jobsIDsIn: make(chan uuid.UUID), jobIDsOut: make(chan uuid.UUID), - jobOutRequest: jobOutRequestChan, + jobOutRequest: make(chan jobOutRequest), shutdownTimeout: time.Second * 10, done: make(chan error), singletonRunners: make(map[uuid.UUID]singletonRunner), @@ -81,7 +79,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { removeJobs: make(chan uuid.UUID), removeJobsByTags: make(chan []string), start: make(chan struct{}), - jobOutRequest: jobOutRequestChan, + jobOutRequest: make(chan jobOutRequest), allJobsOutRequest: make(chan allJobsOutRequest), location: time.Local, clock: clockwork.NewRealClock(), @@ -142,6 +140,14 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { } } + case out := <-s.exec.jobOutRequest: + if j, ok := s.jobs[out.id]; ok { + out.outChan <- j + close(out.outChan) + } else { + close(out.outChan) + } + case out := <-s.jobOutRequest: if j, ok := s.jobs[out.id]; ok { out.outChan <- j @@ -316,7 +322,12 @@ func (s *scheduler) Start() { func (s *scheduler) Stop() error { s.cancel() - return <-s.exec.done + select { + case err := <-s.exec.done: + return err + case <-time.After(s.exec.shutdownTimeout + time.Second): + return ErrStopTimedOut + } } func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition) (Job, error) {