feat: add election leader (#561)
* feat: add election leader

* feat: add election leader

* Apply suggestions from code review

Co-authored-by: John Roesler <[email protected]>

* feat: add election leader

* feat: add election leader

---------

Co-authored-by: John Roesler <[email protected]>
rfyiamcool and JohnRoesler authored Sep 6, 2023
1 parent af55934 commit 51d96d3
Showing 4 changed files with 97 additions and 1 deletion.
13 changes: 12 additions & 1 deletion executor.go
@@ -54,7 +54,8 @@ type executor struct {
	limitModeRunningJobs *atomic.Int64 // tracks the count of running jobs to check against the max
	stopped *atomic.Bool // allow workers to drain the buffered limitModeQueue

	distributedLocker Locker // support running jobs across multiple instances
	distributedLocker  Locker  // support running jobs across multiple instances
	distributedElector Elector // support running jobs across multiple instances
}

func newExecutor() executor {
@@ -160,6 +161,14 @@ func (e *executor) runJob(f jobFunction) {
		if lockKey == "" {
			lockKey = f.funcName
		}
		if e.distributedElector != nil {
			err := e.distributedElector.IsLeader(e.ctx)
			if err != nil {
				return
			}
			runJob(f)
			return
		}
		if e.distributedLocker != nil {
			l, err := e.distributedLocker.Lock(f.ctx, lockKey)
			if err != nil || l == nil {
@@ -190,6 +199,8 @@
				}
				_ = l.Unlock(f.ctx)
			}()
			runJob(f)
			return
		}
		runJob(f)
	case singletonMode:
7 changes: 7 additions & 0 deletions locker.go
@@ -21,3 +21,10 @@ type Locker interface {
type Lock interface {
	Unlock(ctx context.Context) error
}

// Elector determines the leader from instances asking to be the leader. Only
// the leader runs jobs. If the leader goes down, a new leader will be elected.
type Elector interface {
	// IsLeader should return an error if the job should not be scheduled and nil if the job should be scheduled.
	IsLeader(ctx context.Context) error
}
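
For orientation only (not part of this commit), a minimal Elector could wrap an atomic flag that an external coordination layer flips when leadership changes. The flagElector type and its Promote/Demote helpers below are illustrative assumptions, not gocron API:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// flagElector is a hypothetical Elector implementation: it reports leadership
// from an atomic flag that some external election mechanism (etcd, Consul,
// Redis, and so on) would be responsible for keeping up to date.
type flagElector struct {
	leader atomic.Bool
}

// IsLeader follows the Elector contract above: nil means "this instance is
// the leader, schedule the job"; an error means "skip this run".
func (e *flagElector) IsLeader(_ context.Context) error {
	if e.leader.Load() {
		return nil
	}
	return errors.New("not the leader")
}

// Promote and Demote would be called from the coordination layer's callbacks.
func (e *flagElector) Promote() { e.leader.Store(true) }
func (e *flagElector) Demote()  { e.leader.Store(false) }

func main() {
	el := &flagElector{}
	fmt.Println(el.IsLeader(context.Background())) // "not the leader"
	el.Promote()
	fmt.Println(el.IsLeader(context.Background())) // <nil>
}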
12 changes: 12 additions & 0 deletions scheduler.go
@@ -1493,6 +1493,18 @@ func (s *Scheduler) WithDistributedLocker(l Locker) {
	s.executor.distributedLocker = l
}

// WithDistributedElector prevents the same job from being run more than once
// when multiple schedulers are trying to schedule the same job, by allowing only
// the leader to run jobs. Non-leaders wait until the leader instance goes down
// and then a new leader is elected.
//
// Compared with the distributed locker, the elector follows a leader/follower model:
// all jobs are scheduled and executed only on the leader scheduler instance. Only when the
// leader scheduler goes down and another instance is successfully elected does the new
// leader begin scheduling and running jobs.
func (s *Scheduler) WithDistributedElector(e Elector) {
	s.executor.distributedElector = e
}

// RegisterEventListeners accepts EventListeners and registers them for all jobs
// in the scheduler at the time this function is called.
// The event listeners are then called at the times described by each listener.
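
A sketch of how an application might wire the new option up, assuming the v1 module path github.com/go-co-op/gocron; the alwaysLeader stand-in replaces a real election-backed Elector and is only here to keep the example self-contained:

package main

import (
	"context"
	"log"
	"time"

	"github.com/go-co-op/gocron"
)

// alwaysLeader is a stand-in Elector used only for this sketch; a real
// deployment would plug in an implementation backed by etcd, Consul, Redis,
// or a similar coordination service.
type alwaysLeader struct{}

func (alwaysLeader) IsLeader(context.Context) error { return nil }

func main() {
	s := gocron.NewScheduler(time.UTC)
	s.WithDistributedElector(alwaysLeader{})

	// Every instance registers the same job; only the elected leader runs it.
	if _, err := s.Every(1).Minute().Do(func() {
		log.Println("running on the leader instance")
	}); err != nil {
		log.Fatal(err)
	}

	s.StartBlocking()
}
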
66 changes: 66 additions & 0 deletions scheduler_test.go
@@ -2,6 +2,7 @@ package gocron

import (
"context"
"errors"
"fmt"
"sort"
"sync"
@@ -2851,3 +2852,68 @@ func TestDataRace(t *testing.T) {
	time.Sleep(1 * time.Second)
	sut.Stop()
}

var _ Elector = (*elector)(nil)

type elector struct {
	isLeader bool
}

func (e *elector) IsLeader(_ context.Context) error {
	if e.isLeader {
		return nil
	}
	return errors.New("is not leader")
}

func (e *elector) setLeader() {
	e.isLeader = true
}

func TestScheduler_EnableDistributedElector(t *testing.T) {
	runTestWithDistributedElector(t, 0)
}

func TestScheduler_EnableDistributedElectorWithMaxConcurrent(t *testing.T) {
	runTestWithDistributedElector(t, 1)
}

func runTestWithDistributedElector(t *testing.T, maxConcurrentJobs int) {
	resultChan := make(chan int, 20)
	f := func(schedulerInstance int) {
		resultChan <- schedulerInstance
	}

	leaderIndex := 0
	schedulers := make([]*Scheduler, 0)
	for i := 0; i < 3; i++ {
		el := &elector{}
		if i == leaderIndex {
			el.setLeader()
		}

		s := NewScheduler(time.UTC)
		s.WithDistributedElector(el)
		if maxConcurrentJobs > 0 {
			s.SetMaxConcurrentJobs(maxConcurrentJobs, WaitMode)
		}
		_, err := s.Every("50ms").Do(f, i)
		require.NoError(t, err)
		schedulers = append(schedulers, s)
	}
	for i := range schedulers {
		schedulers[i].StartAsync()
	}
	time.Sleep(530 * time.Millisecond)
	for i := range schedulers {
		schedulers[i].Stop()
	}
	close(resultChan)

	// expect 10 < len(resultChan) < 12
	assert.Greater(t, len(resultChan), 10)
	assert.Less(t, len(resultChan), 12)
	for r := range resultChan {
		assert.Equal(t, leaderIndex, r)
	}
}
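
Assuming a standard Go toolchain, the new tests can be run on their own from the repository root with something like:

go test -race -run TestScheduler_EnableDistributedElector ./...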
