diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index e29c4210b627..7c79dace2ae1 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -205,7 +205,7 @@ func (sws *serverWatchStream) recvLoop() error { if !sws.isWatchPermitted(creq) { wr := &pb.WatchResponse{ Header: sws.newResponseHeader(sws.watchStream.Rev()), - WatchId: -1, + WatchId: creq.WatchId, Canceled: true, Created: true, CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(), @@ -225,8 +225,8 @@ func (sws *serverWatchStream) recvLoop() error { if rev == 0 { rev = wsrev + 1 } - id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...) - if id != -1 { + id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...) + if err == nil { sws.mu.Lock() if creq.ProgressNotify { sws.progress[id] = true @@ -240,7 +240,10 @@ func (sws *serverWatchStream) recvLoop() error { Header: sws.newResponseHeader(wsrev), WatchId: int64(id), Created: true, - Canceled: id == -1, + Canceled: err != nil, + } + if err != nil { + wr.CancelReason = err.Error() } select { case sws.ctrlStream <- wr: