From 894970124c3e669280c351178db6d8a2d682ef56 Mon Sep 17 00:00:00 2001 From: Manuel Doncel Martos Date: Wed, 19 Jun 2024 13:33:50 +0200 Subject: [PATCH] after lock error listener (#734) * after job listener * fixing lint issues --------- Co-authored-by: lv90no --- example_test.go | 32 ++++++++++++++++++++++ executor.go | 2 ++ job.go | 13 +++++++++ job_test.go | 11 ++++++++ scheduler_test.go | 69 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 127 insertions(+) diff --git a/example_test.go b/example_test.go index 928b92f1..a4baa8f2 100644 --- a/example_test.go +++ b/example_test.go @@ -1,6 +1,8 @@ package gocron_test import ( + "context" + "errors" "fmt" "sync" "time" @@ -10,6 +12,14 @@ import ( "github.com/jonboulle/clockwork" ) +var _ Locker = new(errorLocker) + +type errorLocker struct{} + +func (e errorLocker) Lock(_ context.Context, _ string) (Lock, error) { + return nil, errors.New("locked") +} + func ExampleAfterJobRuns() { s, _ := NewScheduler() defer func() { _ = s.Shutdown() }() @@ -52,6 +62,28 @@ func ExampleAfterJobRunsWithError() { ) } +func ExampleAfterLockError() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithDistributedJobLocker(&errorLocker{}), + WithEventListeners( + AfterLockError( + func(jobID uuid.UUID, jobName string, err error) { + // do something immediately before the job is run + }, + ), + ), + ) +} + func ExampleBeforeJobRuns() { s, _ := NewScheduler() defer func() { _ = s.Shutdown() }() diff --git a/executor.go b/executor.go index 64e12c5a..05e75d84 100644 --- a/executor.go +++ b/executor.go @@ -343,6 +343,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { } else if j.locker != nil { lock, err := j.locker.Lock(j.ctx, j.name) if err != nil { + _ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err) e.sendOutForRescheduling(&jIn) e.incrementJobCounter(j, Skip) return @@ -351,6 +352,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { } else if e.locker != nil { lock, err := e.locker.Lock(j.ctx, j.name) if err != nil { + _ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err) e.sendOutForRescheduling(&jIn) e.incrementJobCounter(j, Skip) return diff --git a/job.go b/job.go index 4a7b8cff..0ac792a2 100644 --- a/job.go +++ b/job.go @@ -42,6 +42,7 @@ type internalJob struct { afterJobRuns func(jobID uuid.UUID, jobName string) beforeJobRuns func(jobID uuid.UUID, jobName string) afterJobRunsWithError func(jobID uuid.UUID, jobName string, err error) + afterLockError func(jobID uuid.UUID, jobName string, err error) locker Locker } @@ -639,6 +640,18 @@ func BeforeJobRuns(eventListenerFunc func(jobID uuid.UUID, jobName string)) Even } } +// AfterLockError is used to when the distributed locker returns an error and +// then run the provided function. +func AfterLockError(eventListenerFunc func(jobID uuid.UUID, jobName string, err error)) EventListener { + return func(j *internalJob) error { + if eventListenerFunc == nil { + return ErrEventListenerFuncNil + } + j.afterLockError = eventListenerFunc + return nil + } +} + // ----------------------------------------------- // ----------------------------------------------- // ---------------- Job Schedules ---------------- diff --git a/job_test.go b/job_test.go index eff7a088..14c98765 100644 --- a/job_test.go +++ b/job_test.go @@ -437,12 +437,20 @@ func TestWithEventListeners(t *testing.T) { }, nil, }, + { + "afterLockError", + []EventListener{ + AfterLockError(func(_ uuid.UUID, _ string, _ error) {}), + }, + nil, + }, { "multiple event listeners", []EventListener{ AfterJobRuns(func(_ uuid.UUID, _ string) {}), AfterJobRunsWithError(func(_ uuid.UUID, _ string, _ error) {}), BeforeJobRuns(func(_ uuid.UUID, _ string) {}), + AfterLockError(func(_ uuid.UUID, _ string, _ error) {}), }, nil, }, @@ -488,6 +496,9 @@ func TestWithEventListeners(t *testing.T) { if ij.beforeJobRuns != nil { count++ } + if ij.afterLockError != nil { + count++ + } assert.Equal(t, len(tt.eventListeners), count) }) } diff --git a/scheduler_test.go b/scheduler_test.go index c701593e..a59b59bd 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -2,6 +2,7 @@ package gocron import ( "context" + "errors" "fmt" "os" "sync" @@ -49,6 +50,14 @@ func newTestScheduler(t *testing.T, options ...SchedulerOption) Scheduler { return s } +var _ Locker = new(errorLocker) + +type errorLocker struct{} + +func (e errorLocker) Lock(_ context.Context, _ string) (Lock, error) { + return nil, errors.New("locked") +} + func TestScheduler_OneSecond_NoOptions(t *testing.T) { defer verifyNoGoroutineLeaks(t) cronNoOptionsCh := make(chan struct{}, 10) @@ -1631,6 +1640,66 @@ func TestScheduler_WithEventListeners(t *testing.T) { } } +func TestScheduler_WithLocker_WithEventListeners(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + listenerRunCh := make(chan error, 1) + tests := []struct { + name string + locker Locker + tsk Task + el EventListener + expectRun bool + expectErr error + }{ + { + "AfterLockError", + errorLocker{}, + NewTask(func() {}), + AfterLockError(func(_ uuid.UUID, _ string, err error) { + listenerRunCh <- nil + }), + true, + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newTestScheduler(t) + _, err := s.NewJob( + DurationJob(time.Minute*10), + tt.tsk, + WithStartAt( + WithStartImmediately(), + ), + WithDistributedJobLocker(tt.locker), + WithEventListeners(tt.el), + WithLimitedRuns(1), + ) + require.NoError(t, err) + + s.Start() + if tt.expectRun { + select { + case err = <-listenerRunCh: + assert.ErrorIs(t, err, tt.expectErr) + case <-time.After(time.Second): + t.Fatal("timed out waiting for listener to run") + } + } else { + select { + case <-listenerRunCh: + t.Fatal("listener ran when it shouldn't have") + case <-time.After(time.Millisecond * 100): + } + } + + require.NoError(t, s.Shutdown()) + }) + } +} + func TestScheduler_ManyJobs(t *testing.T) { defer verifyNoGoroutineLeaks(t)