From c722202ec119ff939a24c575e8bd99607aa16315 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Wed, 20 Sep 2023 21:05:22 -0500 Subject: [PATCH] fix removing a job not stopping with limit mode --- executor.go | 6 +++++- scheduler_test.go | 25 ++++++++++++++++--------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/executor.go b/executor.go index 8dea17c3..894468c4 100644 --- a/executor.go +++ b/executor.go @@ -128,7 +128,11 @@ func (e *executor) limitModeRunner() { return case jf := <-e.limitModeQueue: if !e.stopped.Load() { - e.runJob(jf) + select { + case <-jf.ctx.Done(): + default: + e.runJob(jf) + } } } } diff --git a/scheduler_test.go b/scheduler_test.go index 91b5be58..d94b6c01 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -1605,10 +1605,10 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { f func() }{ // Expecting a total of 4 job runs: - // 0ms - 2 jobs are run, the 3rd job hits the limit and is skipped - // 100ms - job 1 hits the limit and is skipped - // 200ms - job 1 & 2 run - // 300ms - jobs 1 & 3 hit the limit and are skipped + // 0ms - 2 jobs are run, 1 is skipped + // 100ms - 3 jobs hit the limit and are skipped + // 200ms - 2 jobs are run, 1 is skipped + // 300ms - 3 jobs hit the limit and are skipped { "reschedule mode", 2, RescheduleMode, 4, false, func() { @@ -1616,6 +1616,13 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { time.Sleep(200 * time.Millisecond) }, }, + { + "reschedule mode with job removal", 2, RescheduleMode, 4, true, + func() { + semaphore <- true + time.Sleep(200 * time.Millisecond) + }, + }, // Expecting a total of 8 job runs. The exact order of jobs may vary, for example: // 0ms - jobs 2 & 3 run, job 1 hits the limit and waits @@ -1648,10 +1655,10 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { j1, err := s.Every("100ms").Do(tc.f) require.NoError(t, err) - j2, err := s.Every("200ms").Do(tc.f) + j2, err := s.Every("100ms").Do(tc.f) require.NoError(t, err) - j3, err := s.Every("300ms").Do(tc.f) + j3, err := s.Every("100ms").Do(tc.f) require.NoError(t, err) s.StartAsync() @@ -1682,13 +1689,13 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) { for time.Now().Before(now.Add(200 * time.Millisecond)) { select { case <-semaphore: - counter++ + t.Error("received a job run after jobs were removed or scheduler stopeed") default: } } - assert.GreaterOrEqual(t, counter, tc.expectedRuns-2) - assert.LessOrEqual(t, counter, tc.expectedRuns+2) + assert.GreaterOrEqual(t, counter, tc.expectedRuns) + assert.LessOrEqual(t, counter, tc.expectedRuns) if tc.removeJobs { s.Stop()