Skip to content

Commit

Permalink
clientv3: support "watch fragment"
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Feb 13, 2018
1 parent fb230b6 commit 6c7f556
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
9 changes: 9 additions & 0 deletions clientv3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Op struct {
// for watch, put, delete
prevKV bool

// for watch
noFragment bool

// for put
ignoreValue bool
ignoreLease bool
Expand Down Expand Up @@ -466,6 +469,12 @@ func WithPrevKV() OpOption {
}
}

// WithNoFragment to receive raw watch response
// without fragmentation.
func WithNoFragment() OpOption {
return func(op *Op) { op.noFragment = true }
}

// WithIgnoreValue updates the key using its current value.
// This option can not be combined with non-empty values.
// Returns an error if the key does not exist.
Expand Down
29 changes: 25 additions & 4 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ type watchRequest struct {
createdNotify bool
// progressNotify is for progress updates
progressNotify bool
// noFragment "true" to receive raw watch response without fragment
// server split watch response by default in case watch events exceed
// server request limit, which is default 1.5 MiB
noFragment bool
// filters is the list of events to filter out
filters []pb.WatchCreateRequest_FilterType
// get the previous key-value pair before the event happens
Expand Down Expand Up @@ -232,7 +236,7 @@ func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }
func (vc *valCtx) Err() error { return nil }

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
func (w *watcher) newWatcherGrpcStream(inctx context.Context, op Op) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGrpcStream{
owner: w,
Expand Down Expand Up @@ -274,6 +278,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
progressNotify: ow.progressNotify,
filters: filters,
prevKV: ow.prevKV,
noFragment: ow.noFragment,
retc: make(chan chan WatchResponse, 1),
}

Expand All @@ -291,7 +296,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
wgs = w.newWatcherGrpcStream(ctx, ow)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
Expand Down Expand Up @@ -450,7 +455,7 @@ func (w *watchGrpcStream) run() {
}

cancelSet := make(map[int64]struct{})

var cur *pb.WatchResponse
for {
select {
// Watch() requested
Expand All @@ -477,8 +482,17 @@ func (w *watchGrpcStream) run() {
}
// New events from the watch client
case pbresp := <-w.respc:
if cur == nil || pbresp.Created || pbresp.Canceled {
cur = pbresp
} else if cur != nil && cur.WatchId == pbresp.WatchId {
// combine with previous events
cur.Events = append(cur.Events, pbresp.Events...)
cur.Fragment = pbresp.Fragment
}

switch {
case pbresp.Created:
cur = nil // reset for next iteration
// response to head of queue creation
if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws)
Expand All @@ -489,15 +503,21 @@ func (w *watchGrpcStream) run() {
wc.Send(ws.initReq.toPB())
}
case pbresp.Canceled && pbresp.CompactRevision == 0:
cur = nil // reset for next iteration
delete(cancelSet, pbresp.WatchId)
if ws, ok := w.substreams[pbresp.WatchId]; ok {
// signal to stream goroutine to update closingc
close(ws.recvc)
closing[ws] = struct{}{}
}
case cur.Fragment:
// continue until all fragments arrive
continue
default:
// dispatch to appropriate watch stream
if ok := w.dispatchEvent(pbresp); ok {
ok := w.dispatchEvent(cur)
cur = nil // reset for next iteration
if ok {
break
}
// watch response on unexpected watch id; cancel id
Expand Down Expand Up @@ -820,6 +840,7 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
ProgressNotify: wr.progressNotify,
Filters: wr.filters,
PrevKv: wr.prevKV,
Fragment: !wr.noFragment,
}
cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
Expand Down

0 comments on commit 6c7f556

Please sign in to comment.