diff --git a/examples/queue/file_system.go b/examples/queue/file_system.go index 39beda1..e651e97 100644 --- a/examples/queue/file_system.go +++ b/examples/queue/file_system.go @@ -39,7 +39,7 @@ func main() { jobQueue := newJobQueue() scheduler := quartz.NewStdSchedulerWithOptions(quartz.StdSchedulerOptions{ OutdatedThreshold: time.Second, // considering file system I/O latency - }, jobQueue) + }, jobQueue, nil) scheduler.Start(ctx) jobQueueSize, err := jobQueue.Size() diff --git a/quartz/scheduler.go b/quartz/scheduler.go index 497f199..41f38a6 100644 --- a/quartz/scheduler.go +++ b/quartz/scheduler.go @@ -72,6 +72,7 @@ type StdScheduler struct { mtx sync.Mutex wg sync.WaitGroup queue JobQueue + queueMtx sync.Locker interrupt chan struct{} cancel context.CancelFunc feeder chan ScheduledJob @@ -125,30 +126,37 @@ type StdSchedulerOptions struct { // Verify StdScheduler satisfies the Scheduler interface. var _ Scheduler = (*StdScheduler)(nil) -// NewStdScheduler returns a new StdScheduler with the default -// configuration. +// NewStdScheduler returns a new StdScheduler with the default configuration. func NewStdScheduler() Scheduler { return NewStdSchedulerWithOptions(StdSchedulerOptions{ OutdatedThreshold: 100 * time.Millisecond, RetryInterval: 100 * time.Millisecond, - }, nil) + }, nil, nil) } -// NewStdSchedulerWithOptions returns a new StdScheduler configured -// as specified. -// A custom JobQueue implementation can be provided to manage scheduled -// jobs. This can be useful when distributed mode is required, so that -// jobs can be stored in persistent storage. -// Pass in nil to use the internal in-memory implementation. +// NewStdSchedulerWithOptions returns a new StdScheduler configured as specified. +// +// A custom [JobQueue] implementation may be provided to manage scheduled jobs. +// This is useful when distributed mode is required, so that jobs can be stored +// in persistent storage. Pass in nil to use the internal in-memory implementation. +// +// A custom [sync.Locker] may also be provided to ensure that scheduler operations +// on the job queue are atomic when used in distributed mode. Pass in nil to use +// the default [sync.Mutex]. func NewStdSchedulerWithOptions( opts StdSchedulerOptions, jobQueue JobQueue, + jobQueueMtx sync.Locker, ) *StdScheduler { if jobQueue == nil { jobQueue = newJobQueue() } + if jobQueueMtx == nil { + jobQueueMtx = &sync.Mutex{} + } return &StdScheduler{ queue: jobQueue, + queueMtx: jobQueueMtx, interrupt: make(chan struct{}, 1), feeder: make(chan ScheduledJob), dispatch: make(chan ScheduledJob), @@ -161,8 +169,8 @@ func (sched *StdScheduler) ScheduleJob( jobDetail *JobDetail, trigger Trigger, ) error { - sched.mtx.Lock() - defer sched.mtx.Unlock() + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() if jobDetail == nil { return illegalArgumentError("jobDetail is nil") @@ -244,8 +252,8 @@ func (sched *StdScheduler) IsStarted() bool { // For a job key to be returned, the job must satisfy all of the matchers specified. // Given no matchers, it returns the keys of all scheduled jobs. func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) ([]*JobKey, error) { - sched.mtx.Lock() - defer sched.mtx.Unlock() + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() scheduledJobs, err := sched.queue.ScheduledJobs(matchers) if err != nil { @@ -260,8 +268,8 @@ func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) ([]*Job // GetScheduledJob returns the ScheduledJob with the specified key. func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) { - sched.mtx.Lock() - defer sched.mtx.Unlock() + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() if jobKey == nil { return nil, illegalArgumentError("jobKey is nil") @@ -271,8 +279,8 @@ func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) // DeleteJob removes the Job with the specified key if present. func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error { - sched.mtx.Lock() - defer sched.mtx.Unlock() + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() if jobKey == nil { return illegalArgumentError("jobKey is nil") @@ -290,8 +298,8 @@ func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error { // PauseJob suspends the job with the specified key from being // executed by the scheduler. func (sched *StdScheduler) PauseJob(jobKey *JobKey) error { - sched.mtx.Lock() - defer sched.mtx.Unlock() + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() if jobKey == nil { return illegalArgumentError("jobKey is nil") @@ -324,8 +332,8 @@ func (sched *StdScheduler) PauseJob(jobKey *JobKey) error { // ResumeJob restarts the suspended job with the specified key. func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error { - sched.mtx.Lock() - defer sched.mtx.Unlock() + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() if jobKey == nil { return illegalArgumentError("jobKey is nil") @@ -362,8 +370,8 @@ func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error { // Clear removes all of the scheduled jobs. func (sched *StdScheduler) Clear() error { - sched.mtx.Lock() - defer sched.mtx.Unlock() + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() // reset the job queue err := sched.queue.Clear() @@ -545,8 +553,8 @@ func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, e } func (sched *StdScheduler) fetchAndReschedule() (ScheduledJob, bool) { - sched.mtx.Lock() - defer sched.mtx.Unlock() + sched.queueMtx.Lock() + defer sched.queueMtx.Unlock() // fetch a job for processing job, err := sched.queue.Pop() diff --git a/quartz/scheduler_test.go b/quartz/scheduler_test.go index 04dc809..01372fa 100644 --- a/quartz/scheduler_test.go +++ b/quartz/scheduler_test.go @@ -110,7 +110,7 @@ func TestScheduler_BlockingSemantics(t *testing.T) { opts.OutdatedThreshold = 10 * time.Millisecond - sched := quartz.NewStdSchedulerWithOptions(opts, nil) + sched := quartz.NewStdSchedulerWithOptions(opts, nil, nil) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() sched.Start(ctx) @@ -394,7 +394,7 @@ func TestScheduler_MisfiredJob(t *testing.T) { OutdatedThreshold: time.Millisecond, RetryInterval: time.Millisecond, MisfiredChan: misfiredChan, - }, nil) + }, nil, nil) jobDetail := quartz.NewJobDetail(funcJob, quartz.NewJobKey("funcJob")) err := sched.ScheduleJob(jobDetail, quartz.NewSimpleTrigger(2*time.Millisecond))