Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CM: Add on-demand cluster discovery functionality #18723

Merged
merged 14 commits into from
Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 84 additions & 1 deletion envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace Envoy {
namespace Upstream {

/**
* ClusterUpdateCallbacks provide a way to exposes Cluster lifecycle events in the
* ClusterUpdateCallbacks provide a way to expose Cluster lifecycle events in the
* ClusterManager.
*/
class ClusterUpdateCallbacks {
Expand Down Expand Up @@ -73,6 +73,76 @@ class ClusterUpdateCallbacksHandle {

using ClusterUpdateCallbacksHandlePtr = std::unique_ptr<ClusterUpdateCallbacksHandle>;

/**
* Status enum for the result of an attempted cluster discovery. A value of this type is passed to
* the ClusterDiscoveryCallback when the discovery process ends.
*/
enum class ClusterDiscoveryStatus {
/**
* The discovery process timed out. This means that we haven't yet received any reply from
* on-demand CDS about it.
*/
Timeout,
/**
* The discovery process has concluded and on-demand CDS has no such cluster.
*/
Missing,
/**
* Cluster found and currently available through ClusterManager.
*/
Available,
};

/**
* ClusterDiscoveryCallback is a callback called at the end of the on-demand cluster discovery
* process. The status of the discovery is sent as a parameter.
*/
using ClusterDiscoveryCallback = std::function<void(ClusterDiscoveryStatus)>;
using ClusterDiscoveryCallbackPtr = std::unique_ptr<ClusterDiscoveryCallback>;

/**
* ClusterDiscoveryCallbackHandle is a RAII wrapper for a ClusterDiscoveryCallback. Deleting the
* ClusterDiscoveryCallbackHandle will remove the callbacks from ClusterManager.
*
* The interface itself declares nothing but a virtual destructor, so the removal is carried out by
* the destructor of the concrete implementation.
*/
class ClusterDiscoveryCallbackHandle {
public:
virtual ~ClusterDiscoveryCallbackHandle() = default;
};

using ClusterDiscoveryCallbackHandlePtr = std::unique_ptr<ClusterDiscoveryCallbackHandle>;

/**
* A handle to an on-demand CDS. It is the entry point for requesting the discovery of a single
* cluster by name.
*/
class OdCdsApiHandle {
public:
virtual ~OdCdsApiHandle() = default;

/**
* Request an on-demand discovery of a cluster with the given name. This ODCDS may be used to
* perform the discovery process in the main thread if there is no discovery going on for this
* cluster. When the requested cluster is added and warmed up, the passed callback will be invoked
* in the same thread that invoked this function.
*
* The returned handle can be destroyed to prevent the callback from being invoked. Note that the
* handle can only be destroyed in the same thread that invoked the function. Destroying the
* handle might not stop the discovery process, though. Once the callback has been invoked,
* destroying the handle does nothing. It is the responsibility of the caller to make sure that
* the objects captured in the callback outlive the callback.
*
* This function is thread-safe.
*
* @param name is the name of the cluster to be discovered.
* @param callback will be called with the resulting ClusterDiscoveryStatus when the discovery is
*        finished.
* @param timeout describes how long the operation may take before failing.
* @return the discovery process handle.
*/
virtual ClusterDiscoveryCallbackHandlePtr
requestOnDemandClusterDiscovery(absl::string_view name, ClusterDiscoveryCallbackPtr callback,
std::chrono::milliseconds timeout) PURE;
};

using OdCdsApiHandlePtr = std::unique_ptr<OdCdsApiHandle>;

class ClusterManagerFactory;

// These are per-cluster per-thread, so not "global" stats.
Expand Down Expand Up @@ -327,6 +397,19 @@ class ClusterManager {
* @param cluster, the cluster to check.
*/
virtual void checkActiveStaticCluster(const std::string& cluster) PURE;

/**
* Allocates an on-demand CDS API provider from configuration proto or locator.
*
* @param odcds_config is a configuration proto. Used when odcds_resources_locator is a nullopt.
* @param odcds_resources_locator is a locator for ODCDS. Used over odcds_config if not a nullopt.
* @param validation_visitor is the message validation visitor handed to the allocated ODCDS API
*        (presumably used to validate the cluster resources it receives; TODO confirm against
*        the implementation).
* @return OdCdsApiHandlePtr the ODCDS handle.
*/
virtual OdCdsApiHandlePtr
allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config,
OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
ProtobufMessage::ValidationVisitor& validation_visitor) PURE;
};

using ClusterManagerPtr = std::unique_ptr<ClusterManager>;
Expand Down
32 changes: 32 additions & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,46 @@ envoy_cc_library(
],
)

# On-demand CDS API implementation (od_cds_api_impl.{h,cc}); listed as a dependency of
# cluster_manager_lib in this file.
envoy_cc_library(
name = "od_cds_api_lib",
srcs = ["od_cds_api_impl.cc"],
hdrs = ["od_cds_api_impl.h"],
deps = [
":cds_api_helper_lib",
"//envoy/config:subscription_interface",
"//envoy/protobuf:message_validator_interface",
"//envoy/stats:stats_interface",
"//envoy/upstream:cluster_manager_interface",
"//source/common/common:minimal_logger_lib",
"//source/common/config:subscription_base_interface",
"//source/common/grpc:common_lib",
"//source/common/protobuf",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)

# Bookkeeping of pending on-demand cluster discovery callbacks
# (cluster_discovery_manager.{h,cc}); listed as a dependency of cluster_manager_lib in this file.
envoy_cc_library(
name = "cluster_discovery_manager_lib",
srcs = ["cluster_discovery_manager.cc"],
hdrs = ["cluster_discovery_manager.h"],
deps = [
"//envoy/upstream:cluster_manager_interface",
"//source/common/common:enum_to_int",
"//source/common/common:minimal_logger_lib",
],
)

envoy_cc_library(
name = "cluster_manager_lib",
srcs = ["cluster_manager_impl.cc"],
hdrs = ["cluster_manager_impl.h"],
deps = [
":cds_api_lib",
":cluster_discovery_manager_lib",
":load_balancer_lib",
":load_stats_reporter_lib",
":od_cds_api_lib",
":ring_hash_lb_lib",
":subset_lb_lib",
"//envoy/api:api_interface",
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/cds_api_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CdsApiHelper : Logger::Loggable<Logger::Id::upstream> {

private:
ClusterManager& cm_;
std::string name_;
const std::string name_;
std::string system_version_info_;
};

Expand Down
171 changes: 171 additions & 0 deletions source/common/upstream/cluster_discovery_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
#include "source/common/upstream/cluster_discovery_manager.h"

#include <functional>

#include "source/common/common/enum_to_int.h"

namespace Envoy {
namespace Upstream {

namespace {

using ClusterAddedCb = std::function<void(ThreadLocalCluster&)>;

/**
 * Adapter that forwards every cluster add-or-update notification to a stored function object.
 * Cluster removal notifications are not forwarded.
 */
class ClusterCallbacks : public ClusterUpdateCallbacks {
public:
  /**
   * @param cb is invoked with the cluster passed to each onClusterAddOrUpdate() call.
   */
  explicit ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {}

  // ClusterUpdateCallbacks
  void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override { cb_(cluster); }

  void onClusterRemoval(const std::string&) override {}

private:
  ClusterAddedCb cb_;
};

} // namespace

/**
 * Creates a discovery manager and subscribes it to cluster lifecycle updates.
 *
 * @param thread_name is stored and used in the trace log messages of this class.
 * @param lifecycle_callbacks_handler is the object the cluster update callbacks are registered
 *        with; the handle it returns is kept in callbacks_handle_.
 */
ClusterDiscoveryManager::ClusterDiscoveryManager(
    std::string thread_name, ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler)
    : thread_name_(std::move(thread_name)) {
  // Every added or updated cluster is reported to the pending callbacks registered under its
  // name with the Available status.
  callbacks_ = std::make_unique<ClusterCallbacks>([this](ThreadLocalCluster& cluster) {
    ENVOY_LOG(trace,
              "cm cdm: starting processing cluster name {} (status {}) from cluster lifecycle "
              "callback in {}",
              cluster.info()->name(), enumToInt(ClusterDiscoveryStatus::Available), thread_name_);
    processClusterName(cluster.info()->name(), ClusterDiscoveryStatus::Available);
  });
  callbacks_handle_ = lifecycle_callbacks_handler.addClusterUpdateCallbacks(*callbacks_);
}

/**
 * Takes all callbacks pending for the given cluster name off the pending map and invokes each of
 * them with the given status.
 *
 * @param name is the name of the cluster whose pending callbacks should be invoked.
 * @param cluster_status is the status passed to every invoked callback.
 */
void ClusterDiscoveryManager::processClusterName(absl::string_view name,
                                                 ClusterDiscoveryStatus cluster_status) {
  CallbackList pending = extractCallbackList(name);
  if (pending.empty()) {
    ENVOY_LOG(trace, "cm cdm: no callbacks for the cluster name {} in {}", name, thread_name_);
    return;
  }
  ENVOY_LOG(trace, "cm cdm: invoking {} callbacks for the cluster name {} in {}",
            pending.size(), name, thread_name_);
  for (auto& entry : pending) {
    // Take the callback out first, then drop this list's reference to the item, so the handle
    // and the invoker referring to the item are invalidated before the callback runs.
    auto to_invoke = std::move(entry->callback_);
    entry.reset();
    // A null callback means its handle was destroyed while an earlier callback was running.
    if (to_invoke == nullptr) {
      continue;
    }
    (*to_invoke)(cluster_status);
  }
}

/**
 * Registers a callback for the given cluster name.
 *
 * @param name is the name of the cluster the callback waits for.
 * @param callback is the callback to store.
 * @return the handle for the stored callback, a flag telling whether other callbacks were already
 *         pending for this name, and an invoker for the stored callback.
 */
ClusterDiscoveryManager::AddedCallbackData
ClusterDiscoveryManager::addCallback(std::string name, ClusterDiscoveryCallbackPtr callback) {
  ENVOY_LOG(trace, "cm cdm: adding callback for the cluster name {} in {}", name, thread_name_);
  // operator[] creates an empty list when this is the first request for the name.
  auto& pending = pending_clusters_[name];
  CallbackListItemWeakPtr weak_item = addCallbackInternal(pending, std::move(callback));
  // More than one entry on the list means some earlier request is already waiting for this name.
  const bool discovery_in_progress = pending.size() > 1;
  // The handle gets a copy of the name and of the weak pointer; the invoker takes over the
  // originals.
  auto handle = std::make_unique<ClusterDiscoveryCallbackHandleImpl>(*this, name, weak_item);
  CallbackInvoker invoker(*this, std::move(name), std::move(weak_item));
  return {std::move(handle), discovery_in_progress, std::move(invoker)};
}

/**
 * Exchanges the complete state of this manager (thread name, pending callbacks, lifecycle
 * callbacks and their registration handle) with the other one.
 *
 * @param other is the manager to exchange the state with.
 */
void ClusterDiscoveryManager::swap(ClusterDiscoveryManager& other) {
  // The block-scope using-declaration makes the unqualified calls below resolve to the
  // non-member swap overloads instead of this member function.
  using std::swap;
  swap(callbacks_handle_, other.callbacks_handle_);
  swap(callbacks_, other.callbacks_);
  swap(pending_clusters_, other.pending_clusters_);
  swap(thread_name_, other.thread_name_);
}

/**
 * Invokes the single callback stored in the given item, unless the item is already gone or its
 * callback has been reset.
 *
 * @param name is the name of the cluster the callback was registered for.
 * @param item_weak_ptr is a weak reference to the callback list item; nothing happens if it has
 *        expired.
 * @param cluster_status is the status passed to the callback.
 */
void ClusterDiscoveryManager::invokeCallbackFromItem(absl::string_view name,
                                                     CallbackListItemWeakPtr item_weak_ptr,
                                                     ClusterDiscoveryStatus cluster_status) {
  auto item_ptr = item_weak_ptr.lock();
  if (item_ptr == nullptr) {
    ENVOY_LOG(trace, "cm cdm: not invoking an already stale callback for cluster {} in {}", name,
              thread_name_);
    return;
  }
  ENVOY_LOG(trace, "cm cdm: invoking a callback for cluster {} in {}", name, thread_name_);
  // Take the callback out of the item before touching the list, so the item no longer holds it
  // by the time the callback runs.
  auto callback = std::move(item_ptr->callback_);
  if (item_ptr->self_iterator_.has_value()) {
    // The item is still on its pending list; remove it from there.
    eraseItem(name, std::move(item_ptr));
  } else {
    // No iterator means the item was already extracted from the pending map for processing.
    ENVOY_LOG(trace,
              "cm cdm: the callback for cluster {} in {} is prepared for invoking during "
              "processing, yet some other callback tries to invoke this callback earlier",
              name, thread_name_);
  }
  if (callback != nullptr) {
    (*callback)(cluster_status);
  } else {
    // The format string has two placeholders, so both arguments must be passed.
    ENVOY_LOG(trace,
              "cm cdm: the callback for cluster {} in {} is prepared for invoking during "
              "processing, yet some other callback destroyed its handle in the meantime",
              name, thread_name_);
  }
}

/**
 * Removes the entry for the given cluster name from pending_clusters_ and returns its callback
 * list. Returns an empty list when there is no entry for the name.
 *
 * @param name is the name of the cluster whose pending callbacks are taken.
 * @return the callback list that was stored under the name.
 */
ClusterDiscoveryManager::CallbackList
ClusterDiscoveryManager::extractCallbackList(absl::string_view name) {
auto map_node_handle = pending_clusters_.extract(name);
if (map_node_handle.empty()) {
return {};
}
CallbackList extracted;
map_node_handle.mapped().swap(extracted);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is swapping necessary here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me think about this case, it's been a while since I wrote it. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function wants to steal the callback list from the pending_clusters_ map. It seems to me that the only way to steal something from a map is to use extract, which gives you a "map node". To actually steal the value, I'd need to either do a swap like I did, or maybe do:

CallbackList extracted = std::move(map_node_handle.mapped())

Maybe another way could also be:

CallbackList extracted = std::move(pending_clusters_[name]);
pending_clusters_.erase(name);
return extracted;

// The items no longer belong to a list stored in the map, so their stored list positions are
// cleared; other functions in this file read an empty self_iterator_ as meaning that the item
// was already taken off the pending list.
for (auto& item : extracted) {
item->self_iterator_.reset();
}
return extracted;
}

/**
 * Wraps the callback into a new list item and appends the item to the given list.
 *
 * @param list is the callback list to append to.
 * @param callback is the callback to store in the new item.
 * @return a weak reference to the newly added item.
 */
ClusterDiscoveryManager::CallbackListItemWeakPtr
ClusterDiscoveryManager::addCallbackInternal(CallbackList& list,
                                             ClusterDiscoveryCallbackPtr callback) {
  auto new_item = std::make_shared<CallbackListItem>(std::move(callback));
  // The item remembers its own position on the list, so it can later be erased without a search.
  new_item->self_iterator_ = list.emplace(list.end(), new_item);
  return new_item;
}

/**
 * Drops a callback so it will not be invoked. Does nothing when the item has already expired.
 *
 * @param name is the name of the cluster the callback was registered for.
 * @param item_weak_ptr is a weak reference to the callback list item to drop.
 */
void ClusterDiscoveryManager::erase(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr) {
  auto live_item = item_weak_ptr.lock();
  if (live_item == nullptr) {
    ENVOY_LOG(trace, "cm cdm: not dropping a stale callback for the cluster name {} in {}", name,
              thread_name_);
    return;
  }
  ENVOY_LOG(trace, "cm cdm: dropping callback for the cluster name {} in {}", name, thread_name_);
  if (live_item->self_iterator_.has_value()) {
    // The item still sits on its pending list, so it can simply be removed from there.
    eraseItem(name, std::move(live_item));
    return;
  }
  // The item is off the list already; resetting the callback is what keeps it from being invoked.
  ENVOY_LOG(trace,
            "cm cdm: callback for the cluster name {} in {} is not on the callbacks list "
            "anymore, which means it is about to be invoked; preventing it",
            name, thread_name_);
  live_item->callback_.reset();
}

/**
 * Removes the given item from the pending list of the named cluster, and removes the whole map
 * entry when that list becomes empty.
 *
 * @param name is the name of the cluster the item is registered under.
 * @param item_ptr is the item to remove; it must be non-null and must have its list position
 *        set.
 */
void ClusterDiscoveryManager::eraseItem(absl::string_view name,
                                        CallbackListItemSharedPtr item_ptr) {
  ASSERT(item_ptr != nullptr);
  ASSERT(item_ptr->self_iterator_.has_value());
  auto& position = item_ptr->self_iterator_;
  const bool list_is_now_empty = eraseFromList(name, position.value());
  // The stored position is no longer valid once the element has been erased from the list.
  position.reset();
  if (!list_is_now_empty) {
    return;
  }
  ENVOY_LOG(trace, "cm cdm: dropped last callback for the cluster name {} in {}", name,
            thread_name_);
  pending_clusters_.erase(name);
}

/**
 * Erases one element from the pending callback list of the named cluster.
 *
 * @param name is the name of the cluster; an entry for it must exist in pending_clusters_.
 * @param it is the position of the element to erase on that list.
 * @return true if the list is empty after the erasure.
 */
bool ClusterDiscoveryManager::eraseFromList(absl::string_view name, CallbackListIterator it) {
  const auto entry = pending_clusters_.find(name);
  ASSERT(entry != pending_clusters_.end());
  auto& callbacks = entry->second;
  callbacks.erase(it);
  return callbacks.empty();
}

} // namespace Upstream
} // namespace Envoy
Loading