Skip to content

Commit

Permalink
Merge pull request #6142 from heyitsanthony/fix-cancel-watch-imm
Browse files Browse the repository at this point in the history
clientv3: handle watchGrpcStream shutdown if prior to goroutine start
  • Loading branch information
Anthony Romano authored Aug 10, 2016
2 parents 2d3eda4 + 1c83a46 commit 81f5e31
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
wgs.stopIfEmpty()
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
Expand Down Expand Up @@ -385,10 +386,6 @@ func (w *watchGrpcStream) closeStream(ws *watcherStream) {
// close subscriber's channel
close(ws.outc)
delete(w.streams, ws.id)
if len(w.streams) == 0 && w.stopc != nil {
close(w.stopc)
w.stopc = nil
}
w.mu.Unlock()
}

Expand All @@ -408,14 +405,21 @@ func (w *watchGrpcStream) run() {
w.cancel()
}()

// already stopped?
w.mu.RLock()
stopc := w.stopc
w.mu.RUnlock()
if stopc == nil {
return
}

// start a stream with the etcd grpc server
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}

var pendingReq, failedReq *watchRequest
curReqC := w.reqc
stopc := w.stopc
cancelSet := make(map[int64]struct{})

for {
Expand Down Expand Up @@ -638,9 +642,19 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
}

w.closeStream(ws)
w.stopIfEmpty()
// lazily send cancel message if events on missing id
}

func (wgs *watchGrpcStream) stopIfEmpty() {
wgs.mu.Lock()
if len(wgs.streams) == 0 && wgs.stopc != nil {
close(wgs.stopc)
wgs.stopc = nil
}
wgs.mu.Unlock()
}

func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
ws, rerr := w.resume()
if rerr != nil {
Expand Down

0 comments on commit 81f5e31

Please sign in to comment.