From 1c83a46c6d3fb15aeb757200c6ef9f9e68caa898 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 9 Aug 2016 19:28:16 -0700 Subject: [PATCH] clientv3: handle watchGrpcStream shutdown if prior to goroutine start Fixes #6141 --- clientv3/watch.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index ef4aa5304e7..6dd149057f7 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -268,6 +268,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch case reqc <- wr: ok = true case <-wr.ctx.Done(): + wgs.stopIfEmpty() case <-donec: if wgs.closeErr != nil { closeCh <- WatchResponse{closeErr: wgs.closeErr} @@ -385,10 +386,6 @@ func (w *watchGrpcStream) closeStream(ws *watcherStream) { // close subscriber's channel close(ws.outc) delete(w.streams, ws.id) - if len(w.streams) == 0 && w.stopc != nil { - close(w.stopc) - w.stopc = nil - } w.mu.Unlock() } @@ -408,6 +405,14 @@ func (w *watchGrpcStream) run() { w.cancel() }() + // already stopped? + w.mu.RLock() + stopc := w.stopc + w.mu.RUnlock() + if stopc == nil { + return + } + // start a stream with the etcd grpc server if wc, closeErr = w.newWatchClient(); closeErr != nil { return @@ -415,7 +420,6 @@ func (w *watchGrpcStream) run() { var pendingReq, failedReq *watchRequest curReqC := w.reqc - stopc := w.stopc cancelSet := make(map[int64]struct{}) for { @@ -638,9 +642,19 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) { } w.closeStream(ws) + w.stopIfEmpty() // lazily send cancel message if events on missing id } +func (wgs *watchGrpcStream) stopIfEmpty() { + wgs.mu.Lock() + if len(wgs.streams) == 0 && wgs.stopc != nil { + close(wgs.stopc) + wgs.stopc = nil + } + wgs.mu.Unlock() +} + func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { ws, rerr := w.resume() if rerr != nil {