Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add election leader #561

Merged
merged 5 commits into from
Sep 6, 2023

Conversation

rfyiamcool
Copy link

@rfyiamcool rfyiamcool commented Sep 5, 2023

summary

Sometimes, we need gocron to support active-standby mode. Only the active instance can run jobs, and other instances are just backup instances. Only when there is a problem with the master instance can other backup instances run jobs.

we can use libs to implement election interface, e.g. etcd electionk8s leaderelectionconsulzk and redis lock. 😁

It is recommended that gocron can give users more choices, allowing users to choose distributed locks and distributed election solutions according to their needs.

election vs lock

Of course, the distributed lock solution is also very good, each gocron instances try to acquire lock, each gocron instaces can run jobs with probability.

The advantage of election is that all tasks are executed on one instance, and more business optimization operations can be done, such as simpler code, local cache, connection pool, etc. 😁

image

Aftger the server-1 crashes and exits abnormally,the server-2 elected as leader. the server-2 can schedule all jobs.

image

@rfyiamcool
Copy link
Author

rfyiamcool commented Sep 5, 2023

example

package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"time"

	"github.com/go-co-op/gocron"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/concurrency"
)

var (
	serverName = fmt.Sprintf("gocron-%03d", rand.Intn(100))
	endpoints  = []string{"http://127.0.0.1:2379"}
)

type Election struct {
	mu       sync.RWMutex
	ctx      context.Context
	cancel   context.CancelFunc
	isLeader bool
}

func newElection(ctx context.Context) *Election {
	cctx, cancel := context.WithCancel(ctx)
	return &Election{
		ctx:    cctx,
		cancel: cancel,
	}
}

func (e *Election) Close() error {
	e.cancel()
	return nil
}

func (e *Election) IsLeader(_ context.Context) error {
	e.mu.RLock()
	defer e.mu.RUnlock()

	if e.isLeader {
		return nil
	}

	return errors.New("non leader")
}

func (e *Election) start() error {
	cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
	if err != nil {
		return err
	}
	defer cli.Close()

	session, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
	if err != nil {
		return err
	}
	defer session.Close()

	electionHandler := concurrency.NewElection(session, "/my-election")
	fmt.Println(123)

	go func() {
		for e.ctx.Err() == nil {
			if err := electionHandler.Campaign(context.Background(), serverName); err != nil {
				fmt.Println("campaign", err)
			}

			time.Sleep(200 * time.Millisecond)
		}
	}()

	cctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	for e.ctx.Err() == nil {
		resp := <-electionHandler.Observe(cctx)
		if len(resp.Kvs) == 0 {
			continue
		}

		e.mu.Lock()
		if string(resp.Kvs[0].Value) == serverName {
			e.isLeader = true
		} else {
			e.isLeader = false
			fmt.Println("current leader is", string(resp.Kvs[0].Value))
		}
		e.mu.Unlock()

		time.Sleep(1e9)
	}

	return nil
}

func main() {
	el := newElection(context.Background())
	go el.start()

	fmt.Println("start", serverName)

	s := gocron.NewScheduler(time.UTC)
	s.WithDistributedElection(el)

	s.Every("1s").Do(func() {
		fmt.Println("call 1s", serverName)
	})

	s.Every("2s").Do(func() {
		fmt.Println("call 2s", serverName)
	})

	s.Every("3s").Do(func() {
		fmt.Println("call 3s", serverName)
	})

	s.StartAsync()

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	<-c

	fmt.Println("exit")
}

executor.go Outdated
@@ -160,6 +161,12 @@ func (e *executor) runJob(f jobFunction) {
if lockKey == "" {
lockKey = f.funcName
}
if e.distributedElection != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be problematic if someone runs both election and the locker? They seem mutually exclusive in that you'd use one or the other, so perhaps this should be an if/else?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. 😁 Can you see if this can be changed ?

locker.go Outdated Show resolved Hide resolved
locker.go Outdated Show resolved Hide resolved
scheduler.go Outdated Show resolved Hide resolved
Copy link
Contributor

@JohnRoesler JohnRoesler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice - I like this addition. Just comments on naming.

@rfyiamcool do you have an implementation in mind that you'd like to add? I can create a repo for you in go-co-op for gocron-elector-etcd? or whichever tech

@rfyiamcool
Copy link
Author

do you have an implementation in mind that you'd like to add? I can create a repo for you in go-co-op for gocron-elector-etcd? or whichever tech

good,i can. 😀

@JohnRoesler JohnRoesler merged commit 51d96d3 into go-co-op:main Sep 6, 2023
6 checks passed
@husam-e
Copy link

husam-e commented Sep 19, 2023

Cool didn't realize this was so recently added! Very nice. Thinking of adding an impl for spindle.

One thing that confused me a bit though in the docs is on potential race conditions and incompatibility between the Distributed Locker and SingletonMode/SetMaxConcurrentJobs.

  1. Why do those modes not work with the Distributed Locker?
  2. Do those modes work with the DistributedElector?

@JohnRoesler
Copy link
Contributor

@husam-e

  1. The distributed locker allows running multiple active schedulers at the same time. So if you start using job modes that allow jobs to run at different times from when they are scheduled, with how the locker is implemented (at execution time) we could no longer guarantee that the locker will do anything. There is nothing that actually prohibits running with those modes IIRC, it's just a big warning because I can't guarantee it will behave well or work given the design.
  2. They would work with elector, because it's only ever running a single instance as the active gocron scheduler.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants