diff --git a/clientv3/op.go b/clientv3/op.go index c6ec5bf5200c..cdf00db924a6 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -53,6 +53,9 @@ type Op struct { // for watch, put, delete prevKV bool + // for watch + noFragment bool + // for put ignoreValue bool ignoreLease bool @@ -466,6 +469,12 @@ func WithPrevKV() OpOption { } } +// WithNoFragment to receive raw watch response +// without fragmentation. +func WithNoFragment() OpOption { + return func(op *Op) { op.noFragment = true } +} + // WithIgnoreValue updates the key using its current value. // This option can not be combined with non-empty values. // Returns an error if the key does not exist. diff --git a/clientv3/watch.go b/clientv3/watch.go index 9452d0d92e40..2ac0c3059760 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -178,6 +178,10 @@ type watchRequest struct { createdNotify bool // progressNotify is for progress updates progressNotify bool + // noFragment "true" to receive raw watch response without fragment + // server split watch response by default in case watch events exceed + // server request limit, which is default 1.5 MiB + noFragment bool // filters is the list of events to filter out filters []pb.WatchCreateRequest_FilterType // get the previous key-value pair before the event happens @@ -232,7 +236,7 @@ func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false } func (vc *valCtx) Done() <-chan struct{} { return valCtxCh } func (vc *valCtx) Err() error { return nil } -func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { +func (w *watcher) newWatcherGrpcStream(inctx context.Context, op Op) *watchGrpcStream { ctx, cancel := context.WithCancel(&valCtx{inctx}) wgs := &watchGrpcStream{ owner: w, @@ -274,6 +278,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch progressNotify: ow.progressNotify, filters: filters, prevKV: ow.prevKV, + noFragment: ow.noFragment, retc: make(chan chan WatchResponse, 1), } @@ -291,7 +296,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch } wgs := w.streams[ctxKey] if wgs == nil { - wgs = w.newWatcherGrpcStream(ctx) + wgs = w.newWatcherGrpcStream(ctx, ow) w.streams[ctxKey] = wgs } donec := wgs.donec @@ -450,7 +455,7 @@ func (w *watchGrpcStream) run() { } cancelSet := make(map[int64]struct{}) - + var cur *pb.WatchResponse for { select { // Watch() requested @@ -477,8 +482,17 @@ func (w *watchGrpcStream) run() { } // New events from the watch client case pbresp := <-w.respc: + if cur == nil || pbresp.Created || pbresp.Canceled { + cur = pbresp + } else if cur != nil && cur.WatchId == pbresp.WatchId { + // combine with previous events + cur.Events = append(cur.Events, pbresp.Events...) + cur.Fragment = pbresp.Fragment + } + switch { case pbresp.Created: + cur = nil // reset for next iteration // response to head of queue creation if ws := w.resuming[0]; ws != nil { w.addSubstream(pbresp, ws) @@ -489,15 +503,21 @@ func (w *watchGrpcStream) run() { wc.Send(ws.initReq.toPB()) } case pbresp.Canceled && pbresp.CompactRevision == 0: + cur = nil // reset for next iteration delete(cancelSet, pbresp.WatchId) if ws, ok := w.substreams[pbresp.WatchId]; ok { // signal to stream goroutine to update closingc close(ws.recvc) closing[ws] = struct{}{} } + case cur.Fragment: + // continue until all fragments arrive + continue default: // dispatch to appropriate watch stream - if ok := w.dispatchEvent(pbresp); ok { + ok := w.dispatchEvent(cur) + cur = nil // reset for next iteration + if ok { break } // watch response on unexpected watch id; cancel id @@ -820,6 +840,7 @@ func (wr *watchRequest) toPB() *pb.WatchRequest { ProgressNotify: wr.progressNotify, Filters: wr.filters, PrevKv: wr.prevKV, + Fragment: !wr.noFragment, } cr := &pb.WatchRequest_CreateRequest{CreateRequest: req} return &pb.WatchRequest{RequestUnion: cr}