CM: Add on-demand cluster discovery functionality #15857

Closed
krnowak wants to merge 45 commits into main from krnowak/odcds-cm-only
Changes from 11 commits (of 45 total)
0652500
CM, ODCDS: Add on-demand cluster discovery functionality
krnowak Mar 19, 2021
050c490
CM: Fix a typo
krnowak Apr 1, 2021
d5f74fb
CDS helper: Make name const
krnowak Apr 1, 2021
805ef37
test: Add tests for ODCDS functionality in CM
krnowak Apr 6, 2021
8cac4ac
ODCDS: Allow passing a resource locator
krnowak Apr 7, 2021
3c4cfa5
CM, test: Refactor to move discovery functionality to handle
krnowak Apr 8, 2021
75a0c46
CM, test: Fix formatting
krnowak Apr 8, 2021
a53ea84
Merge remote-tracking branch 'origin/main' into krnowak/odcds-cm-only
krnowak Apr 8, 2021
a718e3b
CM: Fix build issues
krnowak Apr 8, 2021
cbe797e
CM: Fix linking issues
krnowak Apr 8, 2021
bc1d0df
test: Fix build issues
krnowak Apr 8, 2021
b32112f
CM, test: Address review issues
krnowak Apr 9, 2021
4e52486
CM: test: Address review issues
krnowak Apr 12, 2021
7ea51f0
CM, test: Improve coverage a bit
krnowak Apr 13, 2021
fad4410
CM, test: Address review issues
krnowak Apr 14, 2021
4f854b5
CM: Address review issues
krnowak Apr 15, 2021
bb2e17c
test: Address review issues
krnowak Apr 20, 2021
caad394
Merge remote-tracking branch 'origin/main' into krnowak/odcds-cm-only
krnowak Apr 20, 2021
722562c
test: Fix build
krnowak Apr 20, 2021
2fd85dc
CM: Refactor cluster discovery manager
krnowak Apr 22, 2021
61a07d2
refactor of the discovery manager
krnowak May 5, 2021
1002639
refactor
krnowak May 5, 2021
c4eef58
CM: Add missing cluster status
krnowak May 5, 2021
161d896
Merge remote-tracking branch 'origin/main' into krnowak/odcds-cm-only
krnowak May 5, 2021
6058b3d
grpc: initial * hack
krnowak May 6, 2021
857f696
document the purpose of the map
krnowak May 6, 2021
d138b40
fix build
krnowak May 6, 2021
d694f4b
fixup! grpc: initial * hack
krnowak May 6, 2021
11031da
test with notifier
krnowak May 7, 2021
00cc3b3
fix format
krnowak May 7, 2021
1d11e12
Merge remote-tracking branch 'origin/main' into krnowak/odcds-cm-only
krnowak May 11, 2021
f912324
Merge remote-tracking branch 'origin/main' into krnowak/odcds-cm-only
krnowak May 18, 2021
ccc45c5
add invoker to dict
krnowak May 18, 2021
8464e01
fix comments
krnowak May 18, 2021
3c5ec84
delta-xds: Fix initial requests
krnowak May 22, 2021
1db17a0
more comment fixes
krnowak May 22, 2021
60394a4
delta-xds: add resource type
krnowak May 22, 2021
a513cb6
docs: update xds-protocol docs about explicit wildcard mode
krnowak May 22, 2021
7b3f78b
docs: further updates in xds protocol
krnowak May 22, 2021
bb3eb45
tests: fix clang-tidy, use nicemock
krnowak May 22, 2021
124179a
test: Add another case to discovery manager tests
krnowak May 25, 2021
788680a
test: Improve coverage around notifications
krnowak May 25, 2021
04f8f43
CM, test: Try to improve coverage around detection of discovery in pr…
krnowak May 25, 2021
411e72d
test: Improve coverage for explicit wildcard subscription mode
krnowak May 25, 2021
f6ee71a
config, test: Fixes for explicit wildcard mode and coverage improvements
krnowak May 25, 2021
72 changes: 71 additions & 1 deletion include/envoy/upstream/cluster_manager.h
@@ -39,7 +39,7 @@ namespace Envoy {
namespace Upstream {

/**
* ClusterUpdateCallbacks provide a way to exposes Cluster lifecycle events in the
Member

One thing that you'll need to add as we finalize this PR is version history, and probably also some docs like https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/on_demand_updates_filter.html. Maybe also update https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol to reference this feature (places where CDS and VHDS crop up are a good start).

* ClusterUpdateCallbacks provide a way to expose Cluster lifecycle events in the
* ClusterManager.
*/
class ClusterUpdateCallbacks {
@@ -72,6 +72,63 @@ class ClusterUpdateCallbacksHandle {

using ClusterUpdateCallbacksHandlePtr = std::unique_ptr<ClusterUpdateCallbacksHandle>;

/**
* Status enum for the result of an attempted cluster discovery.
*/
enum class ClusterDiscoveryStatus {
/**
* Cluster was not found during the discovery process.
*/
Missing,
/**
* Cluster found and currently available through ClusterManager.
*/
Available,
};

/**
* ClusterDiscoveryCallback is a callback called at the end of the on-demand cluster discovery
* process. The status of the discovery is sent as a parameter.
*/
using ClusterDiscoveryCallback = std::function<void(ClusterDiscoveryStatus)>;
using ClusterDiscoveryCallbackWeakPtr = std::weak_ptr<ClusterDiscoveryCallback>;
using ClusterDiscoveryCallbackSharedPtr = std::shared_ptr<ClusterDiscoveryCallback>;

/**
* ClusterDiscoveryCallbackHandle is a RAII wrapper for a ClusterDiscoveryCallback. Deleting the
* ClusterDiscoveryCallbackHandle will remove the callbacks from ClusterManager.
*/
class ClusterDiscoveryCallbackHandle {
public:
virtual ~ClusterDiscoveryCallbackHandle() = default;
};

using ClusterDiscoveryCallbackHandlePtr = std::unique_ptr<ClusterDiscoveryCallbackHandle>;

class OdCdsApiHandle {
public:
virtual ~OdCdsApiHandle() = default;

/**
* Request an on-demand discovery of a cluster with a passed name. This ODCDS may be used to
* perform the discovery process in the main thread if there is no discovery going on for this
* cluster. The passed callback will be invoked when the cluster is added and warmed up. It is
* expected that the callback will be destroyed when it is invoked. To cancel the discovery,
Member

What are the cross thread issues here? If the request is issued on a worker thread, will the passed callback be potentially invoked and destructed on the main thread, or do we guarantee this happens on the originating worker thread?

Contributor Author

The callback is always invoked in the same thread that requested the discovery. The worker thread is the sole owner of the callback; the main thread only has weak access to it.

The expectation that the callback clears itself is there to avoid a situation where the callback could be called twice, like this:

Worker thread W1 requests the discovery of a cluster C. The main thread sees that this cluster is unknown and there is no discovery process ongoing for it, so it begins one. It sends a request, receives a reply, and starts processing it in the main thread. Worker thread W2 at the same time also requests the discovery of cluster C (as it is still not available for W2), so its callback is added to the discovery manager. In the main thread the processing of the reply finishes, so worker threads get notified about the added cluster C, and both W1 and W2 get notified (both had their callbacks in the discovery manager). Then, in the main thread, the continuation of W2's discovery request is executed - it notices that cluster C is already known, so it posts the callback back to W2 to be executed. If the callback hadn't cleared itself on the first call, it would be executed a second time.

Please see the attached picture; hopefully it makes the situation clearer:
[diagram: discovery-callbacks]

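A minimal sketch of the self-clearing pattern described above (the Requester class and its members are illustrative and not part of this PR; the ClusterDiscovery* types and requestOnDemandClusterDiscovery are the ones added in this diff):

// Worker-thread code owns the callback via shared_ptr; the cluster manager only keeps a
// weak_ptr and locks it for the duration of the call (see processClusterName in this diff),
// so dropping our reference inside the callback is safe and makes any later, duplicate
// invocation attempt a no-op.
class Requester {
public:
  void discover(Envoy::Upstream::OdCdsApiHandle& odcds, const std::string& name) {
    callback_ = std::make_shared<Envoy::Upstream::ClusterDiscoveryCallback>(
        [this](Envoy::Upstream::ClusterDiscoveryStatus status) {
          onDiscoveryDone(status);
          callback_.reset(); // clear ourselves so we cannot be invoked a second time
        });
    handle_ = odcds.requestOnDemandClusterDiscovery(name, callback_);
  }

private:
  void onDiscoveryDone(Envoy::Upstream::ClusterDiscoveryStatus) { /* resume the request */ }

  Envoy::Upstream::ClusterDiscoveryCallbackSharedPtr callback_;
  Envoy::Upstream::ClusterDiscoveryCallbackHandlePtr handle_;
};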
Member

This feels a bit fragile, why can't the discovery manager handle the clearing after invocation?

Regarding the diagram, this is excellent, can we include this somewhere? E.g. source/docs?

I think the thread model makes sense, can you explain at least the basic guarantee that callback is invoked on the thread it is registered on?

Member

Also, how are we managing lifetime? What happens if the original request that registered the callback is already deleted (including state captured in the callback) by the time the request completes?

I'm asking as it's really tricky to get these corner cases correct. @stevenzzzz @chaoqin-li1123 @dmitri-d all have experience implementing on-demand SRDS/VHDS and the tricky race conditions around lifetime. Do you have tests that capture all these possibilities?

Contributor Author

> This feels a bit fragile, why can't the discovery manager handle the clearing after invocation?

This is a good idea, I'll investigate it. I can imagine requestOnDemandClusterDiscovery taking a unique_ptr to the callback instead of a shared_ptr, so the discovery manager becomes the sole manager of the callback. The returned handle would be a way to remove the callback from the manager. It would be the responsibility of the requester to make sure that whatever data is captured in the callback stays alive for the duration of the discovery or until the handle is destroyed.

I'll try to refactor it that way and see where it takes me.
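For reference, a sketch of the signature such a refactor might end up with (hypothetical, not what this revision of the diff declares):

// The discovery manager would become the sole owner of the callback; the returned handle
// would only be used to deregister it early.
virtual ClusterDiscoveryCallbackHandlePtr
requestOnDemandClusterDiscovery(const std::string& name,
                                std::unique_ptr<ClusterDiscoveryCallback> callback) PURE;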

> Regarding the diagram, this is excellent, can we include this somewhere? E.g. source/docs?

Thanks. I can make some diagrams like this to explain the possible workflows during the discovery process. About this one specifically - it shows a scenario that we wanted to avoid, so it's probably not the best diagram for docs. :)

> I think the thread model makes sense, can you explain at least the basic guarantee that callback is invoked on the thread it is registered on?

Sure, will update the comment.

> Also, how are we managing lifetime? What happens if the original request that registered the callback is already deleted (including state captured in the callback) by the time the request completes?

With the current state of the code, both the handle and the callback would also be deleted, so the discovery manager would either not see the callback (because destroying the handle removes it) or would see a nullptr after locking (because it only holds a weak_ptr to the callback), so it would not execute it.

> I'm asking as it's really tricky to get these corner cases correct, @stevenzzzz @chaoqin-li1123 @dmitri-d all have experience implementing on-demand SRDS/VHDS and the tricky race conditions around lifetime. Do you have tests that capture all these possibilities?

The tests capturing these possibilities are tricky to write as unit tests, because the mock TLS assumes just one thread, and the mock dispatcher executes the callbacks immediately. Not sure if integration tests would make testing it easier either (no fine-grained control of the discovery process, and this feature is not really exposed yet in any way).

Member

@chaoqin-li1123 @dmitri-d can you point to some example tests that you wrote that captured these lifetime issues for on-demand SRDS or VHDS?

Member

Also, if you are relying on shared/weak pointer semantics for lifetime safety, don't worry about trying to convert to a unique_ptr.

Contributor Author

I refactored it anyway, because I found some lifetime issues with handles (basically: make a request for cluster foo, wait for the callback to be invoked but keep the handle, request the same cluster again, and then destroy the old handle, all of it in the same thread, and we end up using a dangling iterator inside the destroyed handle).

Contributor

Contributor Author

Thanks for the link. I think I skimmed through it some time ago, when I was writing the integration test for the previous PR. Not sure if I can write the integration tests for this PR, since the cluster discovery functionality is not yet exposed in any way.

* destroy the returned handle and the callback.
*
* This function is thread-safe.
*
* @param name is the name of the cluster to be discovered.
* @param callback will be called when the discovery is finished.
* @return ClusterDiscoveryCallbackHandlePtr the discovery process handle.
*/
virtual ClusterDiscoveryCallbackHandlePtr
requestOnDemandClusterDiscovery(const std::string& name,
ClusterDiscoveryCallbackWeakPtr callback) PURE;
Member

I think I commented on this previously, but in general it doesn't make sense to pass a weak ptr here. The callback should exist at the time of the call. I would let the callee store it as a weak ptr.
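A sketch of what that suggestion would look like (illustrative only; the downgrade to a weak reference happens inside the implementation):

// The caller passes a shared_ptr that is guaranteed to be alive at call time...
virtual ClusterDiscoveryCallbackHandlePtr
requestOnDemandClusterDiscovery(const std::string& name,
                                ClusterDiscoveryCallbackSharedPtr callback) PURE;

// ...and the callee only retains a weak reference:
// ClusterDiscoveryCallbackWeakPtr weak_callback = callback;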

Contributor Author

Ok, I thought earlier it was only about the odcds weak pointer. Will change it.

};

using OdCdsApiHandleSharedPtr = std::shared_ptr<OdCdsApiHandle>;

class ClusterManagerFactory;

// These are per-cluster per-thread, so not "global" stats.
@@ -309,6 +366,19 @@ class ClusterManager {
virtual const ClusterRequestResponseSizeStatNames&
clusterRequestResponseSizeStatNames() const PURE;
virtual const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const PURE;

/**
* Allocates an on-demand CDS API provider from configuration proto or locator.
Member

What are the semantics when we are using CDS and have both a regular CDS wildcard subscription and ODCDS on the same stream?

Contributor Author

Thanks for pointing this out. To rehash:

  • Once the CDS subscription enters wildcard mode, it should ignore requests for specific cluster names.
  • ODCDS currently always sends requests with specific cluster names.

So it looks like those two currently can't work together in one stream. Now, I don't know whether this ignoring of requests with specific resource names while in wildcard mode is enforced on the Envoy side or is an expectation of the config server. I don't know the rationale behind this restriction, so I'd ask whether lifting it would be an acceptable idea.

Member

I think @markdroth added that restriction in https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol. I also agree it would be good to be able to combine wildcard and on-demand CDS here on the same stream. Thoughts @markdroth?

Contributor

As currently defined, it's not possible to have both a wildcard subscription and subscription to specific resource names on the same stream. Note, however, that wildcard subscriptions are intended to go away as part of the new naming scheme, so one option here would be to simply say that you can't do both unless you migrate to the new naming scheme.

If that's not feasible, we could look into making some transport protocol change to support this. For example, maybe the wildcard request could be represented by resource_names: "*" instead of leaving resource_names completely unset. But we'd need to carefully look through the spec and make sure that there are no non-obvious implications of doing that. I think it should be okay, because it doesn't change the two things that are really special about responses from wildcard subscriptions: the fact that the server must send all existing resources in every response and the fact that if a resource is no longer present in a response, the client can assume it's been deleted on the server.

Contributor Author

> As currently defined, it's not possible to have both a wildcard subscription and subscription to specific resource names on the same stream. Note, however, that wildcard subscriptions are intended to go away as part of the new naming scheme, so one option here would be to simply say that you can't do both unless you migrate to the new naming scheme.

Is there a reason why both a wildcard subscription and subscriptions to specific resource names are impossible on the same stream? I can see the conflict between a SotW wildcard subscription and subscriptions to specific resource names (a SotW update would likely delete the clusters we got on-demand). Such a conflict wouldn't exist with a delta wildcard subscription. I suppose I could limit on-demand CDS to be usable only with delta gRPC.

About the naming scheme - I need to read about this, so I can't say much about it. I just don't know whether cluster names coming from the cluster_header action (which reads the cluster name from the Host HTTP header) fit into the new naming scheme.

> If that's not feasible, we could look into making some transport protocol change to support this. For example, maybe the wildcard request could be represented by resource_names: "*" instead of leaving resource_names completely unset. But we'd need to carefully look through the spec and make sure that there are no non-obvious implications of doing that. I think it should be okay, because it doesn't change the two things that are really special about responses from wildcard subscriptions: the fact that the server must send all existing resources in every response and the fact that if a resource is no longer present in a response, the client can assume it's been deleted on the server.

Wouldn't changing the wildcard request representation to resource_names: "*" be a breaking change for existing config servers?

Member

SG. Let's update https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol as part of this proposal then.

Contributor Author

If CDS is configured and ODCDS is not configured, what will the initial request contain in resource names? An empty list or "*"? What I'm asking here is how we decide when to send an empty list vs. "*". Is this decided statically (like: for CDS we will always send "*", because we will have ODCDS, but for EDS we will send an empty list, because there is no such thing as ODEDS)?

I suppose that this "static" decision would be the way to go, because CDS is usually configured statically, while ODCDS might be configured much later (for example, ODCDS could be a part of some route configuration received from RDS). I'm probably wrong somewhere, so please correct me if that's the case. Thanks.

Contributor

If we require this to be explicitly configured, then we need to figure out how to handle the case where someone configures CDS to leave the field unset instead of setting it to * but then later configures ODCDS. It seems better to figure this out automatically, so that it's not possible to configure this incorrectly.

Note that currently, wildcard mode is determined based on the first request for that resource type on a given stream, and there is no way to subsequently switch in or out of wildcard mode for that resource type on that stream. So if the first CDS request on a stream has the resource_names field unset, then the stream is in wildcard mode for CDS resources, and the field is ignored in all subsequent CDS requests on that stream. Conversely, if the first CDS request on a stream has the resource_names field set, then the stream is in non-wildcard mode for CDS resources, and the field is honored in all subsequent CDS requests on that stream; after that, if a request is sent with the field unset, that is interpreted as unsubscribing from all CDS resources instead of being interpreted as a wildcard request. For details, see:

https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return
https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#unsubscribing-from-resources

I think that what we should do here is change the spec to make it possible to switch out of wildcard mode after the first request on a stream. However, it will still be impossible to switch into wildcard mode once in non-wildcard mode, because otherwise there would be no way to unsubscribe from the last resource of a given type. In other words:

  • If the first request for a given resource type on a stream has the resource_names field unset, then it's in wildcard mode.
  • If a subsequent request for that resource type on the stream has the resource_names field set, then the stream switches to non-wildcard mode. If the client wants to retain the original wildcard subscription, then the resource_names field must include *; if it includes only other resource names but no *, that means dropping the wildcard subscription and adding whatever other subscriptions are specified.
  • Once a stream has seen at least one non-wildcard subscription for a given resource type, a subsequent request for that same resource type with the resource_names field unset means unsubscribing from everything; it does not indicate a wildcard subscription. Clients wishing to unsubscribe from non-wildcard subscriptions but retain a wildcard subscription must continue to send * for the duration of the stream.

Note that in terms of the spec, I think we can say that servers should be prepared to accept * to mean wildcard even at the start of a stream, even when there are no non-wildcard subscriptions. But we don't ever actually need to send that in Envoy, so in practice there won't be any backward-compatibility problems.

In terms of the Envoy implementation, I think we can figure out what to send automatically at the moment that we send each request. We just need to track a bool indicating whether the stream has previously seen a non-wildcard subscription for a given resource type. If it hasn't, then we can leave the field unset; if it has, then we send * instead.

FWIW, note that EDS never makes wildcard requests. The only resource types that make wildcard requests are LDS, CDS, and (I think) SRDS.
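A minimal sketch of that bookkeeping (an illustrative helper, not Envoy's actual xDS client code; the type and field names are made up):

#include <set>
#include <string>
#include <vector>

// Per (stream, resource type) state: whether a wildcard subscription is wanted, whether a
// non-wildcard subscription was ever sent on this stream, and the specific names currently
// subscribed to.
struct WildcardTrackingState {
  bool wants_wildcard{true};
  bool sent_non_wildcard{false};
  std::set<std::string> names;
};

// Decides what to put into resource_names for the next SotW request.
std::vector<std::string> resourceNamesForRequest(WildcardTrackingState& state) {
  std::vector<std::string> out(state.names.begin(), state.names.end());
  if (!out.empty()) {
    state.sent_non_wildcard = true;
  }
  if (state.wants_wildcard && state.sent_non_wildcard) {
    // Once the stream has seen a non-wildcard subscription for this type, the wildcard must
    // be spelled out as "*"; an unset/empty field would now mean "unsubscribe from all".
    out.push_back("*");
  }
  // An empty result means an implicit (legacy) wildcard if nothing non-wildcard was ever
  // sent on this stream, and "unsubscribe from everything" otherwise.
  return out;
}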

Member

SG, I think your third case doesn't apply with delta xDS though.

Contributor

Yeah, for delta xDS, instead of the resource_names field being unset, it would be sending resource_names_unsubscribe for all current subscriptions. But I think the same concept would apply: once the set of subscribed resource locators goes back to being the empty set, that indicates unsubscribing from everything, not a wildcard subscription.

*
* @param odcds_config is a configuration proto. Used when odcds_resources_locator is nullptr.
* @param odcds_resources_locator is a locator for ODCDS.
* @param validation_visitor
* @return OdCdsApiHandleSharedPtr the ODCDS handle.
*/
virtual OdCdsApiHandleSharedPtr
Member

I think I see why you use shared_ptr here (to deal with lifetime issues), but in a perfect world, from an API perspective, this should be a unique_ptr return and any needed shared/weak ptr gymnastics should be done under the hood. From a user perspective I would expect to allocate a handle in my filter, etc., during main-thread init, use that across different threads, and then destroy it when the filter is destroyed.

Contributor Author

Yeah, this makes sense. Thanks for the insight. I have made it a unique_ptr that holds a shared_ptr to OdCdsApi.

allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config,
const xds::core::v3::ResourceLocator* odcds_resources_locator,
Member

nit: I would use OptRef here, and clarify above that this is used, if provided, in preference to odcds_config.

Contributor Author

It is a pointer, because I copied that from CdsApiImpl. Should CdsApiImpl be fixed too?

Member

If it's easy to fix, yes, otherwise I wouldn't worry about it for now.

ProtobufMessage::ValidationVisitor& validation_visitor) PURE;
};

using ClusterManagerPtr = std::unique_ptr<ClusterManager>;
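The exchange above about allocateOdCdsApi's return type describes making the handle a unique_ptr that internally keeps a shared_ptr to the underlying OdCdsApi. A minimal sketch of that shape (simplified, with an assumed OdCdsApi interface; not the exact classes in this diff):

#include <memory>
#include <string>

// Assumed minimal interface of the on-demand CDS subscription object; this diff only calls
// updateOnDemand() on it.
class OdCdsApi {
public:
  virtual ~OdCdsApi() = default;
  virtual void updateOnDemand(const std::string& cluster_name) = 0;
};
using OdCdsApiSharedPtr = std::shared_ptr<OdCdsApi>;

// A filter owns the handle uniquely, but the handle shares ownership of OdCdsApi so that an
// in-flight discovery can keep the underlying subscription alive after the handle goes away.
class OdCdsApiHandleSketch {
public:
  explicit OdCdsApiHandleSketch(OdCdsApiSharedPtr odcds) : odcds_(std::move(odcds)) {}
  OdCdsApi& getOdCds() { return *odcds_; }

private:
  OdCdsApiSharedPtr odcds_;
};
using OdCdsApiHandleSketchPtr = std::unique_ptr<OdCdsApiHandleSketch>;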
20 changes: 20 additions & 0 deletions source/common/upstream/BUILD
@@ -44,6 +44,25 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "od_cds_api_lib",
srcs = ["od_cds_api_impl.cc"],
hdrs = ["od_cds_api_impl.h"],
deps = [
":cds_api_helper_lib",
"//include/envoy/config:subscription_interface",
"//include/envoy/protobuf:message_validator_interface",
"//include/envoy/stats:stats_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/common:minimal_logger_lib",
"//source/common/config:subscription_base_interface",
"//source/common/grpc:common_lib",
"//source/common/protobuf",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "cluster_manager_lib",
srcs = ["cluster_manager_impl.cc"],
@@ -52,6 +71,7 @@ envoy_cc_library(
":cds_api_lib",
":load_balancer_lib",
":load_stats_reporter_lib",
":od_cds_api_lib",
":ring_hash_lb_lib",
":subset_lb_lib",
"//include/envoy/api:api_interface",
2 changes: 1 addition & 1 deletion source/common/upstream/cds_api_helper.h
@@ -37,7 +37,7 @@ class CdsApiHelper : Logger::Loggable<Logger::Id::upstream> {

private:
ClusterManager& cm_;
std::string name_;
const std::string name_;
std::string system_version_info_;
};

195 changes: 194 additions & 1 deletion source/common/upstream/cluster_manager_impl.cc
@@ -957,6 +957,7 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_
per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor();
}

pending_cluster_creations_.erase(cm_cluster.cluster().info()->name());
tls_.runOnAllThreads(
[info = cm_cluster.cluster().info(), params = std::move(params), add_or_update_cluster,
load_balancer_factory](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
@@ -1027,6 +1028,198 @@ ClusterManagerImpl::addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks&
return std::make_unique<ClusterUpdateCallbacksHandleImpl>(cb, cluster_manager.update_callbacks_);
}

namespace {

using ClusterAddedCb = std::function<void(ThreadLocalCluster&)>;

class ClusterCallbacks : public ClusterUpdateCallbacks {
public:
ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {}

void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override { cb_(cluster); };

void onClusterRemoval(const std::string&) override {}

private:
ClusterAddedCb cb_;
};

} // namespace

ClusterManagerImpl::ClusterDiscoveryManager::ClusterDiscoveryManager(
ThreadLocalClusterManagerImpl& parent)
: parent_(parent) {}

void ClusterManagerImpl::ClusterDiscoveryManager::ensureCallbacksAreInstalled() {
if (callbacks_handle_) {
return;
}
auto cb = ClusterAddedCb([this](ThreadLocalCluster& cluster) {
processClusterName(cluster.info()->name(), ClusterDiscoveryStatus::Available);
});
Member

nit: just inline next line

callbacks_ = std::make_unique<ClusterCallbacks>(cb);
callbacks_handle_ = parent_.parent_.addThreadLocalClusterUpdateCallbacks(*callbacks_);
}

void ClusterManagerImpl::ClusterDiscoveryManager::processClusterName(
const std::string& name, ClusterDiscoveryStatus cluster_status) {
auto map_node_handle = pending_clusters_.extract(name);
if (map_node_handle.empty()) {
return;
}
for (const auto& weak_callback : map_node_handle.mapped()) {
auto callback = weak_callback.lock();
if (callback != nullptr) {
(*callback)(cluster_status);
}
}
maybePostResetCallbacks();
}

ClusterManagerImpl::ClusterDiscoveryManager::Pair
ClusterManagerImpl::ClusterDiscoveryManager::addCallback(
const std::string& name, ClusterDiscoveryCallbackWeakPtr weak_callback) {
auto& callbacks_deque = pending_clusters_[name];
callbacks_deque.emplace_back(weak_callback);
auto handle =
std::make_unique<ClusterDiscoveryCallbackHandleImpl>(*this, std::move(weak_callback), name);
auto discovery_in_progress = (callbacks_deque.size() > 1);
return {std::move(handle), discovery_in_progress};
}

void ClusterManagerImpl::ClusterDiscoveryManager::erase(const std::string& name,
ClusterDiscoveryCallbackWeakPtr cb) {
const bool drop_deque = eraseFromDeque(name, cb);
if (drop_deque) {
pending_clusters_.erase(name);
}
maybePostResetCallbacks();
}

bool ClusterManagerImpl::ClusterDiscoveryManager::eraseFromDeque(
const std::string& name, ClusterDiscoveryCallbackWeakPtr cb) {
auto it = pending_clusters_.find(name);
if (it == pending_clusters_.end()) {
return false;
}
auto& deque = it->second;
// Could use std::erase_if, but it's only in C++20.
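// Two weak_ptrs are treated as the same callback when neither owner_before() the other,
// i.e. they share the same control block.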
auto it2 = std::remove_if(deque.begin(), deque.end(),
[cb](ClusterDiscoveryCallbackWeakPtr const& weak_ptr) {
if (cb.owner_before(weak_ptr)) {
return false;
}
if (weak_ptr.owner_before(cb)) {
return false;
}
return true;
});
deque.erase(it2, deque.end());
return deque.empty();
}

void ClusterManagerImpl::ClusterDiscoveryManager::maybePostResetCallbacks() {
Member

Sorry, I'm a little confused about the purpose of this function. Can you clarify? I think you can just leave the callbacks handle installed the entire time? This is related to my other comment on the ensure function.

if (!callbacks_handle_cleanup_posted_ && pending_clusters_.empty()) {
parent_.thread_local_dispatcher_.post([this] {
// Something might have been added in the meantime, so check the
// map again.
if (pending_clusters_.empty()) {
callbacks_handle_.reset();
callbacks_.reset();
}
callbacks_handle_cleanup_posted_ = false;
});
callbacks_handle_cleanup_posted_ = true;
}
}

OdCdsApiHandleSharedPtr
ClusterManagerImpl::allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config,
const xds::core::v3::ResourceLocator* odcds_resources_locator,
ProtobufMessage::ValidationVisitor& validation_visitor) {
// TODO(krnowak): Instead of creating a new handle every time, store the handles internally and
// return an already existing one if the config or locator matches. Note that this may need a way
// to clean up the unused handles, so we can close the unnecessary connections.
auto odcds = OdCdsApiImpl::create(odcds_config, odcds_resources_locator, *this, stats_,
validation_visitor);
return OdCdsApiHandleImpl::create(*this, std::move(odcds));
}

ClusterDiscoveryCallbackHandlePtr
ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiHandleImplSharedPtr odcds_handle,
const std::string& name,
ClusterDiscoveryCallbackWeakPtr weak_callback) {
ThreadLocalClusterManagerImpl& cluster_manager = *tls_;

cluster_manager.cdm_.ensureCallbacksAreInstalled();

auto [handle, discovery_in_progress] = cluster_manager.cdm_.addCallback(name, weak_callback);
if (discovery_in_progress) {
Contributor

Might be useful to add a comment that this will only catch discoveries in progress in this thread; but if a response to the same request made in another thread arrives earlier, the callback will be invoked anyway.

Contributor Author

Will add it.

// We have already started a discovery process for a cluster with
// this name, so nothing more left to do here.
return std::move(handle);
Contributor

I'm not sure you actually need an explicit move here.

Contributor Author

Hm, let me check that. Maybe NRVO will kick in here.

Contributor Author

It's needed. NRVO can't kick in here, because handle is part of the structured binding [handle, discovery_in_progress] = …, so I think its type is a reference, not a value, so it does not match this function's return type.

}
dispatcher_.post([this, odcds_handle = std::move(odcds_handle), weak_callback, name,
&thread_local_dispatcher = cluster_manager.thread_local_dispatcher_] {
// Check for the cluster here too. It might have been added
// between the time when this callback was posted and when it is
// being executed.
if (getThreadLocalCluster(name) != nullptr) {
if (weak_callback.expired()) {
// Not only was the cluster added, it was also already
// handled, so don't bother posting the callback back to
// the worker thread.
return;
}
thread_local_dispatcher.post([weak_callback] {
if (auto callback = weak_callback.lock(); callback != nullptr) {
// If this gets called here, it means that we requested a
// discovery of a cluster without checking if that cluster
// is already known by cluster manager.
(*callback)(ClusterDiscoveryStatus::Available);
}
});
return;
}

auto it = pending_cluster_creations_.find(name);
if (it != pending_cluster_creations_.end()) {
// We already began the discovery process for this cluster,
// nothing to do.
return;
}
auto& odcds = odcds_handle->getOdCds();
odcds.updateOnDemand(name);
auto timer_cb = Event::TimerCb([this, name] { notifyExpiredDiscovery(name); });
Member

nit: just inline this next line

auto timer = dispatcher_.createTimer(timer_cb);
timer->enableTimer(std::chrono::milliseconds(5000));
// Keep odcds alive for the duration of the discovery process.
pending_cluster_creations_.insert(
{std::move(name), ClusterCreation{std::move(odcds_handle), std::move(timer)}});
});

return std::move(handle);
}

void ClusterManagerImpl::notifyExpiredDiscovery(const std::string& name) {
Member

debug log please

Contributor Author

I have already added a debug log there - ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} timed out", name);. Did GitHub place this comment in the wrong place?

auto map_node_handle = pending_cluster_creations_.extract(name);
if (map_node_handle.empty()) {
return;
}
// Defer destroying the timer, so it's not destroyed during its own
// callback. TimerPtr is a unique_ptr, which is not copyable, but
// std::function requires a copyable callable, so we turn the move-only
// unique_ptr into a copyable shared_ptr and pass that to the std::function.
dispatcher_.post([timer = std::shared_ptr<Event::Timer>(
std::move(map_node_handle.mapped().expiration_timer_))] {});
tls_.runOnAllThreads([name](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
if (!cluster_manager.has_value()) {
return;
}
Member

I don't think this is possible?

Contributor Author

It's an OptRef, so I thought the check would be prudent. I'll remove it.

cluster_manager->cdm_.processClusterName(name, ClusterDiscoveryStatus::Missing);
});
}

ProtobufTypes::MessagePtr ClusterManagerImpl::dumpClusterConfigs() {
auto config_dump = std::make_unique<envoy::admin::v3::ClustersConfigDump>();
config_dump->set_version_info(cds_api_ != nullptr ? cds_api_->versionInfo() : "");
@@ -1061,7 +1254,7 @@ ProtobufTypes::MessagePtr ClusterManagerImpl::dumpClusterConfigs() {
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl(
ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
const absl::optional<LocalClusterParams>& local_cluster_params)
: parent_(parent), thread_local_dispatcher_(dispatcher) {
: parent_(parent), thread_local_dispatcher_(dispatcher), cdm_(*this) {
// If local cluster is defined then we need to initialize it first.
if (local_cluster_params.has_value()) {
const auto& local_cluster_name = local_cluster_params->info_->name();