Skip to content

Commit

Permalink
api/v3rpc: add watch ID to "watchStream.Watch"
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Dec 22, 2017
1 parent a8273cd commit c4ca5ad
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (sws *serverWatchStream) recvLoop() error {
if !sws.isWatchPermitted(creq) {
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: -1,
WatchId: creq.WatchId,
Canceled: true,
Created: true,
CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
Expand All @@ -225,8 +225,8 @@ func (sws *serverWatchStream) recvLoop() error {
if rev == 0 {
rev = wsrev + 1
}
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...)
if id != -1 {
id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
if err == nil {
sws.mu.Lock()
if creq.ProgressNotify {
sws.progress[id] = true
Expand All @@ -240,7 +240,10 @@ func (sws *serverWatchStream) recvLoop() error {
Header: sws.newResponseHeader(wsrev),
WatchId: int64(id),
Created: true,
Canceled: id == -1,
Canceled: err != nil,
}
if err != nil {
wr.CancelReason = err.Error()
}
select {
case sws.ctrlStream <- wr:
Expand Down

0 comments on commit c4ca5ad

Please sign in to comment.