Skip to content

Commit

Permalink
Merge pull request #6367 from heyitsanthony/fix-watch-init-reconn
Browse files Browse the repository at this point in the history
clientv3: drain buffered WatchResponses before resuming
  • Loading branch information
xiang90 authored Sep 7, 2016
2 parents f411583 + ad318ee commit a6c905a
Showing 1 changed file with 15 additions and 0 deletions.
15 changes: 15 additions & 0 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,10 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
w.mu.RUnlock()

for _, ws := range streams {
// drain recvc so no old WatchResponses (e.g., Created messages)
// are processed while resuming
ws.drain()

// pause serveStream
ws.resumec <- -1

Expand Down Expand Up @@ -740,6 +744,17 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
return nil
}

// drain removes all buffered WatchResponses from the stream's receive channel.
func (ws *watcherStream) drain() {
for {
select {
case <-ws.recvc:
default:
return
}
}
}

// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
func (wr *watchRequest) toPB() *pb.WatchRequest {
req := &pb.WatchCreateRequest{
Expand Down

0 comments on commit a6c905a

Please sign in to comment.