diff --git a/pkg/drivers/nats/backend.go b/pkg/drivers/nats/backend.go index c6667b61..c96fa902 100644 --- a/pkg/drivers/nats/backend.go +++ b/pkg/drivers/nats/backend.go @@ -129,6 +129,11 @@ func (b *Backend) DbSize(ctx context.Context) (int64, error) { return b.kv.BucketSize(ctx) } +// CurrentRevision returns the current revision of the database. +func (b *Backend) CurrentRevision(ctx context.Context) (int64, error) { + return b.kv.BucketRevision(), nil +} + // Count returns an exact count of the number of matching keys and the current revision of the database. func (b *Backend) Count(ctx context.Context, prefix string) (int64, int64, error) { count, err := b.kv.Count(ctx, prefix) diff --git a/pkg/drivers/nats/logger.go b/pkg/drivers/nats/logger.go index 543384fd..f794fc66 100644 --- a/pkg/drivers/nats/logger.go +++ b/pkg/drivers/nats/logger.go @@ -115,3 +115,8 @@ func (b *BackendLogger) Watch(ctx context.Context, prefix string, revision int64 func (b *BackendLogger) DbSize(ctx context.Context) (int64, error) { return b.backend.DbSize(ctx) } + +// CurrentRevision returns the current revision of the database. +func (b *BackendLogger) CurrentRevision(ctx context.Context) (int64, error) { + return b.backend.CurrentRevision(ctx) +} diff --git a/pkg/logstructured/logstructured.go b/pkg/logstructured/logstructured.go index 729693b9..90a71480 100644 --- a/pkg/logstructured/logstructured.go +++ b/pkg/logstructured/logstructured.go @@ -489,3 +489,7 @@ func filter(events []*server.Event, rev int64) []*server.Event { func (l *LogStructured) DbSize(ctx context.Context) (int64, error) { return l.log.DbSize(ctx) } + +func (l *LogStructured) CurrentRevision(ctx context.Context) (int64, error) { + return l.log.CurrentRevision(ctx) +} diff --git a/pkg/server/types.go b/pkg/server/types.go index 797e1dd8..989f71d7 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -27,6 +27,7 @@ type Backend interface { Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error) Watch(ctx context.Context, key string, revision int64) WatchResult DbSize(ctx context.Context) (int64, error) + CurrentRevision(ctx context.Context) (int64, error) } type Dialect interface { diff --git a/pkg/server/watch.go b/pkg/server/watch.go index b6a7af15..b20bbb8f 100644 --- a/pkg/server/watch.go +++ b/pkg/server/watch.go @@ -2,8 +2,10 @@ package server import ( "context" + "math/rand" "sync" "sync/atomic" + "time" "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/etcdserverpb" @@ -12,20 +14,40 @@ import ( ) var watchID int64 +var progressReportInterval = 10 * time.Minute // explicit interface check var _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) +// GetProgressReportInterval returns the current progress report interval, with some jitter +func GetProgressReportInterval() time.Duration { + interval := progressReportInterval + + // add rand(1/10*progressReportInterval) as jitter so that kine will not + // send progress notifications to watchers at the same time even when watchers + // are created at the same time. + jitter := time.Duration(rand.Int63n(int64(interval) / 10)) + + return interval + jitter +} + func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { + interval := GetProgressReportInterval() + progressTicker := time.NewTicker(interval) + defer progressTicker.Stop() + w := watcher{ - server: ws, - backend: s.limited.backend, - watches: map[int64]func(){}, + server: ws, + backend: s.limited.backend, + watches: map[int64]func(){}, + progress: map[int64]int64{}, } defer w.Close() logrus.Tracef("WATCH SERVER CREATE") + go w.DoProgress(ws.Context(), progressTicker) + for { msg, err := ws.Recv() if err != nil { @@ -45,10 +67,12 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { type watcher struct { sync.Mutex - wg sync.WaitGroup - backend Backend - server etcdserverpb.Watch_WatchServer - watches map[int64]func() + wg sync.WaitGroup + backend Backend + server etcdserverpb.Watch_WatchServer + watches map[int64]func() + progress map[int64]int64 + progressRev int64 } func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) { @@ -63,7 +87,11 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) key := string(r.Key) - logrus.Tracef("WATCH START id=%d, count=%d, key=%s, revision=%d", id, len(w.watches), key, r.StartRevision) + if r.ProgressNotify { + w.progress[id] = w.progressRev + } + + logrus.Tracef("WATCH START id=%d, count=%d, key=%s, revision=%d, progressNotify=%v", id, len(w.watches), key, r.StartRevision, r.ProgressNotify) go func() { defer w.wg.Done() @@ -111,13 +139,19 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) // Send collected events in a single response if len(events) > 0 { + revision := events[len(events)-1].KV.ModRevision + w.Lock() + if r, ok := w.progress[id]; ok && r == w.progressRev { + w.progress[id] = revision + } + w.Unlock() if logrus.IsLevelEnabled(logrus.TraceLevel) { for _, event := range events { logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) } } wr := &etcdserverpb.WatchResponse{ - Header: txnHeader(events[len(events)-1].KV.ModRevision), + Header: txnHeader(revision), WatchId: id, Events: toEvents(events...), } @@ -159,6 +193,7 @@ func (w *watcher) Cancel(watchID, revision, compactRev int64, err error) { if cancel, ok := w.watches[watchID]; ok { cancel() delete(w.watches, watchID) + delete(w.progress, watchID) } w.Unlock() @@ -183,9 +218,36 @@ func (w *watcher) Cancel(watchID, revision, compactRev int64, err error) { func (w *watcher) Close() { logrus.Tracef("WATCH SERVER CLOSE") w.Lock() - for _, v := range w.watches { + for id, v := range w.watches { + delete(w.progress, id) v() } w.Unlock() w.wg.Wait() } + +func (w *watcher) DoProgress(ctx context.Context, ticker *time.Ticker) { + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + rev, err := w.backend.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) + continue + } + + w.Lock() + for id, r := range w.progress { + if r == w.progressRev { + logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev) + go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id}) + } + w.progress[id] = rev + } + w.progressRev = rev + w.Unlock() + } + } +}