Skip to content

Commit

Permalink
fix removing a job not stopping with limit mode
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler committed Sep 21, 2023
1 parent 7a102da commit c722202
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
6 changes: 5 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ func (e *executor) limitModeRunner() {
return
case jf := <-e.limitModeQueue:
if !e.stopped.Load() {
e.runJob(jf)
select {
case <-jf.ctx.Done():
default:
e.runJob(jf)
}
}
}
}
Expand Down
25 changes: 16 additions & 9 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1605,17 +1605,24 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
f func()
}{
// Expecting a total of 4 job runs:
// 0ms - 2 jobs are run, the 3rd job hits the limit and is skipped
// 100ms - job 1 hits the limit and is skipped
// 200ms - job 1 & 2 run
// 300ms - jobs 1 & 3 hit the limit and are skipped
// 0ms - 2 jobs are run, 1 is skipped
// 100ms - 3 jobs hit the limit and are skipped
// 200ms - 2 jobs are run, 1 is skipped
// 300ms - 3 jobs hit the limit and are skipped
{
"reschedule mode", 2, RescheduleMode, 4, false,
func() {
semaphore <- true
time.Sleep(200 * time.Millisecond)
},
},
{
"reschedule mode with job removal", 2, RescheduleMode, 4, true,
func() {
semaphore <- true
time.Sleep(200 * time.Millisecond)
},
},

// Expecting a total of 8 job runs. The exact order of jobs may vary, for example:
// 0ms - jobs 2 & 3 run, job 1 hits the limit and waits
Expand Down Expand Up @@ -1648,10 +1655,10 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
j1, err := s.Every("100ms").Do(tc.f)
require.NoError(t, err)

j2, err := s.Every("200ms").Do(tc.f)
j2, err := s.Every("100ms").Do(tc.f)
require.NoError(t, err)

j3, err := s.Every("300ms").Do(tc.f)
j3, err := s.Every("100ms").Do(tc.f)
require.NoError(t, err)

s.StartAsync()
Expand Down Expand Up @@ -1682,13 +1689,13 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
for time.Now().Before(now.Add(200 * time.Millisecond)) {
select {
case <-semaphore:
counter++
t.Error("received a job run after jobs were removed or scheduler stopeed")
default:
}
}

assert.GreaterOrEqual(t, counter, tc.expectedRuns-2)
assert.LessOrEqual(t, counter, tc.expectedRuns+2)
assert.GreaterOrEqual(t, counter, tc.expectedRuns)
assert.LessOrEqual(t, counter, tc.expectedRuns)

if tc.removeJobs {
s.Stop()
Expand Down

0 comments on commit c722202

Please sign in to comment.