diff --git a/clientv3/watch.go b/clientv3/watch.go index dbb73f4a21c1..24a1b8d6229f 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -367,7 +367,8 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) { } func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) { - if resp.WatchId == -1 { + // check watch ID for backward compatibility (<= v3.3) + if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") { // failed; no channel close(ws.recvc) return @@ -453,6 +454,7 @@ func (w *watchGrpcStream) run() { // Watch() requested case wreq := <-w.reqc: outc := make(chan WatchResponse, 1) + // TODO: pass custom watch ID? ws := &watcherStream{ initReq: *wreq, id: -1, @@ -553,6 +555,7 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { for i, ev := range pbresp.Events { events[i] = (*Event)(ev) } + // TODO: return watch ID? wr := &WatchResponse{ Header: *pbresp.Header, Events: events,