From a6606b13c740408c9dc6701c5d8a2b447a9eebb2 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 23 Sep 2016 16:47:25 -0700 Subject: [PATCH] clientv3: simplify watch synchronization Was more complicated than it needed to be and didn't really work in the first place. Restructured watcher registation to use a queue. --- clientv3/watch.go | 513 ++++++++++++++++++++-------------------------- 1 file changed, 221 insertions(+), 292 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 78d810d41d37..b07877832576 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -29,8 +29,6 @@ import ( const ( EventTypeDelete = mvccpb.DELETE EventTypePut = mvccpb.PUT - - closeSendErrTimeout = 250 * time.Millisecond ) type Event mvccpb.Event @@ -106,6 +104,7 @@ type watcher struct { streams map[string]*watchGrpcStream } +// watchGrpcStream tracks all watch resources attached to a single grpc stream. type watchGrpcStream struct { owner *watcher remote pb.WatchClient @@ -116,10 +115,10 @@ type watchGrpcStream struct { ctxKey string cancel context.CancelFunc - // mu protects the streams map - mu sync.RWMutex - // streams holds all active watchers - streams map[int64]*watcherStream + // substreams holds all active watchers on this grpc stream + substreams map[int64]*watcherStream + // resuming holds all resuming watchers on this grpc stream + resuming []*watcherStream // reqc sends a watch request from Watch() to the main goroutine reqc chan *watchRequest @@ -134,7 +133,9 @@ type watchGrpcStream struct { // closingc gets the watcherStream of closing watchers closingc chan *watcherStream - // the error that closed the watch stream + // resumec closes to signal that all substreams should begin resuming + resumec chan struct{} + // closeErr is the error that closed the watch stream closeErr error } @@ -162,15 +163,18 @@ type watcherStream struct { initReq watchRequest // outc publishes watch responses to subscriber - outc chan<- WatchResponse + outc chan WatchResponse // recvc buffers watch responses before publishing recvc chan *WatchResponse - id int64 + // donec closes when the watcherStream goroutine stops. + donec chan struct{} + // closing is set to true when stream should be scheduled to shutdown. + closing bool + // id is the registered watch id for on the grpc stream + id int64 - // lastRev is revision last successfully sent over outc - lastRev int64 - // resumec indicates the stream must recover at a given revision - resumec chan int64 + // buf holds all events received from etcd but not yet consumed by the client + buf []*WatchResponse } func NewWatcher(c *Client) Watcher { @@ -198,12 +202,12 @@ func (vc *valCtx) Err() error { return nil } func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { ctx, cancel := context.WithCancel(&valCtx{inctx}) wgs := &watchGrpcStream{ - owner: w, - remote: w.remote, - ctx: ctx, - ctxKey: fmt.Sprintf("%v", inctx), - cancel: cancel, - streams: make(map[int64]*watcherStream), + owner: w, + remote: w.remote, + ctx: ctx, + ctxKey: fmt.Sprintf("%v", inctx), + cancel: cancel, + substreams: make(map[int64]*watcherStream), respc: make(chan *pb.WatchResponse), reqc: make(chan *watchRequest), @@ -211,6 +215,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { donec: make(chan struct{}), errc: make(chan error, 1), closingc: make(chan *watcherStream), + resumec: make(chan struct{}), } go wgs.run() return wgs @@ -220,8 +225,6 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan { ow := opWatch(key, opts...) - retc := make(chan chan WatchResponse, 1) - var filters []pb.WatchCreateRequest_FilterType if ow.filterPut { filters = append(filters, pb.WatchCreateRequest_NOPUT) @@ -239,7 +242,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch progressNotify: ow.progressNotify, filters: filters, prevKV: ow.prevKV, - retc: retc, + retc: make(chan chan WatchResponse, 1), } ok := false @@ -283,7 +286,10 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch // receive channel if ok { select { - case ret := <-retc: + case ret, ok := <-wr.retc: + if !ok { + return w.Watch(ctx, key, opts...) + } return ret case <-ctx.Done(): case <-donec: @@ -314,12 +320,7 @@ func (w *watcher) Close() (err error) { } func (w *watchGrpcStream) Close() (err error) { - w.mu.Lock() - if w.stopc != nil { - close(w.stopc) - w.stopc = nil - } - w.mu.Unlock() + close(w.stopc) <-w.donec select { case err = <-w.errc: @@ -328,71 +329,57 @@ func (w *watchGrpcStream) Close() (err error) { return toErr(w.ctx, err) } -func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { - if pendingReq == nil { - // no pending request; ignore - return - } - if resp.Canceled || resp.CompactRevision != 0 { - // a cancel at id creation time means the start revision has - // been compacted out of the store - ret := make(chan WatchResponse, 1) - ret <- WatchResponse{ - Header: *resp.Header, - CompactRevision: resp.CompactRevision, - Canceled: true} - close(ret) - pendingReq.retc <- ret - return +func (w *watcher) closeStream(wgs *watchGrpcStream) { + w.mu.Lock() + close(wgs.donec) + wgs.cancel() + if w.streams != nil { + delete(w.streams, wgs.ctxKey) } + w.mu.Unlock() +} - ret := make(chan WatchResponse) +func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) { if resp.WatchId == -1 { // failed; no channel - close(ret) - pendingReq.retc <- ret + close(ws.recvc) return } + ws.id = resp.WatchId + w.substreams[ws.id] = ws +} - ws := &watcherStream{ - initReq: *pendingReq, - id: resp.WatchId, - outc: ret, - // buffered so unlikely to block on sending while holding mu - recvc: make(chan *WatchResponse, 4), - resumec: make(chan int64), - } - - if pendingReq.rev == 0 { - // note the header revision so that a put following a current watcher - // disconnect will arrive on the watcher channel after reconnect - ws.initReq.rev = resp.Header.Revision +func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) { + select { + case ws.outc <- *resp: + case <-ws.initReq.ctx.Done(): + case <-w.donec: } - - w.mu.Lock() - w.streams[ws.id] = ws - w.mu.Unlock() - - // pass back the subscriber channel for the watcher - pendingReq.retc <- ret - - // send messages to subscriber - go w.serveStream(ws) + close(ws.outc) } -func (w *watchGrpcStream) closeStream(ws *watcherStream) bool { - w.mu.Lock() - // cancels request stream; subscriber receives nil channel - close(ws.initReq.retc) +func (w *watchGrpcStream) closeSubstream(ws *watcherStream) { + // send channel response in case stream was never established + select { + case ws.initReq.retc <- ws.outc: + default: + } // close subscriber's channel - close(ws.outc) - delete(w.streams, ws.id) - empty := len(w.streams) == 0 - if empty && w.stopc != nil { - w.stopc = nil + if closeErr := w.closeErr; closeErr != nil { + go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr}) + } else { + close(ws.outc) + } + if ws.id != -1 { + delete(w.substreams, ws.id) + return + } + for i := range w.resuming { + if w.resuming[i] == ws { + w.resuming[i] = nil + return + } } - w.mu.Unlock() - return empty } // run is the root of the goroutines for managing a watcher client @@ -400,67 +387,79 @@ func (w *watchGrpcStream) run() { var wc pb.Watch_WatchClient var closeErr error + // substreams marked to close but goroutine still running; needed for + // avoiding double-closing recvc on grpc stream teardown + closing := make(map[*watcherStream]struct{}) + defer func() { - w.owner.mu.Lock() w.closeErr = closeErr - if w.owner.streams != nil { - delete(w.owner.streams, w.ctxKey) + // shutdown substreams and resuming substreams + for _, ws := range w.substreams { + if _, ok := closing[ws]; !ok { + close(ws.recvc) + } + } + for _, ws := range w.resuming { + if _, ok := closing[ws]; ws != nil && !ok { + close(ws.recvc) + } + } + w.joinSubstreams() + for toClose := len(w.substreams) + len(w.resuming); toClose > 0; toClose-- { + w.closeSubstream(<-w.closingc) } - close(w.donec) - w.owner.mu.Unlock() - w.cancel() - }() - // already stopped? - w.mu.RLock() - stopc := w.stopc - w.mu.RUnlock() - if stopc == nil { - return - } + w.owner.closeStream(w) + }() // start a stream with the etcd grpc server if wc, closeErr = w.newWatchClient(); closeErr != nil { return } - var pendingReq, failedReq *watchRequest - curReqC := w.reqc cancelSet := make(map[int64]struct{}) for { select { // Watch() requested - case pendingReq = <-curReqC: - // no more watch requests until there's a response - curReqC = nil - if err := wc.Send(pendingReq.toPB()); err == nil { - // pendingReq now waits on w.respc - break + case wreq := <-w.reqc: + outc := make(chan WatchResponse, 1) + ws := &watcherStream{ + initReq: *wreq, + id: -1, + outc: outc, + // unbufffered so resumes won't cause repeat events + recvc: make(chan *WatchResponse), + } + + ws.donec = make(chan struct{}) + go w.serveSubstream(ws, w.resumec) + + // queue up for watcher creation/resume + w.resuming = append(w.resuming, ws) + if len(w.resuming) == 1 { + // head of resume queue, can register a new watcher + wc.Send(ws.initReq.toPB()) } - failedReq = pendingReq // New events from the watch client case pbresp := <-w.respc: switch { case pbresp.Created: - // response to pending req, try to add - w.addStream(pbresp, pendingReq) - pendingReq = nil - curReqC = w.reqc - w.dispatchEvent(pbresp) + // response to head of queue creation + if ws := w.resuming[0]; ws != nil { + w.addSubstream(pbresp, ws) + w.dispatchEvent(pbresp) + w.resuming[0] = nil + } + if ws := w.nextResume(); ws != nil { + wc.Send(ws.initReq.toPB()) + } case pbresp.Canceled: delete(cancelSet, pbresp.WatchId) - // shutdown serveStream, if any - w.mu.Lock() - if ws, ok := w.streams[pbresp.WatchId]; ok { + if ws, ok := w.substreams[pbresp.WatchId]; ok { + // signal to stream goroutine to update closingc close(ws.recvc) - delete(w.streams, ws.id) - } - numStreams := len(w.streams) - w.mu.Unlock() - if numStreams == 0 { - // don't leak watcher streams - return + closing[ws] = struct{}{} } default: // dispatch to appropriate watch stream @@ -481,7 +480,6 @@ func (w *watchGrpcStream) run() { wc.Send(req) } // watch client failed to recv; spawn another if possible - // TODO report watch client errors from errc? case err := <-w.errc: if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader { closeErr = err @@ -490,43 +488,41 @@ func (w *watchGrpcStream) run() { if wc, closeErr = w.newWatchClient(); closeErr != nil { return } - curReqC = w.reqc - if pendingReq != nil { - failedReq = pendingReq + if ws := w.nextResume(); ws != nil { + wc.Send(ws.initReq.toPB()) } cancelSet = make(map[int64]struct{}) - case <-stopc: + case <-w.stopc: return case ws := <-w.closingc: - if w.closeStream(ws) { + w.closeSubstream(ws) + delete(closing, ws) + if len(w.substreams)+len(w.resuming) == 0 { + // no more watchers on this stream, shutdown return } } + } +} - // send failed; queue for retry - if failedReq != nil { - go func(wr *watchRequest) { - select { - case w.reqc <- wr: - case <-wr.ctx.Done(): - case <-w.donec: - } - }(pendingReq) - failedReq = nil - pendingReq = nil +// nextResume chooses the next resuming to register with the grpc stream. Abandoned +// streams are marked as nil in the queue since the head must wait for its inflight registration. +func (w *watchGrpcStream) nextResume() *watcherStream { + for len(w.resuming) != 0 { + if w.resuming[0] != nil { + return w.resuming[0] } + w.resuming = w.resuming[1:len(w.resuming)] } + return nil } // dispatchEvent sends a WatchResponse to the appropriate watcher stream func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { - w.mu.RLock() - defer w.mu.RUnlock() - ws, ok := w.streams[pbresp.WatchId] + ws, ok := w.substreams[pbresp.WatchId] if !ok { return false } - events := make([]*Event, len(pbresp.Events)) for i, ev := range pbresp.Events { events[i] = (*Event)(ev) @@ -538,7 +534,11 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { Created: pbresp.Created, Canceled: pbresp.Canceled, } - ws.recvc <- wr + select { + case ws.recvc <- wr: + case <-ws.donec: + return false + } return true } @@ -561,140 +561,126 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) { } } -// serveStream forwards watch responses from run() to the subscriber -func (w *watchGrpcStream) serveStream(ws *watcherStream) { +// serveSubstream forwards watch responses from run() to the subscriber +func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) { + if ws.closing { + panic("created substream goroutine but substream is closing") + } + + // nextRev is the minimum expected next revision + nextRev := ws.initReq.rev + resuming := false defer func() { - // signal that this watcherStream is finished - select { - case w.closingc <- ws: - case <-w.donec: - w.closeStream(ws) + if !resuming { + ws.closing = true + } + ws.initReq.rev = nextRev + close(ws.donec) + if ws.closing { + w.closingc <- ws } }() - var closeErr error emptyWr := &WatchResponse{} - wrs := []*WatchResponse{} - resuming := false - closing := false - for !closing { + for { curWr := emptyWr outc := ws.outc - // ignore created event if create notify is not requested or - // we already sent the initial created event (when we are on the resume path). - if len(wrs) > 0 && wrs[0].Created && - (!ws.initReq.createdNotify || ws.lastRev != 0) { - wrs = wrs[1:] + if len(ws.buf) > 0 && ws.buf[0].Created { + select { + case ws.initReq.retc <- ws.outc: + // send first creation event and only if requested + if !ws.initReq.createdNotify { + ws.buf = ws.buf[1:] + } + default: + } } - if len(wrs) > 0 { - curWr = wrs[0] + if len(ws.buf) > 0 { + curWr = ws.buf[0] } else { outc = nil } select { case outc <- *curWr: - if wrs[0].Err() != nil { - closing = true - break - } - var newRev int64 - if len(wrs[0].Events) > 0 { - newRev = wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision - } else { - newRev = wrs[0].Header.Revision - } - if newRev != ws.lastRev { - ws.lastRev = newRev + if ws.buf[0].Err() != nil { + return } - wrs[0] = nil - wrs = wrs[1:] + ws.buf[0] = nil + ws.buf = ws.buf[1:] case wr, ok := <-ws.recvc: if !ok { - // shutdown from closeStream + // shutdown from closeSubstream return } - // resume up to last seen event if disconnected - if resuming && wr.Err() == nil { - resuming = false - // trim events already seen - for i := 0; i < len(wr.Events); i++ { - if wr.Events[i].Kv.ModRevision > ws.lastRev { - wr.Events = wr.Events[i:] - break - } - } - // only forward new events - if wr.Events[0].Kv.ModRevision == ws.lastRev { - break - } + // TODO pause channel if buffer gets too large + ws.buf = append(ws.buf, wr) + nextRev = wr.Header.Revision + if len(wr.Events) > 0 { + nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1 } - resuming = false - // TODO don't keep buffering if subscriber stops reading - wrs = append(wrs, wr) - case resumeRev := <-ws.resumec: - wrs = nil - resuming = true - if resumeRev == -1 { - // pause serving stream while resume gets set up - break - } - if resumeRev != ws.lastRev { - panic("unexpected resume revision") - } - case <-w.donec: - closing = true - closeErr = w.closeErr case <-ws.initReq.ctx.Done(): - closing = true - } - } - - // try to send off close error - if closeErr != nil { - select { - case ws.outc <- WatchResponse{closeErr: w.closeErr}: - case <-w.donec: - case <-time.After(closeSendErrTimeout): + return + case <-resumec: + resuming = true + return } } - // lazily send cancel message if events on missing id } func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { - ws, rerr := w.resume() - if rerr != nil { - return nil, rerr + // connect to grpc stream + wc, err := w.openWatchClient() + if err != nil { + return nil, v3rpc.Error(err) + } + // mark all substreams as resuming + if len(w.substreams)+len(w.resuming) > 0 { + close(w.resumec) + w.resumec = make(chan struct{}) + w.joinSubstreams() + for _, ws := range w.substreams { + ws.id = -1 + w.resuming = append(w.resuming, ws) + } + for _, ws := range w.resuming { + if ws == nil || ws.closing { + continue + } + ws.donec = make(chan struct{}) + go w.serveSubstream(ws, w.resumec) + } } - go w.serveWatchClient(ws) - return ws, nil + w.substreams = make(map[int64]*watcherStream) + // receive data from new grpc stream + go w.serveWatchClient(wc) + return wc, nil } -// resume creates a new WatchClient with all current watchers reestablished -func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) { - for { - if ws, err = w.openWatchClient(); err != nil { - break - } else if err = w.resumeWatchers(ws); err == nil { - break +// joinSubstream waits for all substream goroutines to complete +func (w *watchGrpcStream) joinSubstreams() { + for _, ws := range w.substreams { + <-ws.donec + } + for _, ws := range w.resuming { + if ws != nil { + <-ws.donec } } - return ws, v3rpc.Error(err) } // openWatchClient retries opening a watchclient until retryConnection fails func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { for { - w.mu.Lock() - stopc := w.stopc - w.mu.Unlock() - if stopc == nil { + select { + case <-w.stopc: if err == nil { - err = context.Canceled + return nil, context.Canceled } return nil, err + default: } if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil { break @@ -706,63 +692,6 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) return ws, nil } -// resumeWatchers rebuilds every registered watcher on a new client -func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error { - w.mu.RLock() - streams := make([]*watcherStream, 0, len(w.streams)) - for _, ws := range w.streams { - streams = append(streams, ws) - } - w.mu.RUnlock() - - for _, ws := range streams { - // drain recvc so no old WatchResponses (e.g., Created messages) - // are processed while resuming - ws.drain() - - // pause serveStream - ws.resumec <- -1 - - // reconstruct watcher from initial request - if ws.lastRev != 0 { - ws.initReq.rev = ws.lastRev - } - if err := wc.Send(ws.initReq.toPB()); err != nil { - return err - } - - // wait for request ack - resp, err := wc.Recv() - if err != nil { - return err - } else if len(resp.Events) != 0 || !resp.Created { - return fmt.Errorf("watcher: unexpected response (%+v)", resp) - } - - // id may be different since new remote watcher; update map - w.mu.Lock() - delete(w.streams, ws.id) - ws.id = resp.WatchId - w.streams[ws.id] = ws - w.mu.Unlock() - - // unpause serveStream - ws.resumec <- ws.lastRev - } - return nil -} - -// drain removes all buffered WatchResponses from the stream's receive channel. -func (ws *watcherStream) drain() { - for { - select { - case <-ws.recvc: - default: - return - } - } -} - // toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest) func (wr *watchRequest) toPB() *pb.WatchRequest { req := &pb.WatchCreateRequest{