From c4ca5adf7d10195ef3d6ef8229153ab73ad0e4a1 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 22 Dec 2017 14:15:54 -0800 Subject: [PATCH] api/v3rpc: add watch ID to "watchStream.Watch" Signed-off-by: Gyuho Lee --- etcdserver/api/v3rpc/watch.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index e29c4210b627..7c79dace2ae1 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -205,7 +205,7 @@ func (sws *serverWatchStream) recvLoop() error { if !sws.isWatchPermitted(creq) { wr := &pb.WatchResponse{ Header: sws.newResponseHeader(sws.watchStream.Rev()), - WatchId: -1, + WatchId: creq.WatchId, Canceled: true, Created: true, CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(), @@ -225,8 +225,8 @@ func (sws *serverWatchStream) recvLoop() error { if rev == 0 { rev = wsrev + 1 } - id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...) - if id != -1 { + id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...) + if err == nil { sws.mu.Lock() if creq.ProgressNotify { sws.progress[id] = true @@ -240,7 +240,10 @@ func (sws *serverWatchStream) recvLoop() error { Header: sws.newResponseHeader(wsrev), WatchId: int64(id), Created: true, - Canceled: id == -1, + Canceled: err != nil, + } + if err != nil { + wr.CancelReason = err.Error() } select { case sws.ctrlStream <- wr: