Fragment Watch Response Messages #8371
Conversation
I left the log statements in for clarity and added a couple of "tests" for demonstration purposes. The server and watch proxy fragment a watch response by limiting the number of events per message, then the client glues the fragments back together.
etcdserver/etcdserverpb/rpc.proto
Outdated
int64 fragment_count = 7;

int64 curr_fragment = 8;
can these be bool fragment = 7? It's simpler to have the flag set until the last message, so the client only needs to know it should keep accumulating responses.
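A minimal client-side sketch of what that single flag buys, assuming a Fragment bool on the response; the watchResponse type and the recv helper below are illustrative stand-ins, not the actual protobuf or clientv3 code.

package main

import "fmt"

// watchResponse stands in for pb.WatchResponse with the suggested bool field.
type watchResponse struct {
	Events   []string
	Fragment bool // true on every message except the last piece of a split response
}

// readFragmentedResponse keeps accumulating events until it sees a message
// with Fragment == false, then returns the glued-together response.
func readFragmentedResponse(recv func() watchResponse) watchResponse {
	var full watchResponse
	for {
		part := recv()
		full.Events = append(full.Events, part.Events...)
		if !part.Fragment {
			return full
		}
	}
}

func main() {
	parts := []watchResponse{
		{Events: []string{"a", "b"}, Fragment: true},
		{Events: []string{"c"}, Fragment: false},
	}
	i := 0
	recv := func() watchResponse { p := parts[i]; i++; return p }
	fmt.Println(readFragmentedResponse(recv).Events) // [a b c]
}

With the two counter fields, the client would instead have to track fragment_count and curr_fragment per response; the single boolean removes that bookkeeping.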
proxy/grpcproxy/watch.go
Outdated
if maxEventsPerMsg == 0 {
	return
}
for _, fragment := range v3rpc.FragmentWatchResponse(maxEventsPerMsg, wresp) {
this duplicates the functionality from the server; the proxy can probably passively forward fragments received from the server instead of refragmenting
Since the proxy receives responses from a watch client, by the time the proxy gets the watch responses that it is supposed to relay to its watchers, the watch response fragments have already been glued back together. If I am not mistaken, the data flow seems to be:
mvcc -> server -> client watcher started by proxy -> proxy -> user's client watcher.
The watch client could have a WithFragments() option to skip reassembly. An alternative is to share the fragmenting code between the proxy and server, but it's not clear to me how that would fit together cleanly. Duplicating the fragmentation policy is not good: too easy for code to drift.
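A rough sketch of how a WithFragments() option could be wired up if it follows a functional-option style; the option name, the watchConfig struct, and the WatchOption type below are assumptions for illustration, not the existing clientv3 API.

package main

import "fmt"

// watchConfig collects per-watch settings; fragments controls whether the
// client reassembles fragmented responses or hands them through untouched.
type watchConfig struct {
	fragments bool
}

// WatchOption mirrors the functional-option style used by the client.
type WatchOption func(*watchConfig)

// WithFragments asks the watcher to deliver raw fragments instead of
// reassembling them, which is what a relaying proxy would want.
func WithFragments() WatchOption {
	return func(c *watchConfig) { c.fragments = true }
}

func main() {
	var cfg watchConfig
	for _, opt := range []WatchOption{WithFragments()} {
		opt(&cfg)
	}
	fmt.Println("skip reassembly:", cfg.fragments) // skip reassembly: true
}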
etcdserver/api/v3rpc/watch.go
Outdated
if maxEventsPerMsg == 0 {
	return
}
for _, fragment := range FragmentWatchResponse(maxEventsPerMsg, wr) {
the WatchCreateRequest protobuf should have a bool fragment field to optionally enable this feature; if it's not opt-in, older clients will have trouble with revisions being split across responses
OK, I'll have to think about how to implement this. Do you think a map between the watchId and a fragment boolean, stored in the serverWatchStream, would be a good idea? It seems like once the WatchCreateRequest reaches the serverWatchStream's recvLoop, the fragment boolean has to be stored somewhere rather than passed along to the watch request on the mvcc backend.
Yes, that sounds OK
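A minimal sketch of that agreed approach, assuming a fragment map keyed by watch ID on the server watch stream; the field and type names are illustrative, not the final patch.

package main

import (
	"fmt"
	"sync"
)

type watchID int64

// serverWatchStream (simplified) remembers, per watch, whether the creating
// WatchCreateRequest opted in to fragmented responses.
type serverWatchStream struct {
	mu       sync.RWMutex
	fragment map[watchID]bool
}

// onCreate records the opt-in when the create request is handled in recvLoop.
func (sws *serverWatchStream) onCreate(id watchID, fragment bool) {
	sws.mu.Lock()
	defer sws.mu.Unlock()
	sws.fragment[id] = fragment
}

// shouldFragment is consulted in sendLoop before splitting a large response.
func (sws *serverWatchStream) shouldFragment(id watchID) bool {
	sws.mu.RLock()
	defer sws.mu.RUnlock()
	return sws.fragment[id]
}

func main() {
	sws := &serverWatchStream{fragment: make(map[watchID]bool)}
	sws.onCreate(1, true)
	fmt.Println(sws.shouldFragment(1), sws.shouldFragment(2)) // true false
}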
…d responses in fragments
Codecov Report
@@            Coverage Diff             @@
##           master    #8371      +/-   ##
==========================================
- Coverage   77.23%   74.62%    -2.61%
==========================================
  Files         353      354        +1
  Lines       28019    28410      +391
==========================================
- Hits        21640    21202      -438
- Misses       4863     5766      +903
+ Partials     1516     1442       -74
==========================================
Continue to review full report at Codecov.
mostly remarks about naming and default behavior
@@ -0,0 +1,161 @@
// Copyright 2017 The etcd Authors
this should be part of clientv3/integration/watch_test.go
}

// Create and register watch proxy.
wp, _ := grpcproxy.NewWatchProxy(clus.Client(0))
no need to create a proxy here; the tag cluster_proxy will automatically have the integration package put a proxy between the server and the client
		clus.Members[2].GRPCAddr(),
	},
}
cli, err := clientv3.New(cfg)
don't make a special client if it's not needed; just use clus.Client(0)
// Does not include the clientv3.WithFragmentedResponse option.
wChannel := w.Watch(context.TODO(), "foo", clientv3.WithRange("z"))
_, err = clus.Client(0).Revoke(context.Background(), firstLease.ID)
instead of using a lease to bulk delete, `Delete(context.TODO(), "foo", clientv3.WithPrefix())` would be clearer / less code
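For comparison, a sketch of that suggestion against the clientv3 API: a single prefix delete replaces attaching the keys to a lease and revoking it. The helper name here is illustrative; Delete and WithPrefix are existing clientv3 calls.

package example

import (
	"context"

	"github.com/coreos/etcd/clientv3"
)

// deleteFooPrefix removes every key under the "foo" prefix in one call,
// covering the same keys the test previously cleaned up by revoking a lease.
func deleteFooPrefix(cli *clientv3.Client) error {
	_, err := cli.Delete(context.TODO(), "foo", clientv3.WithPrefix())
	return err
}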
defer testutil.AfterTest(t)
// MaxResponseBytes will overflow to 1000 once the grpcOverheadBytes,
// which have a value of 512 * 1024, are added to MaxResponseBytes.
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, MaxResponseBytes: ^uint(0) - (512*1024 - 1 - 1000)})
Size: 1, since this isn't testing any clustering features
WatchId:       int64(id),
Created:       true,
Canceled:      id == -1,
MoreFragments: false,
no need to set this; it's false by default
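A tiny illustration of why the explicit false is redundant, using a stand-in struct rather than the generated pb.WatchResponse: Go bool fields start at their zero value.

package main

import "fmt"

// resp stands in for the generated response struct; its bool fields start out
// as the zero value, false, so MoreFragments never needs to be set explicitly.
type resp struct {
	Created       bool
	MoreFragments bool
}

func main() {
	r := resp{Created: true}     // MoreFragments deliberately left unset
	fmt.Println(r.MoreFragments) // false
}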
filters          []mvcc.FilterFunc
progress         bool
prevKV           bool
fragmentResponse bool
s/fragmentResponse/fragment
if err != nil {
	return err
}
continue
not needed
	continue
}
var sendFragments func(maxEventsPerMsg int, wr *pb.WatchResponse) error
sendFragments = func(maxEventsPerMsg int, wr *pb.WatchResponse) error {
maybe this should be func (sws *serverWatchStream) sendFragments(maxEvents int, wr *pb.WatchResponse) error
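A sketch of the suggested method shape; the response and event types are simplified stand-ins and the gRPC stream is reduced to a send callback, so this only illustrates the splitting logic, not the real etcdserver types.

package main

import "fmt"

type event string

type watchResponse struct {
	WatchID  int64
	Events   []event
	Fragment bool // set on every piece except the last
}

type serverWatchStream struct {
	send func(*watchResponse) error // stands in for the gRPC stream's Send
}

// sendFragments splits wr into chunks of at most maxEvents events, marking all
// but the last chunk as a fragment so the client knows to keep accumulating.
func (sws *serverWatchStream) sendFragments(maxEvents int, wr *watchResponse) error {
	if maxEvents <= 0 || len(wr.Events) <= maxEvents {
		return sws.send(wr)
	}
	for i := 0; i < len(wr.Events); i += maxEvents {
		end := i + maxEvents
		if end > len(wr.Events) {
			end = len(wr.Events)
		}
		part := &watchResponse{
			WatchID:  wr.WatchID,
			Events:   wr.Events[i:end],
			Fragment: end < len(wr.Events),
		}
		if err := sws.send(part); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	sws := &serverWatchStream{send: func(r *watchResponse) error {
		fmt.Println(r.Events, r.Fragment)
		return nil
	}}
	sws.sendFragments(2, &watchResponse{Events: []event{"a", "b", "c"}})
	// prints: [a b] true, then [c] false
}

A method keeps the per-watch state (such as the fragment opt-in) reachable through the receiver instead of threading it through a locally declared closure.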
// MoreFragments indicates that more fragments composing one large
// watch response are expected.
MoreFragments bool
just Fragment?
@heyitsanthony has this been merged?
@allwmh Not yet, sorry for the delay. I'll continue working on this shortly.
Closing in favor of #9291. We will cherry-pick this patch. Thanks.
Addresses #8188
Still a WIP. 💀 Do not merge 💀