Skip to content

Commit

Permalink
fix: block unlock with limit concurrency (#559)
Browse files Browse the repository at this point in the history
* fix: block unlock with limit concurrency

* fix: block unlock with limit concurrency
  • Loading branch information
rfyiamcool authored Sep 4, 2023
1 parent d7dd276 commit b339f73
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
11 changes: 10 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,17 @@ func (e *executor) runJob(f jobFunction) {
if durationToNextRun > time.Second*5 {
durationToNextRun = time.Second * 5
}

delay := time.Duration(float64(durationToNextRun) * 0.9)
if e.limitModeMaxRunningJobs > 0 {
time.AfterFunc(delay, func() {
_ = l.Unlock(f.ctx)
})
return
}

if durationToNextRun > time.Millisecond*100 {
timer := time.NewTimer(time.Duration(float64(durationToNextRun) * 0.9))
timer := time.NewTimer(delay)
defer timer.Stop()

select {
Expand Down
38 changes: 38 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2702,6 +2702,44 @@ func runTestWithDistributedLocking(t *testing.T, maxConcurrentJobs int) {
assert.Len(t, results, 4)
}

// TestWithDistributedLockingBlocking verifies that a job guarded by a
// distributed locker is not blocked by the concurrency limiter: with
// max-concurrent-jobs set to 1, the 150 ms job must still fire roughly on
// schedule (each tick within 200 ms of the previous one) for 10 ticks.
func TestWithDistributedLockingBlocking(t *testing.T) {
	var (
		maxConcurrentJobs = 1
		counter           = 10
		resultChan        = make(chan time.Time, 20)
	)

	f := func() {
		// Guard against a double close: the scheduler keeps invoking this
		// job every 150 ms until s.Stop() runs below, so without the -1
		// sentinel a second invocation after the channel is closed would
		// call close() again and panic the test.
		if counter <= 0 {
			if counter == 0 {
				close(resultChan)
				counter = -1 // mark the channel as closed; never close again
			}
			return
		}

		resultChan <- time.Now()
		counter--
	}

	l := &locker{
		store: make(map[string]struct{}),
	}

	s := NewScheduler(time.UTC)
	s.WithDistributedLocker(l)
	s.SetMaxConcurrentJobs(maxConcurrentJobs, WaitMode)

	// A filler job plus the job under test; errors are irrelevant here.
	_, _ = s.Every(1).Seconds().Do(func() {})
	_, _ = s.Every(150).Milliseconds().Do(f)
	s.StartAsync()

	// Each received timestamp must be within 200 ms of the previous one,
	// proving the lock release does not serialize ticks behind the limiter.
	last := time.Now()
	for ts := range resultChan {
		assert.True(t, ts.Sub(last) <= 200*time.Millisecond)
		last = ts
	}

	s.Stop()
}

func TestScheduler_WithDistributedLocker_With_Name(t *testing.T) {
testCases := []struct {
description string
Expand Down

0 comments on commit b339f73

Please sign in to comment.