Skip to content

Commit

Permalink
remove by tags, update, remove job
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler committed Oct 3, 2023
1 parent 03049be commit 80d620c
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 24 deletions.
3 changes: 2 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import "fmt"
var (
ErrCronJobParse = fmt.Errorf("gocron: CronJob: crontab parse failure")
ErrDurationJobZero = fmt.Errorf("gocron: DurationJob: duration must be greater than 0")
ErrEventListenerFuncNil = fmt.Errorf("gocron: eventListenerFunc must not be nil")
ErrJobNotFound = fmt.Errorf("gocron: job not found")
ErrNewJobTask = fmt.Errorf("gocron: NewJob: Task.Function was not of kind reflect.Func")
ErrNewJobTask = fmt.Errorf("gocron: NewJob: Task.Function must be of kind reflect.Func")
ErrStopTimedOut = fmt.Errorf("gocron: timed out waiting for jobs to finish")
ErrWithContextNilContext = fmt.Errorf("gocron: WithContext: context must not be nil")
ErrWithContextNilCancel = fmt.Errorf("gocron: WithContext: cancel must not be nil")
Expand Down
12 changes: 12 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,18 +254,30 @@ type EventListener func(*job) error

func AfterJobRuns(eventListenerFunc func(jobID uuid.UUID)) EventListener {
return func(j *job) error {
if eventListenerFunc == nil {
return ErrEventListenerFuncNil
}
j.afterJobRuns = eventListenerFunc
return nil
}
}

func AfterJobRunsWithError(eventListenerFunc func(jobID uuid.UUID, err error)) EventListener {
return func(j *job) error {
if eventListenerFunc == nil {
return ErrEventListenerFuncNil
}
j.afterJobRunsWithError = eventListenerFunc
return nil
}
}

func BeforeJobRuns(eventListenerFunc func(jobID uuid.UUID)) EventListener {
return func(j *job) error {
if eventListenerFunc == nil {
return ErrEventListenerFuncNil
}
j.beforeJobRuns = eventListenerFunc
return nil
}
}
Expand Down
76 changes: 53 additions & 23 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var _ Scheduler = (*scheduler)(nil)

type Scheduler interface {
NewJob(JobDefinition) (Job, error)
RemoveByTags(...string) error
RemoveByTags(...string)
RemoveJob(uuid.UUID) error
Start()
Stop() error
Expand All @@ -31,9 +31,10 @@ type scheduler struct {
cancel context.CancelFunc
exec executor
jobs map[uuid.UUID]job
jobsOutRequest chan jobsOutRequest
jobOutRequest chan jobOutRequest
newJobs chan job
removeJobs chan uuid.UUID
jobOutRequest chan jobOutRequest
location *time.Location
clock clockwork.Clock
started bool
Expand All @@ -46,6 +47,10 @@ type jobOutRequest struct {
outChan chan job
}

type jobsOutRequest struct {
outChan chan map[uuid.UUID]job
}

func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
ctx, cancel := context.WithCancel(context.Background())
execCtx, execCancel := context.WithCancel(context.Background())
Expand All @@ -65,16 +70,17 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
}

s := &scheduler{
ctx: ctx,
cancel: cancel,
exec: exec,
jobs: make(map[uuid.UUID]job, 0),
newJobs: make(chan job),
removeJobs: make(chan uuid.UUID),
start: make(chan struct{}),
jobOutRequest: jobOutRequestChan,
location: time.Local,
clock: clockwork.NewRealClock(),
ctx: ctx,
cancel: cancel,
exec: exec,
jobs: make(map[uuid.UUID]job, 0),
newJobs: make(chan job),
removeJobs: make(chan uuid.UUID),
start: make(chan struct{}),
jobOutRequest: jobOutRequestChan,
jobsOutRequest: make(chan jobsOutRequest),
location: time.Local,
clock: clockwork.NewRealClock(),
}

for _, option := range options {
Expand Down Expand Up @@ -123,6 +129,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
}
j.stop()
delete(s.jobs, id)

case out := <-s.jobOutRequest:
if j, ok := s.jobs[out.id]; ok {
out.outChan <- j
Expand All @@ -131,6 +138,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
close(out.outChan)
}

case out := <-s.jobsOutRequest:
out.outChan <- s.jobs

case <-s.start:
s.started = true
for id, j := range s.jobs {
Expand All @@ -143,6 +153,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
})
s.jobs[id] = j
}

case <-s.ctx.Done():
for _, j := range s.jobs {
j.stop()
Expand All @@ -159,10 +170,19 @@ func (s *scheduler) now() time.Time {
return s.clock.Now().In(s.location)
}

func (s *scheduler) NewJob(definition JobDefinition) (Job, error) {
j := job{
id: uuid.New(),
func (s *scheduler) NewJob(jobDefinition JobDefinition) (Job, error) {
return s.addOrUpdateJob(uuid.Nil, jobDefinition)
}

func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition) (Job, error) {
j := job{}
if id == uuid.Nil {
j.id = uuid.New()
} else {
s.removeJobs <- id
j.id = id
}

j.ctx, j.cancel = context.WithCancel(context.Background())

task := definition.task()
Expand Down Expand Up @@ -203,14 +223,25 @@ func (s *scheduler) NewJob(definition JobDefinition) (Job, error) {
}, nil
}

func (s *scheduler) RemoveByTags(s2 ...string) error {
//TODO implement me
panic("implement me")
func (s *scheduler) RemoveByTags(tags ...string) {
jr := jobsOutRequest{outChan: make(chan map[uuid.UUID]job)}
s.jobsOutRequest <- jr
jobs := <-jr.outChan

for _, j := range jobs {
if contains(j.tags, tags) {
s.removeJobs <- j.id
}
}
}

func (s *scheduler) RemoveJob(uuid2 uuid.UUID) error {
//TODO implement me
panic("implement me")
func (s *scheduler) RemoveJob(id uuid.UUID) error {
j := requestJob(id, s.jobOutRequest)
if j.id == uuid.Nil {
return ErrJobNotFound
}
s.removeJobs <- id
return nil
}

func (s *scheduler) Start() {
Expand All @@ -224,8 +255,7 @@ func (s *scheduler) Stop() error {
}

func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition) (Job, error) {
//TODO implement me
panic("implement me")
return s.addOrUpdateJob(id, jobDefinition)
}

// -----------------------------------------------
Expand Down
11 changes: 11 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,14 @@ func requestJob(id uuid.UUID, ch chan jobOutRequest) job {
}
return j
}

func contains(m map[string]struct{}, sl []string) bool {
for x := range m {
for _, y := range sl {
if x == y {
return true
}
}
}
return false
}

0 comments on commit 80d620c

Please sign in to comment.