CM: Add on-demand cluster discovery functionality #18723
Conversation
The OdCdsApi interface is introduced as an API used for on-demand discovery. Its implementation is provided using the CdsApiHelper class, so OdCdsApiImpl handles the discovered cluster in the same way as CdsApiImpl does. On the ClusterManagerImpl side, a discovery manager (ClusterDiscoveryManager) is added to help deduplicate requests for the same cluster from within the same worker thread. Further deduplication of requests coming from different worker threads is done in ClusterManagerImpl in the main thread. Each unique request for a cluster also receives a timeout to catch the case when a discovery fails, allowing the worker threads to handle the failure. Signed-off-by: Krzesimir Nowak <[email protected]>
I also have a branch where I have implemented an extension that uses on-demand CDS (there are some integration tests too): https://github.com/kinvolk/envoy/tree/krnowak/odcds
We've been testing the earlier version of this at Netflix, and I recently updated my test setup to use the updated branch that @krnowak shared above. The test harness is as follows
So far, so good. No issues found (just like the last time we tested this). Thanks @krnowak, great work!
@adisuissa I'd welcome any feedback/PR comments you have!
Would it be possible to break this PR up a bit? It's a bit large to review on its own. Thanks!
I'm not really sure. The changes I have made are mostly:
Maybe a short description of the discovery process will help in the review:

The first part is the allocation of the on-demand CDS - this results in obtaining a handle to it. That's what

The second part is when some worker uses the OD CDS handle to start a discovery process. This is where the cluster discovery manager (CDM) jumps in. The CDM is a thread-local object, so each worker has its own. The CDM tracks the discovery process for the worker for every requested cluster. Once the discovery is finished, the CDM will invoke callbacks with the result of the discovery.

So when discovery starts, the CDM is queried to check if the worker already requested the cluster earlier. If not, we check in the main thread if some other worker started the discovery. If not, we use the OD CDS handle to talk to the config server for the requested cluster. Whatever the result (success, no-such-cluster, or time-out), the main thread will propagate the discovery status to the CDMs, which in turn will invoke the callbacks in the worker threads.

OD CDS starts talking to the config server on the first discovery request - that was to avoid starting the subscription with no resources, which gets interpreted as a wildcard subscription. That led to OD CDS being in one of three states (envoy/source/common/upstream/od_cds_api_impl.h, lines 21 to 28 in e83527c).
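The lazy-start behavior can be sketched as follows. This is an illustrative sketch only: the enum and method names here are assumptions based on the description, not necessarily the exact identifiers used in od_cds_api_impl.h.

```cpp
// Sketch of the three states the on-demand CDS can be in.
enum class StartStatus {
  NotStarted,       // No discovery requested yet, so no subscription exists.
  Started,          // First request sent; waiting for the initial response.
  InitialFetchDone, // Initial response arrived; subscription is fully active.
};

class OdCdsSketch {
public:
  void updateOnDemand(const char* /*cluster_name*/) {
    if (status_ == StartStatus::NotStarted) {
      // Start the subscription lazily on the first request, so we never send
      // an empty resource list (which would mean a wildcard subscription).
      status_ = StartStatus::Started;
    }
    // Otherwise the subscription already exists; just add the cluster name.
  }

  void onInitialFetch() { status_ = StartStatus::InitialFetchDone; }

  StartStatus status() const { return status_; }

private:
  StartStatus status_ = StartStatus::NotStarted;
};
```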
So things are a bit related, and I'm not sure how to split that up without a PR introducing dead code. Maybe reviews of the previous ODCDS PR could also help - #15857. Also, an example use of the new interfaces: (these are from my branch that extends the on_demand filter).
Talked to @htuch about review ownership of this PR (I don't have the bandwidth to take this on anytime soon) - @adisuissa can you make a first pass and then @htuch will handle senior review?
I did a high-level pass on the shape of the PR and I think this makes sense. @adisuissa could you take the initial review as next step? Thanks.
Friendly ping @adisuissa for a review on this one.
Thanks for working on this!
Overall I think this looks good, though this PR is quite long and the subtleties are somewhat challenging to understand.
One thing to note is the many uses of `std::move` throughout the code for non-pointers, as it may make the code less readable (and it should be used instead of passing a reference only if there is a performance reason behind it).
In `envoy/upstream/cluster_manager.h` (outdated):

```cpp
 * @param name is the name of the cluster to be discovered.
 * @param callback will be called when the discovery is finished.
 * @param timeout describes how long the operation may take before failing.
 * @return ClusterDiscoveryCallbackHandlePtr the discovery process handle.
```
nit: no need for the return type in the comment
I think I saw the type name used somewhere, so I copied it. Will remove it.
In `envoy/upstream/cluster_manager.h` (outdated):

```cpp
 * cluster. When the requested cluster is added and warmed up, the passed callback will be invoked
 * in the same thread that invoked this function.
 *
 * The returned handle can be destroyed to prevent the callback to be invoked. Note that the
```
Suggested change:

```diff
- * The returned handle can be destroyed to prevent the callback to be invoked. Note that the
+ * The returned handle can be destroyed to prevent the callback from being invoked. Note that the
```
Right, will do.
```cpp
private:
  friend class ClusterDiscoveryManager;

  CallbackInvoker(ClusterDiscoveryManager& parent, std::string name,
```
Suggested change:

```diff
- CallbackInvoker(ClusterDiscoveryManager& parent, std::string name,
+ CallbackInvoker(ClusterDiscoveryManager& parent, const std::string&& name,
```
An rvalue reference to const is a contradictory type: writing an rvalue reference says "we are going to move its contents into something else", while `const` says "it's immutable". I pass the string by value, since the invoker needs its own copy anyway.
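A minimal sketch of the pass-by-value trade-off described above (`CallbackInvokerSketch` is a hypothetical stand-in for the real class, not Envoy's actual type):

```cpp
#include <string>
#include <utility>

// Taking the string by value lets callers choose: copy from an lvalue, or
// move from an rvalue. A `const std::string&&` parameter would accept only
// rvalues yet forbid moving from them, degrading the "move" into a copy.
struct CallbackInvokerSketch {
  CallbackInvokerSketch(std::string name) : name_(std::move(name)) {}
  std::string name_; // the invoker's own copy of the cluster name
};
```

Callers that still own their string pass it directly (one copy), while callers done with theirs pass `std::move(s)` (no copy at all).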
```cpp
 * Creates a new discovery manager in current thread and swaps it with the one in thread local
 * cluster manager. This could be used to simulate requesting a cluster from a different
 * thread. Used for tests only.
 *
```
Please add: `@return the previous cluster discovery manager`
Will do, thanks.
```cpp
      name, cluster_manager.thread_local_dispatcher_.name());
  // This worker thread has already requested a discovery of a cluster with this name, so nothing
  // more left to do here.
  return std::move(handle);
```
Why `std::move`? It might be better just to return the value.
Yeah, I think it should be just `return handle;`. I must have confused myself, because `ClusterDiscoveryCallbackHandlePtr` is a move-only pointer (`std::unique_ptr`).
So I had to turn it back into `return std::move(handle);`, because `handle` is part of a structured binding (so it isn't just a `ClusterDiscoveryCallbackHandlePtr` but rather some reference), so NRVO does not apply here. I added a comment in both places to explain the situation.
```cpp
        {std::move(name), ClusterCreation{std::move(odcds), std::move(timer)}});
  });

  return std::move(handle);
```
Same as above.
Same here.
```cpp
  dispatcher_.post([this, odcds = std::move(odcds), timeout, name = std::move(name),
                    invoker = std::move(invoker),
                    &thread_local_dispatcher = cluster_manager.thread_local_dispatcher_] {
    // Check for the cluster here too. It might have been added between the time when this closure
```
Could there be a case that the discovery request for a cluster from one worker thread was already added, and then the cluster was removed?
If so, should the ODCDS protocol continue requesting the cluster?
Let me think about this case, it's been a while since I wrote it. :)
If some cluster is removed, then we check the `pending_cluster_creations_` map to see if that cluster was requested on demand. If so, we notify all worker threads about this fact. So I'd say it depends on the timing of the removal event in relation to adding the cluster name to the discovery manager and adding the cluster name to the `pending_cluster_creations_` map. I could imagine the following scenarios, where W1T is worker 1 thread, W2T is worker 2 thread, and MT is the main thread.

Case 1:
- W1T: register callback for cluster `foo` in thread-local discovery manager
- MT: received cluster `foo` removed for some reason

Nothing happens in reaction to the cluster `foo` removal, because `foo` is not yet in `pending_cluster_creations_`. So eventually we would add `foo` to `pending_cluster_creations_`, use ODCDS to request the cluster, and then wait for discovery results.

Case 2:
- W1T: register callback for cluster `foo` in thread-local discovery manager
- MT: register `foo` as requested by W1T in `pending_cluster_creations_` and use ODCDS to request the cluster
- MT: received cluster `foo` removed

This will invoke the callback in W1T with a result that the cluster is missing.

Case 3:
- W1T: register callback for cluster `foo` in thread-local discovery manager
- MT: register `foo` as requested by W1T in `pending_cluster_creations_` and use ODCDS to request the cluster
- W2T: register callback for cluster `foo` in thread-local discovery manager
- MT: received cluster `foo` removed

In this case both W1T and W2T will invoke the callbacks for cluster `foo` with a result of missing. But since the registration of `foo` in `pending_cluster_creations_` from W2T hasn't happened yet (scheduled but not yet dispatched), eventually we will request the cluster `foo` again. The callback in the discovery manager won't be invoked a second time, so it looks like we shouldn't do the discovery again. But at that point we are in the main thread, while the information about callbacks lives in the worker thread. Trying to avoid the superfluous discovery could be messy and unreliable, involving mutexes and whatnot.
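The per-worker deduplication described in these scenarios can be sketched roughly as below. `DiscoveryManagerSketch` and its member names are hypothetical illustrations, not the real `ClusterDiscoveryManager` API:

```cpp
#include <functional>
#include <list>
#include <map>
#include <string>
#include <utility>

using ClusterDiscoveryCallback = std::function<void(bool discovered)>;

// Per-worker sketch: the first request for a name creates an entry and tells
// the caller to notify the main thread; later requests from the same worker
// just append their callback to the existing entry.
class DiscoveryManagerSketch {
public:
  // Returns true when this worker sees `name` for the first time, meaning the
  // main thread should be asked to (maybe) start a discovery.
  bool addCallback(const std::string& name, ClusterDiscoveryCallback cb) {
    auto& list = pending_[name];
    list.push_back(std::move(cb));
    return list.size() == 1;
  }

  // Called when the main thread propagates a discovery result (success,
  // missing, or timed out) back to this worker.
  void processClusterName(const std::string& name, bool discovered) {
    auto node = pending_.extract(name);
    if (node.empty()) {
      return; // Nothing pending: result already delivered, or never requested.
    }
    for (auto& cb : node.mapped()) {
      cb(discovered);
    }
  }

private:
  std::map<std::string, std::list<ClusterDiscoveryCallback>> pending_;
};
```

Because `processClusterName` extracts the whole entry, a second result for the same name (as in case 3 above) becomes a no-op in this worker.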
```cpp
    return {};
  }
  CallbackList extracted;
  map_node_handle.mapped().swap(extracted);
```
Is swapping necessary here?
Let me think about this case, it's been a while since I wrote it. :)
This function wants to steal the callback list from the `pending_clusters_` map. It seems to me that the only way to steal something from a map is to use `extract`, which gives you a "map node". To actually steal the value, I'd need to either do a swap like I did, or maybe do:

```cpp
CallbackList extracted = std::move(map_node_handle.mapped());
```

Maybe another way could also be:

```cpp
CallbackList extracted = std::move(pending_clusters_[name]);
pending_clusters_.erase(name);
return extracted;
```
Signed-off-by: Krzesimir Nowak <[email protected]>
@adisuissa: Thanks for the review, I appreciate it. I'm going to do a force-push to fix the DCO in the last commit's message, and then I'll add the fixes and merge the main branch into mine.
Signed-off-by: Krzesimir Nowak <[email protected]>
Force-pushed from 99e911c to 1981e2e.
But also add a comment why the return statement looks the way it does. Signed-off-by: Krzesimir Nowak <[email protected]>
/retest
Retrying Azure Pipelines:
/retest
Retrying Azure Pipelines:
/retest
Retrying Azure Pipelines:
/retest
Retrying Azure Pipelines:
@htuch, @adisuissa: I think I'll need help from you to kick the GitHub action again or something. GitHub thinks it's been running for 15 hours at the time of writing (so I can't tell repokitteh to rerun the tests), while on Azure there's an error message that it stopped hearing from some agent.
/retest
Retrying Azure Pipelines:
Yeah, I had kicked the tests a while back; I probably forgot to mention that explicitly.
Alright, it's green again, so it's ready for re-review.
@adisuissa please take a look at this
Thanks for the detailed explanations.
Mostly minor comments.
```cpp
INSTANTIATE_TEST_SUITE_P(ClusterDiscoveryTestActions, ClusterDiscoveryTest,
                         testing::ValuesIn(all_actions));

TEST_P(ClusterDiscoveryTest, TestActions) { runTest(); }
```
Interesting test suite. One downside to this is that executing a subset of tests cannot be done from the command line (`--gtest_filter` doesn't work in this case).
Yeah, I haven't checked, but there probably is no way to filter the test parameters in a generic way. The only thing that comes to mind is to do what Envoy already does for the IPv4 and IPv6 parameters in some of the integration tests, where you can disable IPv6 with an environment variable.
```cpp
auto cb2 = createCallback();
auto handle1 =
    odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb1), timeout_);
auto cdm = cluster_manager_->createAndSwapClusterDiscoveryManager("another_fake_thread");
```
This line uses swap but doesn't do anything with the returned value. What I'm wondering is why one would use swap instead of just setting the value - swap methods have specific semantics, and if they're not needed, why use them?
It is mostly an emulation thing. Emulating threads in these unit tests is icky. Each worker thread has its own cluster discovery manager (CDM). Here I emulate having a second thread by creating another CDM with a different name. I issued a discovery request in one "thread" (CDM, really) for a certain cluster and then did the same in another "thread". This is to test the case where only the main thread can decide whether a discovery for a cluster was already requested and is ongoing.
I probably don't need the old CDM to be around, but there are things that happen implicitly here when objects go out of scope and their destructors get invoked - callbacks being removed and so on. I erred on the side of caution and kept the first CDM alive, just as it would be with real threads.
```cpp
odcds_callbacks_->onConfigUpdate({}, {}, "");
odcds_callbacks_->onConfigUpdate({}, {}, "");
odcds_callbacks_->onConfigUpdate({}, {}, "");
odcds_callbacks_->onConfigUpdate({}, {}, "");
```
Why 4 calls?
Honestly I don't remember now. I replaced this with two calls. One that says that some unrelated cluster was added, and one that says that some unrelated cluster was removed.
Or maybe I forgot to fill out the sets. Made it four calls again: empty, unrelated cluster added, unrelated cluster removed, unrelated clusters added and removed.
Creating protobuf data is a bit more involved. Signed-off-by: Krzesimir Nowak <[email protected]>
Signed-off-by: Krzesimir Nowak <[email protected]>
Signed-off-by: Krzesimir Nowak <[email protected]>
Updated.
Overall LGTM.
Thanks for working on this!
LGTM. Sorry if I've asked this and forgotten already, but is it possible to add an integration test here?
@htuch There are integration tests in the next series of commits, which is where we expose this through configuration. I don't want to speak for @krnowak, but I think the reason they're in that patchset is that we need the config change to enable the integration tests. This comment references the branch & tests: #18723 (comment)
@htuch: What James said - the feature is exposed as an API that an extension could consume, but it is not wired up to Envoy config. I have a branch where I modified the on-demand extension to use this API (because its name was quite fitting). I could file it as a follow-up PR.
Ack, makes sense, let's merge this and get the next PRs landed (hopefully smaller and faster to land ;) 🎉
🥳 Thanks @krnowak, @adisuissa, @htuch. I appreciate all your effort and support on moving this through.
Interesting compile failure on Linux aarch64 and amd64 and macOS with this change when running a slightly upgraded version of
The error is the same across all platforms
Reverting back to
Maybe this code should be using envoy/source/common/upstream/od_cds_api_impl.cc, lines 81 to 85 in 33a1129.
Commit Message:
The OdCdsApi interface is introduced as an API used for on-demand discovery. Its implementation is provided using the CdsApiHelper class, so OdCdsApiImpl handles the discovered cluster in the same way as CdsApiImpl does. On the ClusterManagerImpl side, a discovery manager (ClusterDiscoveryManager) is added to help deduplicate requests for the same cluster from within the same worker thread. Further deduplication of requests coming from different worker threads is done in ClusterManagerImpl in the main thread. Each unique request for a cluster also receives a timeout to catch the case when a discovery fails, allowing the worker threads to handle the failure.
Additional Description:
This is a continuation of #15857 - I could not reopen it, so I'm opening a new PR. I used the opportunity to rebase my changes on top of main.
Risk Level:
Low. A new feature not wired up anywhere yet.
Testing:
Docs Changes:
Release Notes:
Platform Specific Features: