diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go
index 5153007258d3..679586e9b549 100644
--- a/server/etcdserver/api/v3rpc/watch.go
+++ b/server/etcdserver/api/v3rpc/watch.go
@@ -360,10 +360,9 @@ func (sws *serverWatchStream) recvLoop() error {
 			}
 		case *pb.WatchRequest_ProgressRequest:
 			if uv.ProgressRequest != nil {
-				sws.ctrlStream <- &pb.WatchResponse{
-					Header:  sws.newResponseHeader(sws.watchStream.Rev()),
-					WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels
-				}
+				// Request progress for all watchers,
+				// force generation of a response
+				sws.watchStream.RequestProgressAll(true)
 			}
 		default:
 			// we probably should not shutdown the entire stream when
@@ -408,6 +407,7 @@ func (sws *serverWatchStream) sendLoop() {
 			// either return []*mvccpb.Event from the mvcc package
 			// or define protocol buffer with []mvccpb.Event.
 			evs := wresp.Events
+			progressNotify := len(evs) == 0
 			events := make([]*mvccpb.Event, len(evs))
 			sws.mu.RLock()
 			needPrevKV := sws.prevKV[wresp.WatchID]
@@ -432,11 +432,15 @@ func (sws *serverWatchStream) sendLoop() {
 				Canceled:        canceled,
 			}
 
-			if _, okID := ids[wresp.WatchID]; !okID {
-				// buffer if id not yet announced
-				wrs := append(pending[wresp.WatchID], wr)
-				pending[wresp.WatchID] = wrs
-				continue
+			// Progress notifications can have WatchID -1
+			// if they announce on behalf of multiple watchers
+			if !progressNotify || wresp.WatchID != clientv3.InvalidWatchID {
+				if _, okID := ids[wresp.WatchID]; !okID {
+					// buffer if id not yet announced
+					wrs := append(pending[wresp.WatchID], wr)
+					pending[wresp.WatchID] = wrs
+					continue
+				}
 			}
 
 			mvcc.ReportEventReceived(len(evs))
diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go
index adf07f7755b0..89f2ba8dd494 100644
--- a/server/storage/mvcc/watchable_store.go
+++ b/server/storage/mvcc/watchable_store.go
@@ -41,6 +41,7 @@ var (
 type watchable interface {
 	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
 	progress(w *watcher)
+	progressAll(force bool)
 	rev() int64
 }
 
@@ -62,6 +63,9 @@ type watchableStore struct {
 	// The key of the map is the key that the watcher watches on.
 	synced watcherGroup
 
+	// Whether to generate a progress notification once all watchers are synchronised
+	progressOnSync bool
+
 	stopc chan struct{}
 	wg    sync.WaitGroup
 }
@@ -79,11 +83,12 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg S
 		lg = zap.NewNop()
 	}
 	s := &watchableStore{
-		store:    NewStore(lg, b, le, cfg),
-		victimc:  make(chan struct{}, 1),
-		unsynced: newWatcherGroup(),
-		synced:   newWatcherGroup(),
-		stopc:    make(chan struct{}),
+		store:          NewStore(lg, b, le, cfg),
+		victimc:        make(chan struct{}, 1),
+		unsynced:       newWatcherGroup(),
+		synced:         newWatcherGroup(),
+		stopc:          make(chan struct{}),
+		progressOnSync: false,
 	}
 	s.store.ReadView = &readView{s}
 	s.store.WriteView = &writeView{s}
@@ -407,6 +412,15 @@ func (s *watchableStore) syncWatchers() int {
 	}
 	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
 
+	// Deferred progress notification left to send when synced?
+	if s.progressOnSync && s.unsynced.size() == 0 {
+		for w := range s.synced.watchers {
+			w.send(WatchResponse{WatchID: -1, Revision: s.rev()})
+			break
+		}
+		s.progressOnSync = false
+	}
+
 	return s.unsynced.size()
 }
 
@@ -482,6 +496,35 @@ func (s *watchableStore) progress(w *watcher) {
 	}
 }
 
+// progressAll sends a progress notification that is not tied to a single
+// watcher: a WatchResponse with WatchID -1 and the current store revision,
+// delivered through the first watcher found in the synced group (if any).
+// Nothing is sent while any watcher is unsynced; if force is set in that
+// case, progressOnSync is raised so that syncWatchers sends the notification
+// once no unsynced watcher remains.
+//
+// progressOnSync is written here, so the exclusive lock is taken: a shared
+// read lock would let two concurrent callers write the field at once.
+//
+// NOTE(review): this inspects the store-wide watcher groups, not only the
+// watchers of the requesting stream - confirm that is intended.
+func (s *watchableStore) progressAll(force bool) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Any watcher unsynced?
+	if s.unsynced.size() > 0 {
+		// If forced: Defer progress until successfully synced
+		if force {
+			s.progressOnSync = true
+		}
+		return
+	}
+
+	// If all watchers are synchronised, send out progress
+	// watch response on first watcher (if any)
+	for w := range s.synced.watchers {
+		w.send(WatchResponse{WatchID: -1, Revision: s.rev()})
+		break
+	}
+}
+
 type watcher struct {
 	// the watcher key
 	key []byte
diff --git a/server/storage/mvcc/watcher.go b/server/storage/mvcc/watcher.go
index 7d2490b1d6e9..2505c64b15f4 100644
--- a/server/storage/mvcc/watcher.go
+++ b/server/storage/mvcc/watcher.go
@@ -58,6 +58,14 @@ type WatchStream interface {
 	// of the watchers since the watcher is currently synced.
 	RequestProgress(id WatchID)
 
+	// RequestProgressAll requests a progress notification for the
+	// entire watcher group. The response will only be sent if
+	// all watchers are synced - or once they become synced, if
+	// forced. The responses will be sent through the
+	// WatchResponse Chan of the first watcher of this stream, if
+	// any.
+	RequestProgressAll(force bool)
+
 	// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
 	// returned.
 	Cancel(id WatchID) error
@@ -188,3 +196,9 @@ func (ws *watchStream) RequestProgress(id WatchID) {
 	}
 	ws.watchable.progress(w)
 }
+
+// RequestProgressAll forwards the request, including the force flag, to the
+// underlying watchable's progressAll.
+func (ws *watchStream) RequestProgressAll(force bool) {
+	ws.watchable.progressAll(force)
+}