-
Notifications
You must be signed in to change notification settings - Fork 0
/
leader_election.go
122 lines (107 loc) · 2.27 KB
/
leader_election.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
import (
"context"
"log"
"os"
"time"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
)
const (
// HOST is the value stored under the election key by register, and the
// string other participants log as the current primary.
HOST = "host1"
// ETCD_ENDPOINTS is the single etcd endpoint main passes to NewServer.
// NOTE(review): 123.456.789.10 is not a valid IPv4 address (octets exceed
// 255); it looks like a placeholder to be replaced before running.
ETCD_ENDPOINTS = "http://123.456.789.10:2379"
)
// init configures the standard logger so that every message is prefixed
// with the short file name and line number of the call site.
func init() {
	const flags = log.Lshortfile
	log.SetFlags(flags)
}
// Server holds the etcd handles one election participant needs: the client,
// the lease API built on it, and the lease that was granted at start-up.
type Server struct {
// client is the etcd v3 client created by NewServer.
client *clientv3.Client
// lease is the lease API handle created from client.
lease clientv3.Lease
// leaseGrantResp is the response to the lease grant issued in NewServer;
// register attaches its ID to the election key.
leaseGrantResp *clientv3.LeaseGrantResponse
}
// NewServer connects to the etcd cluster at endpoints, grants a 10-second
// lease, and starts a background goroutine that keeps the lease alive.
//
// Any failure while creating the client, granting the lease, or starting the
// keep-alive is logged and terminates the process with exit status -1, so a
// returned *Server is always fully initialised.
func NewServer(endpoints []string) *Server {
	c, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 2 * time.Second,
	})
	if err != nil {
		log.Println(err)
		os.Exit(-1)
	}

	srv := &Server{
		client: c,
		lease:  clientv3.NewLease(c),
	}

	grant, err := srv.lease.Grant(context.TODO(), 10)
	if err != nil {
		log.Println(err)
		os.Exit(-1)
	}
	srv.leaseGrantResp = grant

	keepAlive, err := srv.lease.KeepAlive(context.TODO(), grant.ID)
	if err != nil {
		log.Println(err)
		os.Exit(-1)
	}

	// Drain keep-alive responses so the lease client keeps renewing. Ranging
	// over the channel ends when it is closed; a bare receive inside an
	// endless loop would instead spin on the closed channel forever.
	go func() {
		for range keepAlive {
		}
		log.Println("lease keep-alive channel closed")
	}()

	return srv
}
// register tries to become the primary for key. It runs a transaction that
// writes HOST under key, bound to the server's lease, only if key has never
// been written (version 0).
//
// It returns true when this server wrote the key, false when another holder
// already exists, and a non-nil error only when the transaction itself fails.
// When another holder exists, its value is looked up purely for logging; a
// failure of that lookup is logged and does not change the result.
func (srv *Server) register(key string) (bool, error) {
	kv := clientv3.NewKV(srv.client)
	txnResp, err := kv.Txn(context.TODO()).
		If(clientv3.Compare(clientv3.Version(key), "=", 0)).
		Then(clientv3.OpPut(key, HOST, clientv3.WithLease(srv.leaseGrantResp.ID))).
		Commit()
	if err != nil {
		return false, err
	}
	if txnResp.Succeeded {
		log.Println("i am the primary")
		return true, nil
	}

	getResp, err := srv.client.Get(context.TODO(), key)
	if err != nil {
		// The response is nil on error, so stop here instead of reading it.
		log.Println(err)
		return false, nil
	}
	for _, holder := range getResp.Kvs {
		log.Printf("%s is the primary\n", holder.Value)
	}
	return false, nil
}
// watch blocks until key is deleted or the watch stream ends. It returns
// true when a DELETE event for key is observed and false when the watch
// channel closes without one.
//
// The watch runs under its own cancellable context, which is cancelled on
// return so that the watcher is released rather than accumulating one open
// watcher per call.
func (srv *Server) watch(key string) bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for wresp := range srv.client.Watch(ctx, key) {
		if err := wresp.Err(); err != nil {
			// Report why the stream is failing; the loop ends once the
			// client closes the channel.
			log.Println(err)
		}
		for _, ev := range wresp.Events {
			if ev.Type == mvccpb.DELETE {
				return true
			}
		}
	}
	return false
}
// Register starts the election loop for the "service" key in a background
// goroutine and returns immediately.
//
// The loop tries to register; once the attempt completes without error it
// watches the key and tries again as soon as the key is deleted. Registration
// errors are logged and retried after a short delay, and a watch that ends
// without a delete is re-established after the same delay so that a broken
// stream cannot turn the loop into a busy spin.
//
// NOTE(review): the goroutine has no stop signal and runs for the life of
// the process.
func (srv *Server) Register() {
	const (
		key        = "service"
		retryDelay = time.Second
	)

	go func() {
		for {
			if _, err := srv.register(key); err != nil {
				log.Println(err)
				time.Sleep(retryDelay)
				continue
			}
			// Wait for the current holder's key to disappear.
			for !srv.watch(key) {
				time.Sleep(retryDelay)
			}
		}
	}()
}
func main() {
log.Println("start...")
endpoints := []string{ETCD_ENDPOINTS}
srv := NewServer(endpoints)
srv.Register()
log.Println("end...")
select {}
}