feat: add election leader
rfyiamcool committed Sep 6, 2023
1 parent 4636313 commit c8316b6
Showing 3 changed files with 17 additions and 17 deletions.
8 changes: 4 additions & 4 deletions executor.go
@@ -54,8 +54,8 @@ type executor struct {
 	limitModeRunningJobs *atomic.Int64 // tracks the count of running jobs to check against the max
 	stopped              *atomic.Bool  // allow workers to drain the buffered limitModeQueue

-	distributedLocker   Locker   // support running jobs across multiple instances
-	distributedElection Election // support running jobs across multiple instances
+	distributedLocker  Locker  // support running jobs across multiple instances
+	distributedElector Elector // support running jobs across multiple instances
 }

 func newExecutor() executor {
@@ -161,8 +161,8 @@ func (e *executor) runJob(f jobFunction) {
 		if lockKey == "" {
 			lockKey = f.funcName
 		}
-		if e.distributedElection != nil {
-			err := e.distributedElection.IsLeader(e.ctx)
+		if e.distributedElector != nil {
+			err := e.distributedElector.IsLeader(e.ctx)
 			if err != nil {
 				return
 			}
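The Elector interface itself is not part of this commit's diff; judging from the call site above and the test stub further down, it presumably amounts to a single method (a sketch, not the repository's actual declaration):

type Elector interface {
	// IsLeader returns nil when this scheduler instance currently holds
	// leadership, and a non-nil error otherwise; runJob skips the job
	// whenever an error is returned.
	IsLeader(ctx context.Context) error
}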
4 changes: 2 additions & 2 deletions scheduler.go
@@ -1495,14 +1495,14 @@ func (s *Scheduler) WithDistributedLocker(l Locker) {

 // WithDistributedElector prevents the same job from being run more than once
 // when multiple schedulers are trying to schedule the same job, by allowing only
-// the leader to run jobs. Non-leaders wait until the leader instance goes down
+// the leader to run jobs. Non-leaders wait until the leader instance goes down
 // and then a new leader is elected.
 //
 // Compared with the distributed lock, the elector follows a leader/follower model:
 // jobs are scheduled and executed only on the leader scheduler instance. Only after
 // the leader goes down and another scheduler instance wins the election can that
 // new leader schedule jobs.
 func (s *Scheduler) WithDistributedElector(e Elector) {
-	s.executor.distributedElection = el
+	s.executor.distributedElector = e
 }

 // RegisterEventListeners accepts EventListeners and registers them for all jobs
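To show how the new option is meant to be used, here is a minimal, hypothetical wiring example. Only WithDistributedElector and the IsLeader contract come from this commit; the staticElector type, the job body, and the import path are assumptions for illustration:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/go-co-op/gocron"
)

// staticElector is a hypothetical Elector whose leadership is fixed at
// construction time; a real implementation would consult etcd, Consul, etc.
type staticElector struct {
	leader bool
}

// IsLeader satisfies the contract used by executor.runJob: nil on the
// leader, an error everywhere else.
func (e *staticElector) IsLeader(_ context.Context) error {
	if e.leader {
		return nil
	}
	return errors.New("not the leader")
}

func main() {
	s := gocron.NewScheduler(time.UTC)
	s.WithDistributedElector(&staticElector{leader: true})

	_, _ = s.Every(1).Second().Do(func() {
		fmt.Println("only the leader instance runs this")
	})

	s.StartAsync()
	time.Sleep(3 * time.Second)
	s.Stop()
}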
22 changes: 11 additions & 11 deletions scheduler_test.go
@@ -2853,32 +2853,32 @@ func TestDataRace(t *testing.T) {
 	sut.Stop()
 }

-var _ Election = (*election)(nil)
+var _ Elector = (*elector)(nil)

-type election struct {
+type elector struct {
 	isLeader bool
 }

-func (e *election) IsLeader(_ context.Context) error {
+func (e *elector) IsLeader(_ context.Context) error {
 	if e.isLeader {
 		return nil
 	}
 	return errors.New("is not leader")
 }

-func (e *election) setLeader() {
+func (e *elector) setLeader() {
 	e.isLeader = true
 }

-func TestScheduler_EnableDistributedElection(t *testing.T) {
-	runTestWithDistributedElection(t, 0)
+func TestScheduler_EnableDistributedElector(t *testing.T) {
+	runTestWithDistributedElector(t, 0)
 }

-func TestScheduler_EnableDistributedElectionWithMaxConcurrent(t *testing.T) {
-	runTestWithDistributedElection(t, 1)
+func TestScheduler_EnableDistributedElectorWithMaxConcurrent(t *testing.T) {
+	runTestWithDistributedElector(t, 1)
 }

-func runTestWithDistributedElection(t *testing.T, maxConcurrentJobs int) {
+func runTestWithDistributedElector(t *testing.T, maxConcurrentJobs int) {
 	resultChan := make(chan int, 20)
 	f := func(schedulerInstance int) {
 		resultChan <- schedulerInstance
@@ -2887,13 +2887,13 @@ func runTestWithDistributedElection(t *testing.T, maxConcurrentJobs int) {
 	leaderIndex := 0
 	schedulers := make([]*Scheduler, 0)
 	for i := 0; i < 3; i++ {
-		el := &election{}
+		el := &elector{}
 		if i == leaderIndex {
			el.setLeader()
 		}

 		s := NewScheduler(time.UTC)
-		s.WithDistributedElection(el)
+		s.WithDistributedElector(el)
 		if maxConcurrentJobs > 0 {
 			s.SetMaxConcurrentJobs(maxConcurrentJobs, WaitMode)
 		}
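The test's elector simply hard-codes leadership. In production the same interface would typically be backed by a coordination service; below is a rough sketch (an assumption, not part of gocron or this commit) using etcd's client-side election. The key prefix, value, and error handling are illustrative only:

package leader

import (
	"context"
	"errors"
	"sync/atomic"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// etcdElector campaigns for leadership in the background and reports the
// result through IsLeader, matching gocron's Elector contract.
type etcdElector struct {
	isLeader atomic.Bool
}

func NewEtcdElector(ctx context.Context, endpoints []string) (*etcdElector, error) {
	cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
	if err != nil {
		return nil, err
	}
	sess, err := concurrency.NewSession(cli)
	if err != nil {
		return nil, err
	}
	e := &etcdElector{}
	go func() {
		election := concurrency.NewElection(sess, "/gocron/leader")
		// Campaign blocks until this instance wins or ctx is canceled.
		if err := election.Campaign(ctx, "scheduler"); err != nil {
			return
		}
		e.isLeader.Store(true)
		<-sess.Done() // session lapsed: leadership is gone
		e.isLeader.Store(false)
	}()
	return e, nil
}

func (e *etcdElector) IsLeader(_ context.Context) error {
	if e.isLeader.Load() {
		return nil
	}
	return errors.New("not the leader")
}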
