Skip to content

Commit

Permalink
Merge pull request #6339 from xiang90/close
Browse files Browse the repository at this point in the history
grpcproxy: stop watchers in watch groups
  • Loading branch information
xiang90 authored Sep 2, 2016
2 parents 5b14b83 + eded62e commit 81bd381
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
40 changes: 29 additions & 11 deletions proxy/grpcproxy/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
wp.mu.Unlock()

sws := serverWatchStream{
cw: wp.cw,
groups: &wp.wgs,
singles: make(map[int64]*watcherSingle),
cw: wp.cw,
groups: &wp.wgs,
singles: make(map[int64]*watcherSingle),
inGroups: make(map[int64]struct{}),

id: wp.nextStreamID,
gRPCStream: stream,
Expand All @@ -80,9 +81,10 @@ type serverWatchStream struct {
id int64
cw clientv3.Watcher

mu sync.Mutex // make sure any access of groups and singles is atomic
groups *watchergroups
singles map[int64]*watcherSingle
mu sync.Mutex // make sure any access of groups and singles is atomic
groups *watchergroups
singles map[int64]*watcherSingle
inGroups map[int64]struct{}

gRPCStream pb.Watch_WatchServer

Expand All @@ -94,22 +96,31 @@ type serverWatchStream struct {
}

func (sws *serverWatchStream) close() {
close(sws.watchCh)

var wg sync.WaitGroup
sws.mu.Lock()
wg.Add(len(sws.singles))
wg.Add(len(sws.singles) + len(sws.inGroups))
for _, ws := range sws.singles {
ws.stop()
// copy the range variable to avoid race
copyws := ws
go func() {
<-copyws.stopNotify()
copyws.stop()
wg.Done()
}()
}
for id := range sws.inGroups {
// copy the range variable to avoid race
wid := id
go func() {
sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: wid})
wg.Done()
}()
}
sws.inGroups = nil
sws.mu.Unlock()

wg.Wait()

close(sws.watchCh)
}

func (sws *serverWatchStream) recvLoop() error {
Expand Down Expand Up @@ -176,6 +187,7 @@ func (sws *serverWatchStream) addCoalescedWatcher(w watcher) {

rid := receiverID{streamID: sws.id, watcherID: w.id}
sws.groups.addWatcher(rid, w)
sws.inGroups[w.id] = struct{}{}
}

func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) {
Expand All @@ -201,8 +213,13 @@ func (sws *serverWatchStream) maybeCoalesceWatcher(ws watcherSingle) bool {
defer sws.mu.Unlock()

rid := receiverID{streamID: sws.id, watcherID: ws.w.id}
// do not add new watchers when stream is closing
if sws.inGroups == nil {
return false
}
if sws.groups.maybeJoinWatcherSingle(rid, ws) {
delete(sws.singles, ws.w.id)
sws.inGroups[ws.w.id] = struct{}{}
return true
}
return false
Expand Down Expand Up @@ -236,6 +253,7 @@ func (sws *serverWatchStream) removeWatcher(id int64) {

rev, ok = sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id})
if ok {
delete(sws.inGroups, id)
return
}

Expand Down
5 changes: 1 addition & 4 deletions proxy/grpcproxy/watcher_single.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,5 @@ func (ws watcherSingle) canPromote() bool {

func (ws watcherSingle) stop() {
ws.cancel()
}

func (ws watcherSingle) stopNotify() <-chan struct{} {
return ws.donec
<-ws.donec
}

0 comments on commit 81bd381

Please sign in to comment.