Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scheduler)!: support custom mutex for job queue operations #133

Merged
merged 1 commit into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/queue/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
jobQueue := newJobQueue()
scheduler := quartz.NewStdSchedulerWithOptions(quartz.StdSchedulerOptions{
OutdatedThreshold: time.Second, // considering file system I/O latency
}, jobQueue)
}, jobQueue, nil)
scheduler.Start(ctx)

jobQueueSize, err := jobQueue.Size()
Expand Down
58 changes: 33 additions & 25 deletions quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type StdScheduler struct {
mtx sync.Mutex
wg sync.WaitGroup
queue JobQueue
queueMtx sync.Locker
interrupt chan struct{}
cancel context.CancelFunc
feeder chan ScheduledJob
Expand Down Expand Up @@ -125,30 +126,37 @@ type StdSchedulerOptions struct {
// Verify StdScheduler satisfies the Scheduler interface.
var _ Scheduler = (*StdScheduler)(nil)

// NewStdScheduler returns a new StdScheduler with the default
// configuration.
// NewStdScheduler returns a new StdScheduler with the default configuration.
func NewStdScheduler() Scheduler {
return NewStdSchedulerWithOptions(StdSchedulerOptions{
OutdatedThreshold: 100 * time.Millisecond,
RetryInterval: 100 * time.Millisecond,
}, nil)
}, nil, nil)
}

// NewStdSchedulerWithOptions returns a new StdScheduler configured
// as specified.
// A custom JobQueue implementation can be provided to manage scheduled
// jobs. This can be useful when distributed mode is required, so that
// jobs can be stored in persistent storage.
// Pass in nil to use the internal in-memory implementation.
// NewStdSchedulerWithOptions returns a new StdScheduler configured as specified.
//
// A custom [JobQueue] implementation may be provided to manage scheduled jobs.
// This is useful when distributed mode is required, so that jobs can be stored
// in persistent storage. Pass in nil to use the internal in-memory implementation.
//
// A custom [sync.Locker] may also be provided to ensure that scheduler operations
// on the job queue are atomic when used in distributed mode. Pass in nil to use
// the default [sync.Mutex].
func NewStdSchedulerWithOptions(
opts StdSchedulerOptions,
jobQueue JobQueue,
jobQueueMtx sync.Locker,
) *StdScheduler {
if jobQueue == nil {
jobQueue = newJobQueue()
}
if jobQueueMtx == nil {
jobQueueMtx = &sync.Mutex{}
}
return &StdScheduler{
queue: jobQueue,
queueMtx: jobQueueMtx,
interrupt: make(chan struct{}, 1),
feeder: make(chan ScheduledJob),
dispatch: make(chan ScheduledJob),
Expand All @@ -161,8 +169,8 @@ func (sched *StdScheduler) ScheduleJob(
jobDetail *JobDetail,
trigger Trigger,
) error {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobDetail == nil {
return illegalArgumentError("jobDetail is nil")
Expand Down Expand Up @@ -244,8 +252,8 @@ func (sched *StdScheduler) IsStarted() bool {
// For a job key to be returned, the job must satisfy all of the matchers specified.
// Given no matchers, it returns the keys of all scheduled jobs.
func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) ([]*JobKey, error) {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

scheduledJobs, err := sched.queue.ScheduledJobs(matchers)
if err != nil {
Expand All @@ -260,8 +268,8 @@ func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) ([]*Job

// GetScheduledJob returns the ScheduledJob with the specified key.
func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobKey == nil {
return nil, illegalArgumentError("jobKey is nil")
Expand All @@ -271,8 +279,8 @@ func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error)

// DeleteJob removes the Job with the specified key if present.
func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobKey == nil {
return illegalArgumentError("jobKey is nil")
Expand All @@ -290,8 +298,8 @@ func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error {
// PauseJob suspends the job with the specified key from being
// executed by the scheduler.
func (sched *StdScheduler) PauseJob(jobKey *JobKey) error {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobKey == nil {
return illegalArgumentError("jobKey is nil")
Expand Down Expand Up @@ -324,8 +332,8 @@ func (sched *StdScheduler) PauseJob(jobKey *JobKey) error {

// ResumeJob restarts the suspended job with the specified key.
func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobKey == nil {
return illegalArgumentError("jobKey is nil")
Expand Down Expand Up @@ -362,8 +370,8 @@ func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error {

// Clear removes all of the scheduled jobs.
func (sched *StdScheduler) Clear() error {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

// reset the job queue
err := sched.queue.Clear()
Expand Down Expand Up @@ -545,8 +553,8 @@ func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, e
}

func (sched *StdScheduler) fetchAndReschedule() (ScheduledJob, bool) {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

// fetch a job for processing
job, err := sched.queue.Pop()
Expand Down
4 changes: 2 additions & 2 deletions quartz/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestScheduler_BlockingSemantics(t *testing.T) {

opts.OutdatedThreshold = 10 * time.Millisecond

sched := quartz.NewStdSchedulerWithOptions(opts, nil)
sched := quartz.NewStdSchedulerWithOptions(opts, nil, nil)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
sched.Start(ctx)
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestScheduler_MisfiredJob(t *testing.T) {
OutdatedThreshold: time.Millisecond,
RetryInterval: time.Millisecond,
MisfiredChan: misfiredChan,
}, nil)
}, nil, nil)

jobDetail := quartz.NewJobDetail(funcJob, quartz.NewJobKey("funcJob"))
err := sched.ScheduleJob(jobDetail, quartz.NewSimpleTrigger(2*time.Millisecond))
Expand Down
Loading