Skip to content

Commit

Permalink
refactor: update ttl event expiry model
Browse files Browse the repository at this point in the history
Signed-off-by: zhiyuan <[email protected]>
  • Loading branch information
gshilei authored and brandond committed Nov 3, 2023
1 parent 13bff3e commit 5fd1780
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 21 deletions.
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ require (
github.com/rancher/wrangler v1.1.1-0.20230425173236-39a4707f0689
github.com/shengdoushi/base58 v1.0.0
github.com/sirupsen/logrus v1.9.0
github.com/soheilhy/cmux v0.1.5
github.com/urfave/cli v1.22.4
go.etcd.io/etcd/api/v3 v3.5.9
go.etcd.io/etcd/client/pkg/v3 v3.5.9
go.etcd.io/etcd/client/v3 v3.5.9
go.etcd.io/etcd/server/v3 v3.5.9
google.golang.org/grpc v1.56.3
k8s.io/client-go v0.25.4
)

require (
Expand All @@ -36,6 +36,7 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.4.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
Expand All @@ -61,6 +62,7 @@ require (
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/russross/blackfriday/v2 v2.0.1 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
Expand All @@ -87,5 +89,8 @@ require (
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/apimachinery v0.25.4 // indirect
k8s.io/klog/v2 v2.70.1 // indirect
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
Expand Down Expand Up @@ -301,6 +305,7 @@ github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down Expand Up @@ -686,6 +691,15 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/apimachinery v0.25.4 h1:CtXsuaitMESSu339tfhVXhQrPET+EiWnIY1rcurKnAc=
k8s.io/apimachinery v0.25.4/go.mod h1:jaF9C/iPNM1FuLl7Zuy5b9v+n35HGSh6AQ4HYRkCqwo=
k8s.io/client-go v0.25.4 h1:3RNRDffAkNU56M/a7gUfXaEzdhZlYhoW8dgViGy5fn8=
k8s.io/client-go v0.25.4/go.mod h1:8trHCAC83XKY0wsBIpbirZU4NTUpbuhc2JnI7OruGZw=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.70.1 h1:7aaoSdahviPmR+XkS7FyxlkkXs6tHISSG03RxleQAVQ=
k8s.io/klog/v2 v2.70.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed h1:jAne/RjBTyawwAy0utX5eqigAwz/lQhTmy+Hr/Cpue4=
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
149 changes: 129 additions & 20 deletions pkg/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@ import (
"sync"
"time"

"github.com/k3s-io/kine/pkg/server"
"github.com/sirupsen/logrus"
"k8s.io/client-go/util/workqueue"

"github.com/k3s-io/kine/pkg/server"
)

const (
retryInterval = 250 * time.Millisecond
)

type Log interface {
Expand All @@ -20,6 +26,12 @@ type Log interface {
DbSize(ctx context.Context) (int64, error)
}

type ttlEventKV struct {
key string
modRevision int64
expiredAt time.Time
}

type LogStructured struct {
log Log
}
Expand Down Expand Up @@ -252,66 +264,163 @@ func (l *LogStructured) Update(ctx context.Context, key string, value []byte, re
return rev, updateEvent.KV, true, err
}

func (l *LogStructured) ttl(ctx context.Context) {
queue := workqueue.NewDelayingQueue()
rwMutex := &sync.RWMutex{}
ttlEventKVMap := make(map[string]*ttlEventKV)
go func() {
for l.handleTTLEvents(ctx, rwMutex, queue, ttlEventKVMap) {
}
}()

for {
select {
case <-ctx.Done():
queue.ShutDown()
return
default:
}

for event := range l.ttlEvents(ctx) {
if event.Delete {
continue
}

eventKV := loadTTLEventKV(rwMutex, ttlEventKVMap, event.KV.Key)
if eventKV == nil {
logrus.Tracef("Add ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision)
expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV)
queue.AddAfter(event.KV.Key, expires)
} else {
if event.KV.ModRevision > eventKV.modRevision {
logrus.Tracef("Update ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision)
expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV)
queue.AddAfter(event.KV.Key, expires)
}
}
}
}
}

func (l *LogStructured) handleTTLEvents(ctx context.Context, rwMutex *sync.RWMutex, queue workqueue.DelayingInterface, store map[string]*ttlEventKV) bool {
key, shutdown := queue.Get()
if shutdown {
logrus.Info("TTL events work queue has shut down")
return false
}
defer queue.Done(key)

eventKV := loadTTLEventKV(rwMutex, store, key.(string))
if eventKV == nil {
logrus.Errorf("Failed to find ttl event for key %v", key)
return true
}

if eventKV.expiredAt.After(time.Now()) {
logrus.Tracef("TTL event key %v has not expired yet, the latest expiration time is %v, requeuing", key, eventKV.expiredAt)
queue.AddAfter(key, time.Until(eventKV.expiredAt))
return true
}

l.deleteTTLEvent(ctx, rwMutex, queue, store, eventKV)
return true
}

func (l *LogStructured) deleteTTLEvent(ctx context.Context, rwMutex *sync.RWMutex, queue workqueue.DelayingInterface, store map[string]*ttlEventKV, preEventKV *ttlEventKV) {
logrus.Tracef("Delete ttl event key %v, modRev %v", preEventKV.key, preEventKV.modRevision)
_, _, _, err := l.Delete(ctx, preEventKV.key, preEventKV.modRevision)

rwMutex.Lock()
defer rwMutex.Unlock()
curEventKV := store[preEventKV.key]
if curEventKV.expiredAt.After(preEventKV.expiredAt) {
logrus.Tracef("TTL event key %v has updated, requeuing", curEventKV.key)
queue.AddAfter(curEventKV.key, time.Until(curEventKV.expiredAt))
return
}
if err != nil {
logrus.Errorf("Failed to delete key %v at end of lease: %v, requeuing", curEventKV.key, err)
queue.AddAfter(curEventKV.key, retryInterval)
return
}

delete(store, curEventKV.key)
}

func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
result := make(chan *server.Event)
lastListRevision := make(chan int64)
wg := sync.WaitGroup{}
wg.Add(2)

go func() {
wg.Wait()
close(result)
close(lastListRevision)
}()

go func() {
defer wg.Done()
var lastRev int64

rev, events, err := l.log.List(ctx, "/", "", 1000, 0, false)
for len(events) > 0 {
if err != nil {
logrus.Errorf("failed to read old events for ttl")
return
logrus.Errorf("Failed to read old events for ttl: %v", err)
break
}

for _, event := range events {
if event.KV.Lease > 0 {
result <- event
}

if event.KV.ModRevision > lastRev {
lastRev = event.KV.ModRevision
}
}

_, events, err = l.log.List(ctx, "/", events[len(events)-1].KV.Key, 1000, rev, false)
}
lastListRevision <- lastRev
}()

go func() {
defer wg.Done()
for events := range l.log.Watch(ctx, "/") {
revision := <-lastListRevision
if revision == 0 {
logrus.Error("TTL events last list revision is zero, retry to process ttl events")
return
}
for events := range l.Watch(ctx, "/", revision) {
for _, event := range events {
if event.KV.Lease > 0 {
result <- event
}
}
}
logrus.Info("TTL events watch channel was closed")
}()

return result
}

func (l *LogStructured) ttl(ctx context.Context) {
// vary naive TTL support
mutex := &sync.Mutex{}
for event := range l.ttlEvents(ctx) {
go func(event *server.Event) {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(event.KV.Lease) * time.Second):
}
mutex.Lock()
if _, _, _, err := l.Delete(ctx, event.KV.Key, event.KV.ModRevision); err != nil {
logrus.Errorf("failed to delete expired key: %v", err)
}
mutex.Unlock()
}(event)
func loadTTLEventKV(rwMutex *sync.RWMutex, store map[string]*ttlEventKV, key string) *ttlEventKV {
rwMutex.RLock()
defer rwMutex.RUnlock()
return store[key]
}

func storeTTLEventKV(rwMutex *sync.RWMutex, store map[string]*ttlEventKV, eventKV *server.KeyValue) time.Duration {
rwMutex.Lock()
defer rwMutex.Unlock()
expires := time.Duration(eventKV.Lease) * time.Second
store[eventKV.Key] = &ttlEventKV{
key: eventKV.Key,
modRevision: eventKV.ModRevision,
expiredAt: time.Now().Add(expires),
}
return expires
}

func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event {
Expand Down

0 comments on commit 5fd1780

Please sign in to comment.