From 746b38cf19698c4be695d331a25c9b24259f8fa4 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 10 Oct 2023 13:48:57 -0500 Subject: [PATCH] add timeout to stop in scheduler, split job out channels --- scheduler.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/scheduler.go b/scheduler.go index 3468155b..473f0536 100644 --- a/scheduler.go +++ b/scheduler.go @@ -58,15 +58,13 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { ctx, cancel := context.WithCancel(context.Background()) execCtx, execCancel := context.WithCancel(context.Background()) - jobOutRequestChan := make(chan jobOutRequest) - exec := executor{ ctx: execCtx, cancel: execCancel, schCtx: ctx, jobsIDsIn: make(chan uuid.UUID), jobIDsOut: make(chan uuid.UUID), - jobOutRequest: jobOutRequestChan, + jobOutRequest: make(chan jobOutRequest), shutdownTimeout: time.Second * 10, done: make(chan error), singletonRunners: make(map[uuid.UUID]singletonRunner), @@ -81,7 +79,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { removeJobs: make(chan uuid.UUID), removeJobsByTags: make(chan []string), start: make(chan struct{}), - jobOutRequest: jobOutRequestChan, + jobOutRequest: make(chan jobOutRequest), allJobsOutRequest: make(chan allJobsOutRequest), location: time.Local, clock: clockwork.NewRealClock(), @@ -142,6 +140,14 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { } } + case out := <-s.exec.jobOutRequest: + if j, ok := s.jobs[out.id]; ok { + out.outChan <- j + close(out.outChan) + } else { + close(out.outChan) + } + case out := <-s.jobOutRequest: if j, ok := s.jobs[out.id]; ok { out.outChan <- j @@ -316,7 +322,12 @@ func (s *scheduler) Start() { func (s *scheduler) Stop() error { s.cancel() - return <-s.exec.done + select { + case err := <-s.exec.done: + return err + case <-time.After(s.exec.shutdownTimeout + time.Second): + return ErrStopTimedOut + } } func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition) (Job, error) {