CM: Add on-demand cluster discovery functionality #15857
@@ -39,7 +39,7 @@ namespace Envoy {
namespace Upstream {

/**
- * ClusterUpdateCallbacks provide a way to exposes Cluster lifecycle events in the
+ * ClusterUpdateCallbacks provide a way to expose Cluster lifecycle events in the
 * ClusterManager.
 */
class ClusterUpdateCallbacks {
@@ -72,6 +72,66 @@ class ClusterUpdateCallbacksHandle {

using ClusterUpdateCallbacksHandlePtr = std::unique_ptr<ClusterUpdateCallbacksHandle>;

/**
 * Status enum for the result of an attempted cluster discovery.
 */
enum class ClusterDiscoveryStatus {
  /**
   * Cluster was not found during the discovery process.
   */
  Missing,
  /**
   * Cluster found and currently available through ClusterManager.
   */
  Available,
};

/**
 * ClusterDiscoveryCallback is a callback called at the end of the on-demand cluster discovery
 * process. The status of the discovery is sent as a parameter.
 */
using ClusterDiscoveryCallback = std::function<void(ClusterDiscoveryStatus)>;
using ClusterDiscoveryCallbackWeakPtr = std::weak_ptr<ClusterDiscoveryCallback>;
using ClusterDiscoveryCallbackSharedPtr = std::shared_ptr<ClusterDiscoveryCallback>;

/**
 * ClusterDiscoveryCallbackHandle is a RAII wrapper for a ClusterDiscoveryCallback. Deleting the
 * ClusterDiscoveryCallbackHandle will remove the callbacks from ClusterManager.
 */
class ClusterDiscoveryCallbackHandle {
public:
  virtual ~ClusterDiscoveryCallbackHandle() = default;
};

using ClusterDiscoveryCallbackHandlePtr = std::unique_ptr<ClusterDiscoveryCallbackHandle>;

/**
 * A handle to an on-demand CDS.
 */
class OdCdsApiHandle {
public:
  virtual ~OdCdsApiHandle() = default;

  /**
   * Request an on-demand discovery of a cluster with a passed name. This ODCDS may be used to
   * perform the discovery process in the main thread if there is no discovery going on for this
   * cluster. The passed callback will be invoked when the cluster is added and warmed up. It is
   * expected that the callback will be destroyed when it is invoked. To cancel the discovery,
What are the cross-thread issues here? If the request is issued on a worker thread, will the passed callback be potentially invoked and destructed on the main thread, or do we guarantee this happens on the originating worker thread?

The callback is always invoked in the same thread that requested the discovery. The worker thread is the sole owner of the callback; the main thread only has weak access to it. The expectation that the callback clears itself is there to avoid a situation where the callback could be called twice, like: worker thread W1 requests the discovery of a cluster C. The main thread sees that this cluster is unknown and there is no discovery process ongoing for it, so it begins one. It sends a request, receives a reply and starts processing it in the main thread. Worker thread W2 at the same time also requests the discovery of cluster C (as it is still not available for W2), so its callback is added to the discovery manager. In the main thread the processing of the reply finishes, so worker threads get notified about the added cluster C; both W1 and W2 get notified (both had their callbacks in the discovery manager). And then in the main thread, the continuation of W2's discovery request is executed; it notices that cluster C is already known, so it posts the callback back to W2 to be executed. If the callback hadn't cleared itself on the first call, it would be executed a second time. Please see the attached picture; hopefully it makes the situation clearer:

This feels a bit fragile, why can't the discovery manager handle the clearing after invocation? Regarding the diagram, this is excellent, can we include this somewhere? E.g. I think the thread model makes sense, can you explain at least the basic guarantee that the callback is invoked on the thread it is registered on?

Also, how are we managing lifetime? What happens if the original request that registered the callback is already deleted (including state captured in the callback) by the time the request completes? I'm asking as it's really tricky to get these corner cases correct; @stevenzzzz @chaoqin-li1123 @dmitri-d all have experience in implementing on-demand SRDS/VHDS and the tricky race conditions around lifetime. Do you have tests that capture all these possibilities?

This is a good idea, I'll investigate it. I can imagine I'll try to refactor it that way and see where it takes me.

Thanks. I can make some diagrams like this to explain the possible workflows during the discovery process. About this one specifically: it shows a scenario that we wanted to avoid, so it's probably not the best diagram for docs. :)

Sure, will update the comment.

With the current state of the code, both the handle and the callback would also be deleted, so the discovery manager would either not see the callback (because destroying the handle should remove the callback) or would see it as a nullptr after locking (because it only has a weak_ptr to the callback), so it would not execute it.

The tests capturing these possibilities are tricky to write as unit tests, because the mock TLS assumes just one thread, and the mock dispatcher executes the callbacks immediately. Not sure if integration tests would make testing it easier either (no fine-grained control of the discovery process, and this feature is not really exposed yet in any way).

@chaoqin-li1123 @dmitri-d can you point to some example tests that you wrote that captured these lifetime issues for on-demand SRDS or VHDS?

Also, if you are relying on shared/weak pointer semantics for lifetime safety, don't worry about trying to convert to a unique_ptr.

I refactored it anyway, because I found some lifetime issues with handles (basically: make a request for cluster foo, wait for the callback to be invoked, but keep the handle, request the same cluster again and then destroy the old handle, all of it in the same thread, and we end up using a dangling iterator inside the destroyed handle).

Here's a vhds integration test: https://github.com/envoyproxy/envoy/blob/main/test/integration/vhds_integration_test.cc#L668

Thanks for the link. I think I skimmed through it some time ago, when I was writing the integration test for the previous PR. Not sure if I can write the integration tests for this PR, since the cluster discovery functionality is not yet exposed in any way.
   * destroy the returned handle and the callback.
   *
   * This function is thread-safe.
   *
   * @param name is the name of the cluster to be discovered.
   * @param callback will be called when the discovery is finished.
   * @return ClusterDiscoveryCallbackHandlePtr the discovery process handle.
   */
  virtual ClusterDiscoveryCallbackHandlePtr
  requestOnDemandClusterDiscovery(const std::string& name,
                                  ClusterDiscoveryCallbackSharedPtr callback) PURE;
};

using OdCdsApiHandleSharedPtr = std::shared_ptr<OdCdsApiHandle>;

class ClusterManagerFactory;

// These are per-cluster per-thread, so not "global" stats.
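To make the intended use of this API concrete, here is a minimal sketch of a worker-thread caller, following the expectation discussed in the review thread above (the callback drops its handle on its first invocation so it cannot run a second time). The class OnDemandFilterSketch and everything inside it are illustrative assumptions, not part of this PR; only ClusterDiscoveryStatus, ClusterDiscoveryCallback, ClusterDiscoveryCallbackHandlePtr, OdCdsApiHandleSharedPtr and requestOnDemandClusterDiscovery come from the header changes shown above.

// --- illustrative example, not part of the diff ---
#include <memory>
#include <string>
#include <utility>

// Assumes the cluster manager header modified in this PR has been included.

class OnDemandFilterSketch {
public:
  explicit OnDemandFilterSketch(Envoy::Upstream::OdCdsApiHandleSharedPtr odcds)
      : odcds_(std::move(odcds)) {}

  // Called on a worker thread when a request targets a cluster that is not known yet.
  void requestCluster(const std::string& name) {
    // The worker thread owns the callback through this shared_ptr; per the discussion above,
    // the main thread is only expected to keep a weak reference to it.
    auto callback = std::make_shared<Envoy::Upstream::ClusterDiscoveryCallback>(
        [this](Envoy::Upstream::ClusterDiscoveryStatus status) {
          // Destroy the handle (and with it our registration) on the first invocation, as the
          // API comment requires, so a later notification for the same cluster name cannot
          // reach this callback again.
          handle_.reset();
          if (status == Envoy::Upstream::ClusterDiscoveryStatus::Available) {
            // Continue processing against the now-known cluster.
          } else {
            // The cluster is still missing after discovery, e.g. fail the request.
          }
        });
    handle_ = odcds_->requestOnDemandClusterDiscovery(name, std::move(callback));
  }

private:
  Envoy::Upstream::OdCdsApiHandleSharedPtr odcds_;
  Envoy::Upstream::ClusterDiscoveryCallbackHandlePtr handle_;
};

Destroying the returned handle before the callback fires is the documented way to cancel the discovery, so RAII on the handle member also takes care of cleanup when the caller itself is destroyed.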
@@ -309,6 +369,19 @@ class ClusterManager {
  virtual const ClusterRequestResponseSizeStatNames&
  clusterRequestResponseSizeStatNames() const PURE;
  virtual const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const PURE;

  /**
   * Allocates an on-demand CDS API provider from configuration proto or locator.
What are the semantics when we are using CDS and have both a regular CDS wildcard subscription and ODCDS on the same stream?

Thanks for pointing this out. To rehash:

So, it looks like those two currently can't work together in one stream. Now, I don't know if this ignoring of requests with specific resource names while being in wildcard mode is enforced on the Envoy side or is an expectation of the config server. I don't know what the rationale behind this restriction is, so I'd ask if lifting this restriction would be an acceptable idea.

I think @markdroth added that restriction in https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol. I also agree it would be good to be able to combine wildcard and on-demand CDS here on the same stream. Thoughts @markdroth?

As currently defined, it's not possible to have both a wildcard subscription and a subscription to specific resource names on the same stream. Note, however, that wildcard subscriptions are intended to go away as part of the new naming scheme, so one option here would be to simply say that you can't do both unless you migrate to the new naming scheme. If that's not feasible, we could look into making some transport protocol change to support this. For example, maybe the wildcard request could be represented by

Is there a reason why both a wildcard subscription and a subscription to specific resource names are impossible on the same stream? I can see the conflict between a SotW wildcard subscription and a subscription to specific resource names (a SotW update would likely delete the clusters we got on-demand). Such a conflict wouldn't exist with a delta wildcard subscription. I suppose I could limit the on-demand CDS to be usable only with delta gRPC. About the naming scheme: I need to read about this, so I can't say much about it. Just wanted to say that I don't know if cluster names coming from the cluster_header action checking the

Wouldn't changing the wildcard request representation to

SG. Let's update https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol as part of this proposal then.

If CDS is configured and ODCDS is not configured, what will the initial request contain in resource names? An empty list or

I suppose that this "static" decision would be the way to go, because CDS is usually configured statically, while ODCDS might be configured much later (for example, ODCDS could be a part of some route configuration received from RDS). I'm probably wrong somewhere, so please correct me if that's the case. Thanks.

If we require this to be explicitly configured, then we need to figure out how to handle the case where someone configures CDS to leave the field unset instead of setting it to

Note that currently, wildcard mode is determined based on the first request for that resource type on a given stream, and there is no way to subsequently switch in or out of wildcard mode for that resource type on that stream. So if the first CDS request on a stream has the

https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return

I think that what we should do here is to change the spec to make it possible to switch out of wildcard mode after the first request on a stream. However, it will still be impossible to switch into wildcard mode once in non-wildcard mode, because otherwise there would be no way to unsubscribe from the last resource of a given type. In other words:

Note that in terms of the spec, I think we can say that servers should be prepared to accept

In terms of the Envoy implementation, I think we can figure out what to send automatically at the moment that we send each request. We just need to track a bool indicating whether the stream has previously seen a non-wildcard subscription for a given resource type. If it hasn't, then we can leave the field unset; if it has, then we send

FWIW, note that EDS never makes wildcard requests. The only resource types that make wildcard requests are LDS, CDS, and (I think) SRDS.

SG, I think your third case doesn't apply with delta xDS though.

Yeah, for delta xDS, instead of the
   *
   * @param odcds_config is a configuration proto. Used when odcds_resources_locator is a nullopt.
   * @param odcds_resources_locator is a locator for ODCDS. Used over odcds_config if not a nullopt.
   * @param validation_visitor
   * @return OdCdsApiHandleSharedPtr the ODCDS handle.
   */
  virtual OdCdsApiHandleSharedPtr
I think I see why you used a shared_ptr here (to deal with lifetime issues), but in a perfect world, from an API perspective, this should be a unique_ptr return and any needed shared/weak ptr gymnastics should be done under the hood. From a user perspective I would expect to allocate a handle in my filter, etc. during main thread init, use that across different threads, and then destroy it when the filter is destroyed.

Yeah, this makes sense. Thanks for the insight. I have made it a unique_ptr that holds a shared_ptr to OdCdsApi.
  allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config,
                   OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
                   ProtobufMessage::ValidationVisitor& validation_visitor) PURE;
};

using ClusterManagerPtr = std::unique_ptr<ClusterManager>;
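The thread above debates how a client could keep both a wildcard CDS subscription and on-demand subscriptions to specific cluster names on one stream. As a purely illustrative sketch of the bookkeeping suggested there (track whether a non-wildcard subscription has been seen for a resource type, and once it has, spell the wildcard out explicitly instead of relying on an empty resource_names list), something like the following could work; the "*" token, the struct and the function are assumptions for illustration, not Envoy or xDS API:

// --- illustrative example, not part of the diff ---
#include <set>
#include <string>
#include <vector>

// Hypothetical explicit wildcard token; the actual representation was still under discussion.
constexpr char kExplicitWildcard[] = "*";

struct PerTypeSubscriptionState {
  bool wants_wildcard{false};                // e.g. regular CDS is configured
  bool saw_non_wildcard_subscription{false}; // set once specific names were ever requested
  std::set<std::string> on_demand_names;     // e.g. clusters requested through ODCDS
};

// Decides what to put into resource_names for the next request of one resource type.
std::vector<std::string> resourceNamesToSend(PerTypeSubscriptionState& state) {
  std::vector<std::string> names(state.on_demand_names.begin(), state.on_demand_names.end());
  if (!names.empty()) {
    state.saw_non_wildcard_subscription = true;
  }
  if (state.wants_wildcard && state.saw_non_wildcard_subscription) {
    // An empty list can no longer be read as "wildcard" once specific names have been sent on
    // the stream, so the wildcard has to be expressed explicitly.
    names.push_back(kExplicitWildcard);
  }
  return names;
}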
@@ -957,6 +957,7 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_
    per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor();
  }

  pending_cluster_creations_.erase(cm_cluster.cluster().info()->name());
  tls_.runOnAllThreads(
      [info = cm_cluster.cluster().info(), params = std::move(params), add_or_update_cluster,
       load_balancer_factory](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {

@@ -1027,6 +1028,194 @@ ClusterManagerImpl::addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks&
  return std::make_unique<ClusterUpdateCallbacksHandleImpl>(cb, cluster_manager.update_callbacks_);
}

namespace {

using ClusterAddedCb = std::function<void(ThreadLocalCluster&)>;

class ClusterCallbacks : public ClusterUpdateCallbacks {
public:
  ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {}

  void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override { cb_(cluster); };

  void onClusterRemoval(const std::string&) override {}

private:
  ClusterAddedCb cb_;
};

} // namespace

ClusterManagerImpl::ClusterDiscoveryManager::ClusterDiscoveryManager(
    ThreadLocalClusterManagerImpl& parent)
    : parent_(parent) {}

void ClusterManagerImpl::ClusterDiscoveryManager::ensureLifecycleCallbacksAreInstalled() {
Why is this function needed? Can't these callbacks just be installed as part of the CDM constructor? Is the issue calling a virtual function in the constructor? For that I would slightly refactor to fix that vs. doing it this way. Let me know if you need more details.

Well, my thinking was that we wouldn't want these callbacks to be installed if they are not used. So the idea was to install them on first use and remove them if all the requests were handled.

I added
  if (callbacks_handle_) {
    return;
  }
  auto cb = ClusterAddedCb([this](ThreadLocalCluster& cluster) {
    processClusterName(cluster.info()->name(), ClusterDiscoveryStatus::Available);
  });
nit: just inline next line.
  callbacks_ = std::make_unique<ClusterCallbacks>(cb);
  callbacks_handle_ = parent_.parent_.addThreadLocalClusterUpdateCallbacks(*callbacks_);
}

void ClusterManagerImpl::ClusterDiscoveryManager::processClusterName(
    const std::string& name, ClusterDiscoveryStatus cluster_status) {
  // Extracting the list of callbacks from the map makes resetting the
  // handle inside the callback safe, because handle would try to find
  // the list of callbacks in the map and would find nothing, instead
  // of removing an item from the list while iterating the list.
  auto map_node_handle = pending_clusters_.extract(name);
  if (map_node_handle.empty()) {
    return;
  }
  for (const auto& weak_callback : map_node_handle.mapped()) {
    auto callback = weak_callback.lock();
    if (callback != nullptr) {
      (*callback)(cluster_status);
    }
  }
  maybePostResetCallbacks();
}
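As a standalone illustration of the comment at the top of processClusterName() above, the snippet below (plain C++ with standard containers, not Envoy code) shows why extracting the map node before invoking the callbacks makes it safe for a callback to remove its own entry: the map no longer owns the list at that point, so the erase finds nothing and the ongoing iteration is not invalidated.

// --- illustrative example, not part of the diff ---
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <string>

int main() {
  std::map<std::string, std::list<std::function<void()>>> pending;
  pending["cluster_a"].push_back([] { std::cout << "first callback\n"; });
  pending["cluster_a"].push_back([&pending] {
    // Simulates a handle resetting itself during invocation: the map no longer contains the
    // key, so this erase is a harmless no-op instead of invalidating the list being walked.
    pending.erase("cluster_a");
    std::cout << "second callback\n";
  });

  auto node = pending.extract("cluster_a"); // detach the whole list from the map first
  if (!node.empty()) {
    for (auto& cb : node.mapped()) {
      cb();
    }
  }
  return 0;
}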
ClusterManagerImpl::ClusterDiscoveryManager::Pair
ClusterManagerImpl::ClusterDiscoveryManager::addCallback(
    const std::string& name, const ClusterDiscoveryCallbackSharedPtr& callback) {
  ensureLifecycleCallbacksAreInstalled();

  auto& callbacks_list = pending_clusters_[name];
  auto it = callbacks_list.emplace(callbacks_list.end(), callback);
  auto handle = std::make_unique<ClusterDiscoveryCallbackHandleImpl>(*this, name, it);
  auto discovery_in_progress = (callbacks_list.size() > 1);
  return {std::move(handle), discovery_in_progress};
}

void ClusterManagerImpl::ClusterDiscoveryManager::erase(const std::string& name,
                                                        CallbackListIterator it) {
  const bool drop_list = eraseFromList(name, it);
  if (drop_list) {
    pending_clusters_.erase(name);
  }
  maybePostResetCallbacks();
}

bool ClusterManagerImpl::ClusterDiscoveryManager::eraseFromList(const std::string& name,
                                                                CallbackListIterator it) {
  auto map_it = pending_clusters_.find(name);
  if (map_it == pending_clusters_.end()) {
    return false;
  }
  auto& list = map_it->second;
  list.erase(it);
  return list.empty();
}

void ClusterManagerImpl::ClusterDiscoveryManager::maybePostResetCallbacks() {
Sorry, I'm a little confused about the purpose of this function. Can you clarify? I think you can just leave the callbacks handle installed the entire time? This is related to my other comment on the ensure function.
  if (!callbacks_handle_cleanup_posted_ && pending_clusters_.empty()) {
    parent_.thread_local_dispatcher_.post([this] {
      // Something might have gotten added in the meantime, so check the map again.
      if (pending_clusters_.empty()) {
        callbacks_handle_.reset();
        callbacks_.reset();
      }
      callbacks_handle_cleanup_posted_ = false;
    });
    callbacks_handle_cleanup_posted_ = true;
  }
}

OdCdsApiHandleSharedPtr
ClusterManagerImpl::allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config,
                                     OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
                                     ProtobufMessage::ValidationVisitor& validation_visitor) {
  // TODO(krnowak): Instead of creating a new handle every time, store the handles internally and
  // return an already existing one if the config or locator matches. Note that this may need a way
  // to clean up the unused handles, so we can close the unnecessary connections.
  auto odcds = OdCdsApiImpl::create(odcds_config, odcds_resources_locator, *this, stats_,
                                    validation_visitor);
  return OdCdsApiHandleImpl::create(*this, std::move(odcds));
}

ClusterDiscoveryCallbackHandlePtr
ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiHandleImplSharedPtr odcds_handle,
                                                    const std::string& name,
                                                    ClusterDiscoveryCallbackSharedPtr callback) {
  ThreadLocalClusterManagerImpl& cluster_manager = *tls_;

  auto [handle, discovery_in_progress] = cluster_manager.cdm_.addCallback(name, callback);
  if (discovery_in_progress) {
Might be useful to add a comment that this will only catch discoveries in progress in this thread; but if a response to the same request made in another thread arrives earlier, the callback will be invoked anyway.

Will add it.
    // This worker thread has already requested a discovery of a cluster with this name, so nothing
    // more left to do here.
    return std::move(handle);
I'm not sure you actually need an explicit move here.

Hm, let me check that. Maybe NRVO will kick in here.

It's needed. NRVO can't kick in here, because
  }
  // This seems to be the first request for discovery of this cluster in this worker thread. Rest of
  // the process may only happen in the main thread.
  dispatcher_.post([this, odcds_handle = std::move(odcds_handle),
                    weak_callback = ClusterDiscoveryCallbackWeakPtr(callback), name,
Same as above, it's captured by value. I'd need to write
                    &thread_local_dispatcher = cluster_manager.thread_local_dispatcher_] {
    // Check for the cluster here too. It might have been added between the time when this closure
    // was posted and when it is being executed.
    if (getThreadLocalCluster(name) != nullptr) {
      if (weak_callback.expired()) {
        // Not only the cluster was added, but it was also already handled, so don't bother with
Sorry, can't parse. Clarify? I think this just means that by the time we got here the request has expired? I actually wonder if it's worth handling this case, since it would just get handled during the post back? I might just delete it to keep it simple?

By the time we got here the callback could have expired for several reasons:

The reason I added this code is to avoid posting an essentially no-op callback to the worker thread, but I'll simplify it, if it's not a big deal.
        // posting the callback back to the worker thread.
        return;
      }
      thread_local_dispatcher.post([weak_callback] {
        if (auto callback = weak_callback.lock(); callback != nullptr) {
          // If this gets called here, it means that we requested a discovery of a cluster without
          // checking if that cluster is already known by cluster manager.
Is this actually true? Couldn't this be a race between threads?

Hm, it could be a rather long-winded race, true. Worker thread W1 checks for a cluster that is absent, gets preempted in favor of worker thread W2, which does the same and starts the discovery process. Then the main thread sends the gRPC request, receives the reply, warms up the cluster and adds it to the thread local CMs. Then W1 gets a chance to run again, and since it decided that the cluster is missing, it starts the discovery and hits this case in the main thread. Long-winded to a point of starvation, so I'd say this is a highly unlikely possibility. But who knows what may happen under really high load. I'll update the comment.
          (*callback)(ClusterDiscoveryStatus::Available);
        }
      });
      return;
    }

    if (auto it = pending_cluster_creations_.find(name); it != pending_cluster_creations_.end()) {
Please add debug/trace level logging throughout this code/PR. It's going to be very difficult to debug this and understand what is going on. Please also check coverage to make sure all the various branches are hit in the new code.

Will do.

Added some, but I'm rather unsure about adding logging to the discovery manager; it's a thread-local thing, so it might spam the log N times with the same message (especially when the discovery is finished and every worker thread is notified about that fact).

I think it's OK to do that at trace level, and probably still worth it.

Ok, will add them then.
      // We already began the discovery process for this cluster, nothing to do. If we got here, it
      // means that it was some other worker thread that requested the discovery.
      return;
    }
    auto& odcds = odcds_handle->getOdCds();
    // Start the discovery. If the cluster gets discovered, cluster manager will warm it up and
    // invoke the cluster lifecycle callbacks, that will in turn invoke our callback.
    odcds.updateOnDemand(name);
    // Set up the discovery timeout timer to avoid keeping callbacks indefinitely.
    auto timer_cb = Event::TimerCb([this, name] { notifyExpiredDiscovery(name); });
nit: just inline this next line.
    auto timer = dispatcher_.createTimer(timer_cb);
    // TODO(krnowak): This timeout period is arbitrary. Should it be a parameter or do we keep it,
    // but rather with a name?
    timer->enableTimer(std::chrono::milliseconds(5000));
Yes, the timeout should be passed as an API parameter. Also I think

Ok, will add the timeout parameter. About the
    // Keep odcds handle alive for the duration of the discovery process.
    pending_cluster_creations_.insert(
        {std::move(name), ClusterCreation{std::move(odcds_handle), std::move(timer)}});
  });

  return std::move(handle);
}
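The exchange above about return std::move(handle) presumably comes down to handle being introduced by a structured binding in this function. A small self-contained illustration, with hypothetical stand-in types, of why the explicit move is needed in that position under C++17 rules:

// --- illustrative example, not part of the diff ---
#include <memory>
#include <utility>

// Hypothetical stand-in for ClusterDiscoveryManager::Pair.
struct DiscoveryPair {
  std::unique_ptr<int> handle;
  bool discovery_in_progress;
};

DiscoveryPair addCallbackStub() { return {std::make_unique<int>(0), false}; }

std::unique_ptr<int> requestStub() {
  auto [handle, discovery_in_progress] = addCallbackStub();
  (void)discovery_in_progress; // silence the unused-binding warning in this toy example
  // `handle` names a member of the hidden object created for the structured binding, so it is
  // not eligible for NRVO or for the implicit move on return; without std::move this would try
  // to copy a unique_ptr and fail to compile.
  return std::move(handle);
}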

void ClusterManagerImpl::notifyExpiredDiscovery(const std::string& name) {
debug log please

I have added there a debug log already -
  auto map_node_handle = pending_cluster_creations_.extract(name);
  if (map_node_handle.empty()) {
    return;
  }
  // Defer destroying the timer, so it's not destroyed during its callback. TimerPtr is a
  // unique_ptr, which is not copyable, but std::function is copyable, so we turn a move-only
  // unique_ptr into a copyable shared_ptr and pass that to the std::function.
None of this should be necessary. Timers are one-shot and it should be safe to destroy them during callback invocation, so I think it's fine to just destroy the timer as this pending discovery expires. We shouldn't be passing timers between threads in any situation.

This code is expected to be executed in the main thread, so the deferred destruction is also meant to happen in the main thread. But anyway, I'm happy to drop this, so I'll get rid of the clang-tidy complaint here too.
  dispatcher_.post([timer = std::shared_ptr<Event::Timer>(
                        std::move(map_node_handle.mapped().expiration_timer_))] {});

  // Let all the worker threads know that the discovery timed out.
  tls_.runOnAllThreads([name](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
    if (!cluster_manager.has_value()) {
      return;
    }
I don't think this is possible?

It's an opt-ref, so I thought the check would be prudent to do. I'll remove it.
    cluster_manager->cdm_.processClusterName(name, ClusterDiscoveryStatus::Missing);
  });
}

ProtobufTypes::MessagePtr ClusterManagerImpl::dumpClusterConfigs() {
  auto config_dump = std::make_unique<envoy::admin::v3::ClustersConfigDump>();
  config_dump->set_version_info(cds_api_ != nullptr ? cds_api_->versionInfo() : "");

@@ -1061,7 +1250,7 @@ ProtobufTypes::MessagePtr ClusterManagerImpl::dumpClusterConfigs() {
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl(
    ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
    const absl::optional<LocalClusterParams>& local_cluster_params)
-    : parent_(parent), thread_local_dispatcher_(dispatcher) {
+    : parent_(parent), thread_local_dispatcher_(dispatcher), cdm_(*this) {
  // If local cluster is defined then we need to initialize it first.
  if (local_cluster_params.has_value()) {
    const auto& local_cluster_name = local_cluster_params->info_->name();
One thing that you'll need to add as we finalize this PR is version history and also probably some docs like https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/on_demand_updates_filter.html. Maybe also update https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol to reference it (places where CDS and VHDS crop up are a good start).