Skip to content

Commit

Permalink
add timeout to stop in scheduler, split job out channels
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler committed Oct 10, 2023
1 parent 069e164 commit 746b38c
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,13 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
ctx, cancel := context.WithCancel(context.Background())
execCtx, execCancel := context.WithCancel(context.Background())

jobOutRequestChan := make(chan jobOutRequest)

exec := executor{
ctx: execCtx,
cancel: execCancel,
schCtx: ctx,
jobsIDsIn: make(chan uuid.UUID),
jobIDsOut: make(chan uuid.UUID),
jobOutRequest: jobOutRequestChan,
jobOutRequest: make(chan jobOutRequest),
shutdownTimeout: time.Second * 10,
done: make(chan error),
singletonRunners: make(map[uuid.UUID]singletonRunner),
Expand All @@ -81,7 +79,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
removeJobs: make(chan uuid.UUID),
removeJobsByTags: make(chan []string),
start: make(chan struct{}),
jobOutRequest: jobOutRequestChan,
jobOutRequest: make(chan jobOutRequest),
allJobsOutRequest: make(chan allJobsOutRequest),
location: time.Local,
clock: clockwork.NewRealClock(),
Expand Down Expand Up @@ -142,6 +140,14 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
}
}

case out := <-s.exec.jobOutRequest:
if j, ok := s.jobs[out.id]; ok {
out.outChan <- j
close(out.outChan)
} else {
close(out.outChan)
}

case out := <-s.jobOutRequest:
if j, ok := s.jobs[out.id]; ok {
out.outChan <- j
Expand Down Expand Up @@ -316,7 +322,12 @@ func (s *scheduler) Start() {

func (s *scheduler) Stop() error {
s.cancel()
return <-s.exec.done
select {
case err := <-s.exec.done:
return err
case <-time.After(s.exec.shutdownTimeout + time.Second):
return ErrStopTimedOut
}
}

func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition) (Job, error) {
Expand Down

0 comments on commit 746b38c

Please sign in to comment.