From b339f733437dae5eea11d8b01a65c9ac503220eb Mon Sep 17 00:00:00 2001 From: "fengyun.rui" Date: Tue, 5 Sep 2023 03:17:20 +0800 Subject: [PATCH] fix: block unlock with limit concurrency (#559) * fix: block unlock with limit concurrency * fix: block unlock with limit concurrency --- executor.go | 11 ++++++++++- scheduler_test.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/executor.go b/executor.go index 401f7fcb..f1a30acd 100644 --- a/executor.go +++ b/executor.go @@ -170,8 +170,17 @@ func (e *executor) runJob(f jobFunction) { if durationToNextRun > time.Second*5 { durationToNextRun = time.Second * 5 } + + delay := time.Duration(float64(durationToNextRun) * 0.9) + if e.limitModeMaxRunningJobs > 0 { + time.AfterFunc(delay, func() { + _ = l.Unlock(f.ctx) + }) + return + } + if durationToNextRun > time.Millisecond*100 { - timer := time.NewTimer(time.Duration(float64(durationToNextRun) * 0.9)) + timer := time.NewTimer(delay) defer timer.Stop() select { diff --git a/scheduler_test.go b/scheduler_test.go index 0e04bc18..22d63f64 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -2702,6 +2702,44 @@ func runTestWithDistributedLocking(t *testing.T, maxConcurrentJobs int) { assert.Len(t, results, 4) } +func TestWithDistributedLockingBlocking(t *testing.T) { + var ( + maxConcurrentJobs = 1 + counter = 10 + resultChan = make(chan time.Time, 20) + ) + + f := func() { + if counter == 0 { + close(resultChan) + return + } + + resultChan <- time.Now() + counter-- + } + + l := &locker{ + store: make(map[string]struct{}, 0), + } + + s := NewScheduler(time.UTC) + s.WithDistributedLocker(l) + s.SetMaxConcurrentJobs(maxConcurrentJobs, WaitMode) + + s.Every(1).Seconds().Do(func() {}) + s.Every(150).Milliseconds().Do(f) + s.StartAsync() + + last := time.Now() + for ts := range resultChan { + assert.True(t, ts.Sub(last) <= 200*time.Millisecond) + last = ts + } + + s.Stop() +} + func TestScheduler_WithDistributedLocker_With_Name(t *testing.T) { testCases := []struct { description string