Skip to content

Commit

Permalink
mvcc: allow clients to assign watcher IDs
Browse files Browse the repository at this point in the history
This allows for watchers to be created concurrently without needing potentially complex and latency-adding queuing on the client
  • Loading branch information
connor4312 committed Oct 7, 2017
1 parent e8e3467 commit 4e4979d
Show file tree
Hide file tree
Showing 11 changed files with 389 additions and 280 deletions.
5 changes: 5 additions & 0 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2283,6 +2283,11 @@
"description": "start_revision is an optional revision to watch from (inclusive). No start_revision is \"now\".",
"type": "string",
"format": "int64"
},
"watch_id": {
"description": "If watch_id is provided and non-zero, it will be assigned to this watcher. Since\ncreating a watcher in etcd is not a synchronous operation, this can be\nused to ensure that ordering is correct when creating multiple watchers on\nthe same stream. Creating a watcher with an ID already in use on the\nstream will cause an error to be returned.",
"type": "string",
"format": "int64"
}
}
},
Expand Down
1 change: 1 addition & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
ErrGRPCAuthNotEnabled = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err()
ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err()
ErrGRPCInvalidAuthMgmt = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err()
ErrGRPCDuplicateWatcherID = status.New(codes.InvalidArgument, "etcdserver: duplicate watcher ID").Err()

ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
ErrGRPCNotLeader = status.New(codes.Unavailable, "etcdserver: not leader").Err()
Expand Down
39 changes: 26 additions & 13 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,21 @@ func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) bool
return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEnd) == nil
}

// sendWatchCreateError tells the client that a watch creation request was
// rejected. It queues a response on the control stream that is flagged as both
// created and canceled, carries the given watch id, and reports reason as the
// cancel reason. If closec becomes readable before the response can be queued,
// the response is dropped.
func (sws *serverWatchStream) sendWatchCreateError(id mvcc.WatchID, reason string) {
	header := sws.newResponseHeader(sws.watchStream.Rev())
	resp := &pb.WatchResponse{
		Header:       header,
		WatchId:      int64(id),
		Created:      true,
		Canceled:     true,
		CancelReason: reason,
	}

	// Do not block forever on the control stream: give up once closec is ready.
	select {
	case <-sws.closec:
	case sws.ctrlStream <- resp:
	}
}

func (sws *serverWatchStream) recvLoop() error {
for {
req, err := sws.gRPCStream.Recv()
Expand Down Expand Up @@ -201,19 +216,13 @@ func (sws *serverWatchStream) recvLoop() error {
creq.RangeEnd = []byte{}
}

if !sws.isWatchPermitted(creq) {
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: -1,
Canceled: true,
Created: true,
CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
}
id := mvcc.WatchID(-1)
if creq.WatchId != 0 {
id = mvcc.WatchID(creq.WatchId)
}

select {
case sws.ctrlStream <- wr:
case <-sws.closec:
}
if !sws.isWatchPermitted(creq) {
sws.sendWatchCreateError(id, rpctypes.ErrGRPCPermissionDenied.Error())
return nil
}

Expand All @@ -224,7 +233,11 @@ func (sws *serverWatchStream) recvLoop() error {
if rev == 0 {
rev = wsrev + 1
}
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...)
id, err = sws.watchStream.Watch(id, creq.Key, creq.RangeEnd, rev, filters...)
if err != nil {
sws.sendWatchCreateError(id, err.Error())
continue
}
if id != -1 {
sws.mu.Lock()
if creq.ProgressNotify {
Expand Down
502 changes: 271 additions & 231 deletions etcdserver/etcdserverpb/rpc.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions etcdserver/etcdserverpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,13 @@ message WatchCreateRequest {
// If prev_kv is set, created watcher gets the previous KV before the event happens.
// If the previous KV is already compacted, nothing will be returned.
bool prev_kv = 6;

// If watch_id is provided and non-zero, it will be assigned to this
// watcher. Since creating a watcher in etcd is not a synchronous operation,
// this can be used to ensure that ordering is correct when creating multiple
// watchers on the same stream. Creating a watcher with an ID already in
// use on the stream will cause an error to be returned.
int64 watch_id = 7;
}

message WatchCancelRequest {
Expand Down
4 changes: 2 additions & 2 deletions mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func TestWatchableKVWatch(t *testing.T) {
w := s.NewWatchStream()
defer w.Close()

wid := w.Watch([]byte("foo"), []byte("fop"), 0)
wid, _ := w.Watch(-1, []byte("foo"), []byte("fop"), 0)

wev := []mvccpb.Event{
{Type: mvccpb.PUT,
Expand Down Expand Up @@ -783,7 +783,7 @@ func TestWatchableKVWatch(t *testing.T) {
}

w = s.NewWatchStream()
wid = w.Watch([]byte("foo1"), []byte("foo2"), 3)
wid, _ = w.Watch(-1, []byte("foo1"), []byte("foo2"), 3)

select {
case resp := <-w.Chan():
Expand Down
6 changes: 3 additions & 3 deletions mvcc/watchable_store_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func BenchmarkWatchableStoreWatchSyncPut(b *testing.B) {
watchIDs := make([]WatchID, b.N)
for i := range watchIDs {
// non-0 value to keep watchers in unsynced
watchIDs[i] = w.Watch(k, nil, 1)
watchIDs[i], _ = w.Watch(-1, k, nil, 1)
}

b.ResetTimer()
Expand Down Expand Up @@ -142,7 +142,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
watchIDs := make([]WatchID, watcherN)
for i := 0; i < watcherN; i++ {
// non-0 value to keep watchers in unsynced
watchIDs[i] = w.Watch(testKey, nil, 1)
watchIDs[i], _ = w.Watch(-1, testKey, nil, 1)
}

// random-cancel N watchers to make it not biased towards
Expand Down Expand Up @@ -182,7 +182,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
watchIDs := make([]WatchID, watcherN)
for i := 0; i < watcherN; i++ {
// 0 for startRev to keep watchers in synced
watchIDs[i] = w.Watch(testKey, nil, 0)
watchIDs[i], _ = w.Watch(-1, testKey, nil, 0)
}

// randomly cancel watchers to make it not biased towards
Expand Down
20 changes: 10 additions & 10 deletions mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestWatch(t *testing.T) {
s.Put(testKey, testValue, lease.NoLease)

w := s.NewWatchStream()
w.Watch(testKey, nil, 0)
w.Watch(-1, testKey, nil, 0)

if !s.synced.contains(string(testKey)) {
// the key must have had an entry in synced
Expand All @@ -63,7 +63,7 @@ func TestNewWatcherCancel(t *testing.T) {
s.Put(testKey, testValue, lease.NoLease)

w := s.NewWatchStream()
wt := w.Watch(testKey, nil, 0)
wt, _ := w.Watch(-1, testKey, nil, 0)

if err := w.Cancel(wt); err != nil {
t.Error(err)
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestCancelUnsynced(t *testing.T) {
watchIDs := make([]WatchID, watcherN)
for i := 0; i < watcherN; i++ {
// use 1 to keep watchers in unsynced
watchIDs[i] = w.Watch(testKey, nil, 1)
watchIDs[i], _ = w.Watch(-1, testKey, nil, 1)
}

for _, idx := range watchIDs {
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestSyncWatchers(t *testing.T) {

for i := 0; i < watcherN; i++ {
// specify rev as 1 to keep watchers in unsynced
w.Watch(testKey, nil, 1)
w.Watch(-1, testKey, nil, 1)
}

// Before running s.syncWatchers() synced should be empty because we manually
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestWatchCompacted(t *testing.T) {
}

w := s.NewWatchStream()
wt := w.Watch(testKey, nil, compactRev-1)
wt, _ := w.Watch(-1, testKey, nil, compactRev-1)

select {
case resp := <-w.Chan():
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestWatchFutureRev(t *testing.T) {

w := s.NewWatchStream()
wrev := int64(10)
w.Watch(testKey, nil, wrev)
w.Watch(-1, testKey, nil, wrev)

for i := 0; i < 10; i++ {
rev := s.Put(testKey, testValue, lease.NoLease)
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestWatchRestore(t *testing.T) {
defer cleanup(newStore, newBackend, newPath)

w := newStore.NewWatchStream()
w.Watch(testKey, nil, rev-1)
w.Watch(-1, testKey, nil, rev-1)

newStore.Restore(b)
select {
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestWatchBatchUnsynced(t *testing.T) {
}

w := s.NewWatchStream()
w.Watch(v, nil, 1)
w.Watch(-1, v, nil, 1)
for i := 0; i < batches; i++ {
if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs {
t.Fatalf("len(events) = %d, want %d", len(resp.Events), watchBatchMaxRevs)
Expand Down Expand Up @@ -485,7 +485,7 @@ func TestWatchVictims(t *testing.T) {
for i := 0; i < numWatches; i++ {
go func() {
w := s.NewWatchStream()
w.Watch(testKey, nil, 1)
w.Watch(-1, testKey, nil, 1)
defer func() {
w.Close()
wg.Done()
Expand Down Expand Up @@ -561,7 +561,7 @@ func TestStressWatchCancelClose(t *testing.T) {
w := s.NewWatchStream()
ids := make([]WatchID, 10)
for i := range ids {
ids[i] = w.Watch(testKey, nil, 0)
ids[i], _ = w.Watch(-1, testKey, nil, 0)
}
<-readyc
wg.Add(1 + len(ids)/2)
Expand Down
28 changes: 19 additions & 9 deletions mvcc/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (
)

var (
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
ErrWatcherDuplicateID = errors.New("mvcc: duplicate ID on the WatchStream")
)

type WatchID int64
Expand All @@ -33,15 +34,17 @@ type FilterFunc func(e mvccpb.Event) bool

type WatchStream interface {
// Watch creates a watcher. The watcher watches the events happening or
// happened on the given key or range [key, end) from the given startRev.
// happened on the given key or range [key, end) from the given startRev. If
// id is -1, an unused ID is picked automatically; otherwise the given id is
// used, and ErrWatcherDuplicateID is returned when that id is already taken
// on this stream.
//
// The whole event history can be watched unless compacted.
// If `startRev` <=0, watch observes events after currentRev.
//
// The returned `id` is the ID of this watcher. It appears as WatchID
// in events that are sent to the created watcher through stream channel.
//
Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID
Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)

// Chan returns a chan. All watch response will be sent to the returned chan.
Chan() <-chan WatchResponse
Expand Down Expand Up @@ -99,27 +102,34 @@ type watchStream struct {

// Watch creates a new watcher in the stream and returns its WatchID.
// TODO: return error if ws is closed?
func (ws *watchStream) Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID {
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
// prevent wrong range where key >= end lexicographically
// watch request with 'WithFromKey' has empty-byte range end
if len(end) != 0 && bytes.Compare(key, end) != -1 {
return -1
return -1, nil
}

ws.mu.Lock()
defer ws.mu.Unlock()
if ws.closed {
return -1
return -1, nil
}

id := ws.nextID
ws.nextID++
if id == -1 {
for ws.watchers[ws.nextID] != nil {
ws.nextID++
}
id = ws.nextID
ws.nextID++
} else if ws.watchers[id] != nil {
return -1, ErrWatcherDuplicateID
}

w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)

ws.cancels[id] = c
ws.watchers[id] = w
return id
return id, nil
}

func (ws *watchStream) Chan() <-chan WatchResponse {
Expand Down
2 changes: 1 addition & 1 deletion mvcc/watcher_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
b.ReportAllocs()
b.StartTimer()
for i := 0; i < b.N; i++ {
w.Watch([]byte(fmt.Sprint("foo", i)), nil, 0)
w.Watch(-1, []byte(fmt.Sprint("foo", i)), nil, 0)
}
}
Loading

0 comments on commit 4e4979d

Please sign in to comment.