-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathetcd_lease_watcher.go
51 lines (40 loc) · 1.05 KB
/
etcd_lease_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main
import (
"context"
"log"
clientv3 "go.etcd.io/etcd/client/v3"
)
// keyPrefix is the etcd key prefix whose keys are watched for deletions;
// it is passed to cli.Watch together with clientv3.WithPrefix().
const keyPrefix = "/my-data/"
// watchExpiredLease watches every key under keyPrefix and returns a channel
// that receives the DELETE events which isExpired attributes to a lease
// expiry.
//
// PUT events are filtered out by the watch itself (WithFilterPut), and the
// previous key-value is requested (WithPrevKV) because isExpired needs the
// lease ID that was attached to the key before it was deleted.
//
// The returned channel is unbuffered: the background goroutine blocks on each
// send until the caller receives. The channel is closed when the underlying
// watch channel is closed. The watch uses context.Background(), so it is not
// cancelled by this function.
func watchExpiredLease(cli *clientv3.Client) <-chan *clientv3.Event {
	events := make(chan *clientv3.Event)
	wCh := cli.Watch(
		context.Background(),
		keyPrefix,
		clientv3.WithPrefix(),
		clientv3.WithPrevKV(),
		clientv3.WithFilterPut(),
	)

	go func() {
		defer close(events)
		for wResp := range wCh {
			// Surface watch-level failures (e.g. a cancelled or compacted
			// watch) instead of letting the channel close without a trace.
			if err := wResp.Err(); err != nil {
				log.Println("Error in watch response:", err)
			}
			for _, ev := range wResp.Events {
				expired, err := isExpired(cli, ev)
				if err != nil {
					// Include the underlying error so the failure can be diagnosed.
					log.Println("Error when checking expiry:", err)
					continue
				}
				if expired {
					events <- ev
				}
			}
		}
	}()

	return events
}
// isExpired reports whether a DELETE event happened because the lease
// attached to the deleted key expired.
//
// It returns false without contacting etcd when the event carries no previous
// key-value (PrevKv) or when that key-value had no lease attached. Otherwise
// it looks the lease up with TimeToLive and reports true when the returned
// TTL is -1. Any error from TimeToLive is returned to the caller.
//
// NOTE(review): a TTL of -1 is presumably how etcd signals a lease that no
// longer exists; confirm against the clientv3 lease documentation.
func isExpired(cli *clientv3.Client, ev *clientv3.Event) (bool, error) {
	prev := ev.PrevKv
	if prev == nil || clientv3.LeaseID(prev.Lease) == clientv3.NoLease {
		return false, nil
	}

	resp, err := cli.TimeToLive(context.Background(), clientv3.LeaseID(prev.Lease))
	if err != nil {
		return false, err
	}

	leaseGone := resp.TTL == -1
	return leaseGone, nil
}