diff --git a/envoy/upstream/cluster_manager.h b/envoy/upstream/cluster_manager.h index e24790038d24..3d5bdb57a901 100644 --- a/envoy/upstream/cluster_manager.h +++ b/envoy/upstream/cluster_manager.h @@ -40,7 +40,7 @@ namespace Envoy { namespace Upstream { /** - * ClusterUpdateCallbacks provide a way to exposes Cluster lifecycle events in the + * ClusterUpdateCallbacks provide a way to expose Cluster lifecycle events in the * ClusterManager. */ class ClusterUpdateCallbacks { @@ -73,6 +73,76 @@ class ClusterUpdateCallbacksHandle { using ClusterUpdateCallbacksHandlePtr = std::unique_ptr; +/** + * Status enum for the result of an attempted cluster discovery. + */ +enum class ClusterDiscoveryStatus { + /** + * The discovery process timed out. This means that we haven't yet received any reply from + * on-demand CDS about it. + */ + Timeout, + /** + * The discovery process has concluded and on-demand CDS has no such cluster. + */ + Missing, + /** + * Cluster found and currently available through ClusterManager. + */ + Available, +}; + +/** + * ClusterDiscoveryCallback is a callback called at the end of the on-demand cluster discovery + * process. The status of the discovery is sent as a parameter. + */ +using ClusterDiscoveryCallback = std::function; +using ClusterDiscoveryCallbackPtr = std::unique_ptr; + +/** + * ClusterDiscoveryCallbackHandle is a RAII wrapper for a ClusterDiscoveryCallback. Deleting the + * ClusterDiscoveryCallbackHandle will remove the callbacks from ClusterManager. + */ +class ClusterDiscoveryCallbackHandle { +public: + virtual ~ClusterDiscoveryCallbackHandle() = default; +}; + +using ClusterDiscoveryCallbackHandlePtr = std::unique_ptr; + +/** + * A handle to an on-demand CDS. + */ +class OdCdsApiHandle { +public: + virtual ~OdCdsApiHandle() = default; + + /** + * Request an on-demand discovery of a cluster with a passed name. This ODCDS may be used to + * perform the discovery process in the main thread if there is no discovery going on for this + * cluster. When the requested cluster is added and warmed up, the passed callback will be invoked + * in the same thread that invoked this function. + * + * The returned handle can be destroyed to prevent the callback from being invoked. Note that the + * handle can only be destroyed in the same thread that invoked the function. Destroying the + * handle might not stop the discovery process, though. As soon as the callback is invoked, + * destroying the handle does nothing. It is a responsibility of the caller to make sure that the + * objects captured in the callback outlive the callback. + * + * This function is thread-safe. + * + * @param name is the name of the cluster to be discovered. + * @param callback will be called when the discovery is finished. + * @param timeout describes how long the operation may take before failing. + * @return the discovery process handle. + */ + virtual ClusterDiscoveryCallbackHandlePtr + requestOnDemandClusterDiscovery(absl::string_view name, ClusterDiscoveryCallbackPtr callback, + std::chrono::milliseconds timeout) PURE; +}; + +using OdCdsApiHandlePtr = std::unique_ptr; + class ClusterManagerFactory; // These are per-cluster per-thread, so not "global" stats. @@ -327,6 +397,19 @@ class ClusterManager { * @param cluster, the cluster to check. */ virtual void checkActiveStaticCluster(const std::string& cluster) PURE; + + /** + * Allocates an on-demand CDS API provider from configuration proto or locator. + * + * @param odcds_config is a configuration proto. Used when odcds_resources_locator is a nullopt. + * @param odcds_resources_locator is a locator for ODCDS. Used over odcds_config if not a nullopt. + * @param validation_visitor + * @return OdCdsApiHandlePtr the ODCDS handle. + */ + virtual OdCdsApiHandlePtr + allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, + ProtobufMessage::ValidationVisitor& validation_visitor) PURE; }; using ClusterManagerPtr = std::unique_ptr; diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD index 8daa21b6f2d5..0c1b847ec1fb 100644 --- a/source/common/upstream/BUILD +++ b/source/common/upstream/BUILD @@ -44,14 +44,46 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "od_cds_api_lib", + srcs = ["od_cds_api_impl.cc"], + hdrs = ["od_cds_api_impl.h"], + deps = [ + ":cds_api_helper_lib", + "//envoy/config:subscription_interface", + "//envoy/protobuf:message_validator_interface", + "//envoy/stats:stats_interface", + "//envoy/upstream:cluster_manager_interface", + "//source/common/common:minimal_logger_lib", + "//source/common/config:subscription_base_interface", + "//source/common/grpc:common_lib", + "//source/common/protobuf", + "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) + +envoy_cc_library( + name = "cluster_discovery_manager_lib", + srcs = ["cluster_discovery_manager.cc"], + hdrs = ["cluster_discovery_manager.h"], + deps = [ + "//envoy/upstream:cluster_manager_interface", + "//source/common/common:enum_to_int", + "//source/common/common:minimal_logger_lib", + ], +) + envoy_cc_library( name = "cluster_manager_lib", srcs = ["cluster_manager_impl.cc"], hdrs = ["cluster_manager_impl.h"], deps = [ ":cds_api_lib", + ":cluster_discovery_manager_lib", ":load_balancer_lib", ":load_stats_reporter_lib", + ":od_cds_api_lib", ":ring_hash_lb_lib", ":subset_lb_lib", "//envoy/api:api_interface", diff --git a/source/common/upstream/cds_api_helper.h b/source/common/upstream/cds_api_helper.h index 17384a6fb602..ac19b66d642a 100644 --- a/source/common/upstream/cds_api_helper.h +++ b/source/common/upstream/cds_api_helper.h @@ -37,7 +37,7 @@ class CdsApiHelper : Logger::Loggable { private: ClusterManager& cm_; - std::string name_; + const std::string name_; std::string system_version_info_; }; diff --git a/source/common/upstream/cluster_discovery_manager.cc b/source/common/upstream/cluster_discovery_manager.cc new file mode 100644 index 000000000000..ac6a87f3a78c --- /dev/null +++ b/source/common/upstream/cluster_discovery_manager.cc @@ -0,0 +1,171 @@ +#include "source/common/upstream/cluster_discovery_manager.h" + +#include + +#include "source/common/common/enum_to_int.h" + +namespace Envoy { +namespace Upstream { + +namespace { + +using ClusterAddedCb = std::function; + +class ClusterCallbacks : public ClusterUpdateCallbacks { +public: + ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {} + + void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override { cb_(cluster); }; + + void onClusterRemoval(const std::string&) override {} + +private: + ClusterAddedCb cb_; +}; + +} // namespace + +ClusterDiscoveryManager::ClusterDiscoveryManager( + std::string thread_name, ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler) + : thread_name_(std::move(thread_name)) { + callbacks_ = std::make_unique([this](ThreadLocalCluster& cluster) { + ENVOY_LOG(trace, + "cm cdm: starting processing cluster name {} (status {}) from cluster lifecycle " + "callback in {}", + cluster.info()->name(), enumToInt(ClusterDiscoveryStatus::Available), thread_name_); + processClusterName(cluster.info()->name(), ClusterDiscoveryStatus::Available); + }); + callbacks_handle_ = lifecycle_callbacks_handler.addClusterUpdateCallbacks(*callbacks_); +} + +void ClusterDiscoveryManager::processClusterName(absl::string_view name, + ClusterDiscoveryStatus cluster_status) { + auto callback_items = extractCallbackList(name); + if (callback_items.empty()) { + ENVOY_LOG(trace, "cm cdm: no callbacks for the cluster name {} in {}", name, thread_name_); + return; + } + ENVOY_LOG(trace, "cm cdm: invoking {} callbacks for the cluster name {} in {}", + callback_items.size(), name, thread_name_); + for (auto& item : callback_items) { + auto callback = std::move(item->callback_); + // This invalidates the handle and the invoker. + item.reset(); + // The callback could be null when handle was destroyed during the + // previous callback. + if (callback != nullptr) { + (*callback)(cluster_status); + } + } +} + +ClusterDiscoveryManager::AddedCallbackData +ClusterDiscoveryManager::addCallback(std::string name, ClusterDiscoveryCallbackPtr callback) { + ENVOY_LOG(trace, "cm cdm: adding callback for the cluster name {} in {}", name, thread_name_); + auto& callbacks_list = pending_clusters_[name]; + auto item_weak_ptr = addCallbackInternal(callbacks_list, std::move(callback)); + auto handle = std::make_unique(*this, name, item_weak_ptr); + CallbackInvoker invoker(*this, std::move(name), std::move(item_weak_ptr)); + auto discovery_in_progress = (callbacks_list.size() > 1); + return {std::move(handle), discovery_in_progress, std::move(invoker)}; +} + +void ClusterDiscoveryManager::swap(ClusterDiscoveryManager& other) { + thread_name_.swap(other.thread_name_); + pending_clusters_.swap(other.pending_clusters_); + callbacks_.swap(other.callbacks_); + callbacks_handle_.swap(other.callbacks_handle_); +} + +void ClusterDiscoveryManager::invokeCallbackFromItem(absl::string_view name, + CallbackListItemWeakPtr item_weak_ptr, + ClusterDiscoveryStatus cluster_status) { + auto item_ptr = item_weak_ptr.lock(); + if (item_ptr == nullptr) { + ENVOY_LOG(trace, "cm cdm: not invoking an already stale callback for cluster {} in {}", name, + thread_name_); + return; + } + ENVOY_LOG(trace, "cm cdm: invoking a callback for cluster {} in {}", name, thread_name_); + auto callback = std::move(item_ptr->callback_); + if (item_ptr->self_iterator_.has_value()) { + eraseItem(name, std::move(item_ptr)); + } else { + ENVOY_LOG(trace, + "cm cdm: the callback for cluster {} in {} is prepared for invoking during " + "processing, yet some other callback tries to invoke this callback earlier", + name, thread_name_); + } + if (callback != nullptr) { + (*callback)(cluster_status); + } else { + ENVOY_LOG(trace, "cm cdm: the callback for cluster {} in {} is prepared for invoking during " + "processing, yet some other callback destroyed its handle in the meantime"); + } +} + +ClusterDiscoveryManager::CallbackList +ClusterDiscoveryManager::extractCallbackList(absl::string_view name) { + auto map_node_handle = pending_clusters_.extract(name); + if (map_node_handle.empty()) { + return {}; + } + CallbackList extracted; + map_node_handle.mapped().swap(extracted); + for (auto& item : extracted) { + item->self_iterator_.reset(); + } + return extracted; +} + +ClusterDiscoveryManager::CallbackListItemWeakPtr +ClusterDiscoveryManager::addCallbackInternal(CallbackList& list, + ClusterDiscoveryCallbackPtr callback) { + auto item = std::make_shared(std::move(callback)); + auto it = list.emplace(list.end(), item); + item->self_iterator_ = std::move(it); + return item; +} + +void ClusterDiscoveryManager::erase(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr) { + auto item_ptr = item_weak_ptr.lock(); + if (item_ptr == nullptr) { + ENVOY_LOG(trace, "cm cdm: not dropping a stale callback for the cluster name {} in {}", name, + thread_name_); + return; + } + ENVOY_LOG(trace, "cm cdm: dropping callback for the cluster name {} in {}", name, thread_name_); + if (!item_ptr->self_iterator_.has_value()) { + ENVOY_LOG(trace, + "cm cdm: callback for the cluster name {} in {} is not on the callbacks list " + "anymore, which means it is about to be invoked; preventing it", + name, thread_name_); + item_ptr->callback_.reset(); + return; + } + eraseItem(name, std::move(item_ptr)); +} + +void ClusterDiscoveryManager::eraseItem(absl::string_view name, + CallbackListItemSharedPtr item_ptr) { + ASSERT(item_ptr != nullptr); + ASSERT(item_ptr->self_iterator_.has_value()); + const bool drop_list = eraseFromList(name, item_ptr->self_iterator_.value()); + item_ptr->self_iterator_.reset(); + if (drop_list) { + ENVOY_LOG(trace, "cm cdm: dropped last callback for the cluster name {} in {}", name, + thread_name_); + pending_clusters_.erase(name); + } +} + +bool ClusterDiscoveryManager::eraseFromList(absl::string_view name, CallbackListIterator it) { + auto map_it = pending_clusters_.find(name); + ASSERT(map_it != pending_clusters_.end()); + auto& list = map_it->second; + list.erase(it); + return list.empty(); +} + +} // namespace Upstream +} // namespace Envoy diff --git a/source/common/upstream/cluster_discovery_manager.h b/source/common/upstream/cluster_discovery_manager.h new file mode 100644 index 000000000000..920e40684903 --- /dev/null +++ b/source/common/upstream/cluster_discovery_manager.h @@ -0,0 +1,176 @@ +#pragma once + +#include +#include +#include +#include + +#include "envoy/upstream/cluster_manager.h" + +#include "source/common/common/logger.h" + +#include "absl/container/flat_hash_map.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" + +namespace Envoy { +namespace Upstream { + +/** + * A base class for cluster lifecycle handler. Mostly to avoid a dependency on + * ThreadLocalClusterManagerImpl in ClusterDiscoveryManager. + */ +class ClusterLifecycleCallbackHandler { +public: + virtual ~ClusterLifecycleCallbackHandler() = default; + + virtual ClusterUpdateCallbacksHandlePtr + addClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) PURE; +}; + +/** + * A thread-local on-demand cluster discovery manager. It takes care of invoking the discovery + * callbacks in the event of a finished discovery. It does it by installing a cluster lifecycle + * callback that invokes the discovery callbacks when a matching cluster just got added. + * + * The manager is the sole owner of the added discovery callbacks. The only way to remove the + * callback from the manager is by destroying the discovery handle. + */ +class ClusterDiscoveryManager : Logger::Loggable { +private: + struct CallbackListItem; + using CallbackListItemSharedPtr = std::shared_ptr; + using CallbackListItemWeakPtr = std::weak_ptr; + using CallbackList = std::list; + using CallbackListIterator = CallbackList::iterator; + +public: + /** + * This class is used in a case when the cluster manager in the main thread notices that it + * already has the requested cluster, so instead of starting the discovery process, it schedules + * the invocation of the callback back to the thread that made the request. Invoking the request + * removes it from the manager. + */ + class CallbackInvoker { + public: + void invokeCallback(ClusterDiscoveryStatus cluster_status) const { + parent_.invokeCallbackFromItem(name_, item_weak_ptr_, cluster_status); + } + + private: + friend class ClusterDiscoveryManager; + + CallbackInvoker(ClusterDiscoveryManager& parent, std::string name, + CallbackListItemWeakPtr item_weak_ptr) + : parent_(parent), name_(std::move(name)), item_weak_ptr_(std::move(item_weak_ptr)) {} + + ClusterDiscoveryManager& parent_; + const std::string name_; + CallbackListItemWeakPtr item_weak_ptr_; + }; + + ClusterDiscoveryManager(std::string thread_name, + ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler); + + /** + * Invoke the callbacks for the given cluster name. The discovery status is passed to the + * callbacks. After invoking the callbacks, they are dropped from the manager. + */ + void processClusterName(absl::string_view name, ClusterDiscoveryStatus cluster_status); + + /** + * A struct containing a discovery handle, information whether a discovery for a given cluster + * was already requested in this thread, and an immediate invocation context. + */ + struct AddedCallbackData { + ClusterDiscoveryCallbackHandlePtr handle_ptr_; + bool discovery_in_progress_; + CallbackInvoker invoker_; + }; + + /** + * Adds the discovery callback. Returns a handle and a boolean indicating whether this worker + * thread has already requested the discovery of a cluster with a given name. + */ + AddedCallbackData addCallback(std::string name, ClusterDiscoveryCallbackPtr callback); + + /** + * Swaps this manager with another. Used for tests only. + */ + void swap(ClusterDiscoveryManager& other); + +private: + /** + * An item in the callbacks list. It contains the iterator to itself inside the callbacks + * list. Since the list contains shared pointers to items, we know that the iterator is valid as + * long as the item is alive. + */ + struct CallbackListItem { + CallbackListItem(ClusterDiscoveryCallbackPtr callback) : callback_(std::move(callback)) {} + + ClusterDiscoveryCallbackPtr callback_; + absl::optional self_iterator_; + }; + + /** + * An implementation of discovery handle. Destroy it to drop the callback from the discovery + * manager. It won't stop the discovery process, though. + */ + class ClusterDiscoveryCallbackHandleImpl : public ClusterDiscoveryCallbackHandle { + public: + ClusterDiscoveryCallbackHandleImpl(ClusterDiscoveryManager& parent, std::string name, + CallbackListItemWeakPtr item_weak_ptr) + : parent_(parent), name_(std::move(name)), item_weak_ptr_(std::move(item_weak_ptr)) {} + + ~ClusterDiscoveryCallbackHandleImpl() override { + parent_.erase(name_, std::move(item_weak_ptr_)); + } + + private: + ClusterDiscoveryManager& parent_; + const std::string name_; + CallbackListItemWeakPtr item_weak_ptr_; + }; + + /** + * Invokes a callback stored in the item and removes it from the callbacks list, so it won't be + * invoked again. + */ + void invokeCallbackFromItem(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr, + ClusterDiscoveryStatus cluster_status); + + /** + * Extracts the list of callbacks from the pending_clusters_ map. This action invalidates the + * self iterators in the items, so destroying the handle won't try to erase the element from the + * list using an invalid iterator. + */ + CallbackList extractCallbackList(absl::string_view name); + /** + * Creates and sets up the callback list item, adds to the list and returns a weak_ptr to the + * item. + */ + CallbackListItemWeakPtr addCallbackInternal(CallbackList& list, + ClusterDiscoveryCallbackPtr callback); + /** + * Drops the callback item from the discovery manager. It the item wasn't stale, the callback + * will not be invoked. Called when the discovery handle is destroyed. + */ + void erase(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr); + /** + * Drops the callback item from the discovery manager. + */ + void eraseItem(absl::string_view name, CallbackListItemSharedPtr item_ptr); + /** + * Try to erase a callback from under the given iterator. It returns a boolean value indicating + * whether the dropped callback was a last one for the given cluster. + */ + bool eraseFromList(absl::string_view name, CallbackListIterator it); + + std::string thread_name_; + absl::flat_hash_map pending_clusters_; + std::unique_ptr callbacks_; + ClusterUpdateCallbacksHandlePtr callbacks_handle_; +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index b378e8afd7fc..6de1df19629a 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -1042,6 +1042,7 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_ HostMapConstSharedPtr host_map = cm_cluster.cluster().prioritySet().crossPriorityHostMap(); + pending_cluster_creations_.erase(cm_cluster.cluster().info()->name()); tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params), add_or_update_cluster, load_balancer_factory, map = std::move(host_map)]( OptRef cluster_manager) { @@ -1132,7 +1133,129 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnP ClusterUpdateCallbacksHandlePtr ClusterManagerImpl::addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) { ThreadLocalClusterManagerImpl& cluster_manager = *tls_; - return std::make_unique(cb, cluster_manager.update_callbacks_); + return cluster_manager.addClusterUpdateCallbacks(cb); +} + +OdCdsApiHandlePtr +ClusterManagerImpl::allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, + ProtobufMessage::ValidationVisitor& validation_visitor) { + // TODO(krnowak): Instead of creating a new handle every time, store the handles internally and + // return an already existing one if the config or locator matches. Note that this may need a way + // to clean up the unused handles, so we can close the unnecessary connections. + auto odcds = OdCdsApiImpl::create(odcds_config, odcds_resources_locator, *this, *this, stats_, + validation_visitor); + return OdCdsApiHandleImpl::create(*this, std::move(odcds)); +} + +ClusterDiscoveryCallbackHandlePtr +ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std::string name, + ClusterDiscoveryCallbackPtr callback, + std::chrono::milliseconds timeout) { + ThreadLocalClusterManagerImpl& cluster_manager = *tls_; + + auto [handle, discovery_in_progress, invoker] = + cluster_manager.cdm_.addCallback(name, std::move(callback)); + // This check will catch requests for discoveries from this thread only. If other thread requested + // the same discovery, we will detect it in the main thread later. + if (discovery_in_progress) { + ENVOY_LOG(debug, + "cm odcds: on-demand discovery for cluster {} is already in progress, something else " + "in thread {} has already requested it", + name, cluster_manager.thread_local_dispatcher_.name()); + // This worker thread has already requested a discovery of a cluster with this name, so nothing + // more left to do here. + // + // We can't "just" return handle here, because handle is a part of the structured binding done + // above. So it's not really a ClusterDiscoveryCallbackHandlePtr, but more like + // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here - + // it needs to be moved. + return std::move(handle); + } + ENVOY_LOG( + debug, + "cm odcds: forwarding the on-demand discovery request for cluster {} to the main thread", + name); + // This seems to be the first request for discovery of this cluster in this worker thread. Rest of + // the process may only happen in the main thread. + dispatcher_.post([this, odcds = std::move(odcds), timeout, name = std::move(name), + invoker = std::move(invoker), + &thread_local_dispatcher = cluster_manager.thread_local_dispatcher_] { + // Check for the cluster here too. It might have been added between the time when this closure + // was posted and when it is being executed. + if (getThreadLocalCluster(name) != nullptr) { + ENVOY_LOG( + debug, + "cm odcds: the requested cluster {} is already known, posting the callback back to {}", + name, thread_local_dispatcher.name()); + thread_local_dispatcher.post([invoker = std::move(invoker)] { + invoker.invokeCallback(ClusterDiscoveryStatus::Available); + }); + return; + } + + if (auto it = pending_cluster_creations_.find(name); it != pending_cluster_creations_.end()) { + ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} is already in progress", name); + // We already began the discovery process for this cluster, nothing to do. If we got here, it + // means that it was other worker thread that requested the discovery. + return; + } + // Start the discovery. If the cluster gets discovered, cluster manager will warm it up and + // invoke the cluster lifecycle callbacks, that will in turn invoke our callback. + odcds->updateOnDemand(name); + // Setup the discovery timeout timer to avoid keeping callbacks indefinitely. + auto timer = dispatcher_.createTimer([this, name] { notifyExpiredDiscovery(name); }); + timer->enableTimer(timeout); + // Keep odcds handle alive for the duration of the discovery process. + pending_cluster_creations_.insert( + {std::move(name), ClusterCreation{std::move(odcds), std::move(timer)}}); + }); + + // We can't "just" return handle here, because handle is a part of the structured binding done + // above. So it's not really a ClusterDiscoveryCallbackHandlePtr, but more like + // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here - it + // needs to be moved. + return std::move(handle); +} + +void ClusterManagerImpl::notifyMissingCluster(absl::string_view name) { + ENVOY_LOG(debug, "cm odcds: cluster {} not found during on-demand discovery", name); + notifyClusterDiscoveryStatus(name, ClusterDiscoveryStatus::Missing); +} + +void ClusterManagerImpl::notifyExpiredDiscovery(absl::string_view name) { + ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} timed out", name); + notifyClusterDiscoveryStatus(name, ClusterDiscoveryStatus::Timeout); +} + +void ClusterManagerImpl::notifyClusterDiscoveryStatus(absl::string_view name, + ClusterDiscoveryStatus status) { + auto map_node_handle = pending_cluster_creations_.extract(name); + if (map_node_handle.empty()) { + // Not a cluster we are interested in. This may happen when ODCDS + // receives some cluster name in removed resources field and + // notifies the cluster manager about it. + return; + } + // Let all the worker threads know that the discovery timed out. + tls_.runOnAllThreads( + [name = std::string(name), status](OptRef cluster_manager) { + ENVOY_LOG( + trace, + "cm cdm: starting processing cluster name {} (status {}) from the expired timer in {}", + name, enumToInt(status), cluster_manager->thread_local_dispatcher_.name()); + cluster_manager->cdm_.processClusterName(name, status); + }); +} + +ClusterDiscoveryManager +ClusterManagerImpl::createAndSwapClusterDiscoveryManager(std::string thread_name) { + ThreadLocalClusterManagerImpl& cluster_manager = *tls_; + ClusterDiscoveryManager cdm(std::move(thread_name), cluster_manager); + + cluster_manager.cdm_.swap(cdm); + + return cdm; } ProtobufTypes::MessagePtr @@ -1176,7 +1299,7 @@ ClusterManagerImpl::dumpClusterConfigs(const Matchers::StringMatcher& name_match ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl( ClusterManagerImpl& parent, Event::Dispatcher& dispatcher, const absl::optional& local_cluster_params) - : parent_(parent), thread_local_dispatcher_(dispatcher) { + : parent_(parent), thread_local_dispatcher_(dispatcher), cdm_(dispatcher.name(), *this) { // If local cluster is defined then we need to initialize it first. if (local_cluster_params.has_value()) { const auto& local_cluster_name = local_cluster_params->info_->name(); @@ -1351,6 +1474,12 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::getHttpConnPoolsContainer( return &container_iter->second; } +ClusterUpdateCallbacksHandlePtr +ClusterManagerImpl::ThreadLocalClusterManagerImpl::addClusterUpdateCallbacks( + ClusterUpdateCallbacks& cb) { + return std::make_unique(cb, update_callbacks_); +} + ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry( ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster, const LoadBalancerFactorySharedPtr& lb_factory) diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 30d58cf5d8de..01499c150000 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -33,7 +33,9 @@ #include "source/common/http/alternate_protocols_cache_manager_impl.h" #include "source/common/http/async_client_impl.h" #include "source/common/quic/quic_stat_names.h" +#include "source/common/upstream/cluster_discovery_manager.h" #include "source/common/upstream/load_stats_reporter.h" +#include "source/common/upstream/od_cds_api_impl.h" #include "source/common/upstream/priority_conn_pool_map.h" #include "source/common/upstream/upstream_impl.h" #include "source/server/factory_context_base_impl.h" @@ -228,7 +230,9 @@ struct ClusterManagerStats { * Implementation of ClusterManager that reads from a proto configuration, maintains a central * cluster list, as well as thread local caches of each cluster and associated connection pools. */ -class ClusterManagerImpl : public ClusterManager, Logger::Loggable { +class ClusterManagerImpl : public ClusterManager, + public MissingClusterNotifier, + Logger::Loggable { public: ClusterManagerImpl(const envoy::config::bootstrap::v3::Bootstrap& bootstrap, ClusterManagerFactory& factory, Stats::Store& stats, @@ -293,6 +297,11 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable odcds_resources_locator, + ProtobufMessage::ValidationVisitor& validation_visitor) override; + ClusterManagerFactory& clusterManagerFactory() override { return factory_; } Config::SubscriptionFactory& subscriptionFactory() override { return subscription_factory_; } @@ -320,6 +329,9 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable per_priority_update_params_; }; + /** + * An implementation of an on-demand CDS handle. It forwards the discovery request to the cluster + * manager that created the handle. + * + * It's a protected type, so unit tests can use it. + */ + class OdCdsApiHandleImpl : public OdCdsApiHandle { + public: + static OdCdsApiHandlePtr create(ClusterManagerImpl& parent, OdCdsApiSharedPtr odcds) { + return std::make_unique(parent, std::move(odcds)); + } + + OdCdsApiHandleImpl(ClusterManagerImpl& parent, OdCdsApiSharedPtr odcds) + : parent_(parent), odcds_(std::move(odcds)) { + ASSERT(odcds_ != nullptr); + } + + ClusterDiscoveryCallbackHandlePtr + requestOnDemandClusterDiscovery(absl::string_view name, ClusterDiscoveryCallbackPtr callback, + std::chrono::milliseconds timeout) override { + return parent_.requestOnDemandClusterDiscovery(odcds_, std::string(name), std::move(callback), + timeout); + } + + private: + ClusterManagerImpl& parent_; + OdCdsApiSharedPtr odcds_; + }; + virtual void postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster, ThreadLocalClusterUpdateParams&& params); + /** + * Notifies cluster discovery managers in each worker thread that the discovery process for the + * cluster with a passed name has timed out. + * + * It's protected, so the tests can use it. + */ + void notifyExpiredDiscovery(absl::string_view name); + + /** + * Creates a new discovery manager in current thread and swaps it with the one in thread local + * cluster manager. This could be used to simulate requesting a cluster from a different + * thread. Used for tests only. + * + * Protected, so tests can use it. + * + * @return the previous cluster discovery manager. + */ + ClusterDiscoveryManager createAndSwapClusterDiscoveryManager(std::string thread_name); + private: /** * Thread local cached cluster data. Each thread local cluster gets updates from the parent * central dynamic cluster (if applicable). It maintains load balancer state and any created * connection pools. */ - struct ThreadLocalClusterManagerImpl : public ThreadLocal::ThreadLocalObject { + struct ThreadLocalClusterManagerImpl : public ThreadLocal::ThreadLocalObject, + public ClusterLifecycleCallbackHandler { struct ConnPoolsContainer { ConnPoolsContainer(Event::Dispatcher& dispatcher, const HostConstSharedPtr& host) : pools_{std::make_shared(dispatcher, host)} {} @@ -493,6 +554,9 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable thread_local_clusters_; @@ -508,6 +572,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable update_callbacks_; const PrioritySet* local_priority_set_{}; bool destroying_{}; + ClusterDiscoveryManager cdm_; }; struct ClusterData : public ClusterManagerCluster { @@ -596,6 +661,17 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable; using ClusterUpdatesMap = absl::node_hash_map; + /** + * Holds a reference to an on-demand CDS to keep it alive for the duration of a cluster discovery, + * and an expiration timer notifying worker threads about discovery timing out. + */ + struct ClusterCreation { + OdCdsApiSharedPtr odcds_; + Event::TimerPtr expiration_timer_; + }; + + using ClusterCreationsMap = absl::flat_hash_map; + void applyUpdates(ClusterManagerCluster& cluster, uint32_t priority, PendingUpdates& updates); bool scheduleUpdate(ClusterManagerCluster& cluster, uint32_t priority, bool mergeable, const uint64_t timeout); @@ -617,10 +693,20 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable preconnect_pool); + ClusterDiscoveryCallbackHandlePtr + requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std::string name, + ClusterDiscoveryCallbackPtr callback, + std::chrono::milliseconds timeout); + + void notifyClusterDiscoveryStatus(absl::string_view name, ClusterDiscoveryStatus status); + +private: ClusterManagerFactory& factory_; Runtime::Loader& runtime_; Stats::Store& stats_; ThreadLocal::TypedSlot tls_; + // Contains information about ongoing on-demand cluster discoveries. + ClusterCreationsMap pending_cluster_creations_; Random::RandomGenerator& random_; protected: diff --git a/source/common/upstream/od_cds_api_impl.cc b/source/common/upstream/od_cds_api_impl.cc new file mode 100644 index 000000000000..17c69424ad99 --- /dev/null +++ b/source/common/upstream/od_cds_api_impl.cc @@ -0,0 +1,112 @@ +#include "source/common/upstream/od_cds_api_impl.h" + +#include "source/common/common/assert.h" +#include "source/common/grpc/common.h" + +#include "absl/strings/str_join.h" + +namespace Envoy { +namespace Upstream { + +OdCdsApiSharedPtr +OdCdsApiImpl::create(const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, + ClusterManager& cm, MissingClusterNotifier& notifier, Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor) { + return OdCdsApiSharedPtr(new OdCdsApiImpl(odcds_config, odcds_resources_locator, cm, notifier, + scope, validation_visitor)); +} + +OdCdsApiImpl::OdCdsApiImpl(const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, + ClusterManager& cm, MissingClusterNotifier& notifier, + Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor) + : Envoy::Config::SubscriptionBase(validation_visitor, + "name"), + helper_(cm, "odcds"), cm_(cm), notifier_(notifier), + scope_(scope.createScope("cluster_manager.odcds.")), status_(StartStatus::NotStarted) { + // TODO(krnowak): Move the subscription setup to CdsApiHelper. Maybe make CdsApiHelper a base + // class for CDS and ODCDS. + const auto resource_name = getResourceName(); + if (!odcds_resources_locator.has_value()) { + subscription_ = cm_.subscriptionFactory().subscriptionFromConfigSource( + odcds_config, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {}); + } else { + subscription_ = cm.subscriptionFactory().collectionSubscriptionFromUrl( + *odcds_resources_locator, odcds_config, resource_name, *scope_, *this, resource_decoder_); + } +} + +void OdCdsApiImpl::onConfigUpdate(const std::vector& resources, + const std::string& version_info) { + UNREFERENCED_PARAMETER(resources); + UNREFERENCED_PARAMETER(version_info); + // On-demand cluster updates are only supported for delta, not sotw. + PANIC("not supported"); +} + +void OdCdsApiImpl::onConfigUpdate(const std::vector& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& system_version_info) { + auto exception_msgs = + helper_.onConfigUpdate(added_resources, removed_resources, system_version_info); + sendAwaiting(); + status_ = StartStatus::InitialFetchDone; + // According to the XDS specification, the server can send a reply with names in the + // removed_resources field for requested resources that do not exist. That way we can notify the + // interested parties about the missing resource immediately without waiting for some timeout to + // be triggered. + for (const auto& resource_name : removed_resources) { + ENVOY_LOG(debug, "odcds: notifying about potential missing cluster {}", resource_name); + notifier_.notifyMissingCluster(resource_name); + } + if (!exception_msgs.empty()) { + throw EnvoyException( + fmt::format("Error adding/updating cluster(s) {}", absl::StrJoin(exception_msgs, ", "))); + } +} + +void OdCdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason, + const EnvoyException*) { + ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason); + sendAwaiting(); + status_ = StartStatus::InitialFetchDone; +} + +void OdCdsApiImpl::sendAwaiting() { + if (awaiting_names_.empty()) { + return; + } + // The awaiting names are sent only once. After the state transition from Starting to + // InitialFetchDone (which happens on the first received response), the awaiting names list is not + // used any more. + ENVOY_LOG(debug, "odcds: sending request for awaiting cluster names {}", + fmt::join(awaiting_names_, ", ")); + subscription_->requestOnDemandUpdate(awaiting_names_); + awaiting_names_.clear(); +} + +void OdCdsApiImpl::updateOnDemand(std::string cluster_name) { + switch (status_) { + case StartStatus::NotStarted: + ENVOY_LOG(trace, "odcds: starting a subscription with cluster name {}", cluster_name); + status_ = StartStatus::Started; + subscription_->start({std::move(cluster_name)}); + return; + + case StartStatus::Started: + ENVOY_LOG(trace, "odcds: putting cluster name {} on awaiting list", cluster_name); + awaiting_names_.insert(std::move(cluster_name)); + return; + + case StartStatus::InitialFetchDone: + ENVOY_LOG(trace, "odcds: requesting for cluster name {}", cluster_name); + subscription_->requestOnDemandUpdate({std::move(cluster_name)}); + return; + } + PANIC("corrupt enum"); +} + +} // namespace Upstream +} // namespace Envoy diff --git a/source/common/upstream/od_cds_api_impl.h b/source/common/upstream/od_cds_api_impl.h new file mode 100644 index 000000000000..184cc6f5182e --- /dev/null +++ b/source/common/upstream/od_cds_api_impl.h @@ -0,0 +1,97 @@ +#pragma once + +#include +#include + +#include "envoy/config/cluster/v3/cluster.pb.h" +#include "envoy/config/cluster/v3/cluster.pb.validate.h" +#include "envoy/config/core/v3/config_source.pb.h" +#include "envoy/config/subscription.h" +#include "envoy/protobuf/message_validator.h" +#include "envoy/stats/scope.h" +#include "envoy/upstream/cluster_manager.h" + +#include "source/common/config/subscription_base.h" +#include "source/common/protobuf/protobuf.h" +#include "source/common/upstream/cds_api_helper.h" + +namespace Envoy { +namespace Upstream { + +enum class StartStatus { + // No initial fetch started. + NotStarted, + // Initial fetch started. + Started, + // Initial fetch arrived. + InitialFetchDone, +}; + +/** + * An interface for on-demand CDS. Defined to allow mocking. + */ +class OdCdsApi { +public: + virtual ~OdCdsApi() = default; + + // Subscribe to a cluster with a given name. It's meant to eventually send a discovery request + // with the cluster name to the management server. + virtual void updateOnDemand(std::string cluster_name) PURE; +}; + +using OdCdsApiSharedPtr = std::shared_ptr; + +/** + * An interface used by OdCdsApiImpl for sending notifications about the missing cluster that was + * requested. + */ +class MissingClusterNotifier { +public: + virtual ~MissingClusterNotifier() = default; + + virtual void notifyMissingCluster(absl::string_view name) PURE; +}; + +/** + * ODCDS API implementation that fetches via Subscription. + */ +class OdCdsApiImpl : public OdCdsApi, + Envoy::Config::SubscriptionBase, + Logger::Loggable { +public: + static OdCdsApiSharedPtr create(const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, + ClusterManager& cm, MissingClusterNotifier& notifier, + Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor); + + // Upstream::OdCdsApi + void updateOnDemand(std::string cluster_name) override; + +private: + // Config::SubscriptionCallbacks + void onConfigUpdate(const std::vector& resources, + const std::string& version_info) override; + void onConfigUpdate(const std::vector& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& system_version_info) override; + void onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason, + const EnvoyException* e) override; + + OdCdsApiImpl(const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, ClusterManager& cm, + MissingClusterNotifier& notifier, Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor); + void sendAwaiting(); + + CdsApiHelper helper_; + ClusterManager& cm_; + MissingClusterNotifier& notifier_; + Stats::ScopePtr scope_; + StartStatus status_; + absl::flat_hash_set awaiting_names_; + Config::SubscriptionPtr subscription_; +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/common/upstream/BUILD b/test/common/upstream/BUILD index 9e6af4ad59fb..07337eb30dc1 100644 --- a/test/common/upstream/BUILD +++ b/test/common/upstream/BUILD @@ -14,6 +14,20 @@ licenses(["notice"]) # Apache 2 envoy_package() +envoy_cc_test( + name = "od_cds_api_impl_test", + srcs = ["od_cds_api_impl_test.cc"], + deps = [ + "//envoy/config:subscription_interface", + "//source/common/stats:isolated_store_lib", + "//source/common/upstream:od_cds_api_lib", + "//test/mocks/protobuf:protobuf_mocks", + "//test/mocks/upstream:cluster_manager_mocks", + "//test/mocks/upstream:missing_cluster_notifier_mocks", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) + envoy_cc_test( name = "cds_api_impl_test", srcs = ["cds_api_impl_test.cc"], @@ -32,6 +46,17 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "cluster_discovery_manager_test", + srcs = ["cluster_discovery_manager_test.cc"], + deps = [ + "//envoy/upstream:cluster_manager_interface", + "//source/common/common:cleanup_lib", + "//source/common/upstream:cluster_discovery_manager_lib", + "//test/mocks/upstream:thread_local_cluster_mocks", + ], +) + envoy_cc_test( name = "cluster_manager_impl_test", srcs = ["cluster_manager_impl_test.cc"], @@ -51,12 +76,14 @@ envoy_cc_test( "//test/config:v2_link_hacks", "//test/integration/load_balancers:custom_lb_policy", "//test/mocks/matcher:matcher_mocks", + "//test/mocks/protobuf:protobuf_mocks", "//test/mocks/upstream:cds_api_mocks", "//test/mocks/upstream:cluster_priority_set_mocks", "//test/mocks/upstream:cluster_real_priority_set_mocks", "//test/mocks/upstream:cluster_update_callbacks_mocks", "//test/mocks/upstream:health_checker_mocks", "//test/mocks/upstream:load_balancer_context_mock", + "//test/mocks/upstream:od_cds_api_mocks", "//test/mocks/upstream:thread_aware_load_balancer_mocks", "//test/test_common:test_runtime_lib", "@envoy_api//envoy/admin/v3:pkg_cc_proto", diff --git a/test/common/upstream/cluster_discovery_manager_test.cc b/test/common/upstream/cluster_discovery_manager_test.cc new file mode 100644 index 000000000000..e1a4175687e1 --- /dev/null +++ b/test/common/upstream/cluster_discovery_manager_test.cc @@ -0,0 +1,447 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "envoy/upstream/cluster_manager.h" + +#include "source/common/common/cleanup.h" +#include "source/common/upstream/cluster_discovery_manager.h" + +#include "test/mocks/upstream/thread_local_cluster.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { +namespace { + +class TestClusterUpdateCallbacksHandle : public ClusterUpdateCallbacksHandle, + RaiiListElement { +public: + TestClusterUpdateCallbacksHandle(ClusterUpdateCallbacks& cb, + std::list& parent) + : RaiiListElement(parent, &cb) {} +}; + +class TestClusterLifecycleCallbackHandler : public ClusterLifecycleCallbackHandler { +public: + // Upstream::ClusterLifecycleCallbackHandler + ClusterUpdateCallbacksHandlePtr addClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) override { + return std::make_unique(cb, update_callbacks_); + } + + void invokeClusterAdded(ThreadLocalCluster& cluster) { + for (auto& cb : update_callbacks_) { + cb->onClusterAddOrUpdate(cluster); + } + } + + std::list update_callbacks_; +}; + +enum class Action { + InvokePrevious, + InvokeSelf, + InvokeNext, + InvokeLast, + InvokeOther, + ProcessFoo, + ProcessBar, + DestroyPrevious, + DestroySelf, + DestroyNext, + DestroyOther, + AddNewToFoo, +}; + +const char* actionToString(Action action) { + switch (action) { + case Action::InvokePrevious: + return "invoke previous"; + + case Action::InvokeSelf: + return "invoke self"; + + case Action::InvokeNext: + return "invoke next"; + + case Action::InvokeLast: + return "invoke last"; + + case Action::InvokeOther: + return "invoke other"; + + case Action::ProcessFoo: + return "process foo"; + + case Action::ProcessBar: + return "process bar"; + + case Action::DestroyPrevious: + return "destroy previous"; + + case Action::DestroySelf: + return "destroy self"; + + case Action::DestroyNext: + return "destroy next"; + + case Action::DestroyOther: + return "destroy other"; + + case Action::AddNewToFoo: + return "add new to foo"; + } + + return "invalid action"; +} + +std::ostream& operator<<(std::ostream& os, Action action) { + os << actionToString(action); + return os; +} + +enum class OtherActionsExecution { + AfterFirstAction, + WithinFirstAction, +}; + +struct ActionsParameter { + ActionsParameter(std::vector actions, std::vector called_callbacks, + OtherActionsExecution other_actions_execution) + : actions_(std::move(actions)), called_callbacks_(std::move(called_callbacks)), + other_actions_execution_(other_actions_execution) {} + + std::vector actions_; + std::vector called_callbacks_; + OtherActionsExecution other_actions_execution_; +}; + +std::ostream& operator<<(std::ostream& os, const ActionsParameter& param) { + const char* prefix = ""; + const char* first_separator = ", "; + if (param.other_actions_execution_ == OtherActionsExecution::WithinFirstAction) { + prefix = "during "; + first_separator = ": "; + } + os << prefix << param.actions_.front() << first_separator + << absl::StrJoin(param.actions_.begin() + 1, param.actions_.end(), ", ", + absl::StreamFormatter()) + << " => " << absl::StrJoin(param.called_callbacks_, ", "); + return os; +} + +class ActionExecutor { +public: + ActionExecutor() + : cdm_("test_thread", lifecycle_handler_), previous_(addCallback("foo", "previous")), + self_(addCallback("foo", "self")), next_(addCallback("foo", "next")), + last_(addCallback("foo", "last")), other_(addCallback("bar", "other")) {} + + void setSelfCallback(std::function self_callback) { + self_callback_ = std::move(self_callback); + } + + void execute(Action action) { + switch (action) { + case Action::InvokePrevious: + useInvoker(previous_.invoker_); + break; + + case Action::InvokeSelf: + useInvoker(self_.invoker_); + break; + + case Action::InvokeNext: + useInvoker(next_.invoker_); + break; + + case Action::InvokeLast: + useInvoker(last_.invoker_); + break; + + case Action::InvokeOther: + useInvoker(other_.invoker_); + break; + + case Action::ProcessFoo: + processClusterName("foo"); + break; + + case Action::ProcessBar: + processClusterName("bar"); + break; + + case Action::DestroyPrevious: + previous_.handle_ptr_.reset(); + break; + + case Action::DestroySelf: + self_.handle_ptr_.reset(); + break; + + case Action::DestroyNext: + next_.handle_ptr_.reset(); + break; + + case Action::DestroyOther: + other_.handle_ptr_.reset(); + break; + + case Action::AddNewToFoo: + std::string callback_name = "new" + std::to_string(new_.size()); + new_.emplace_back(addCallback("foo", std::move(callback_name))); + break; + } + } + + ClusterDiscoveryManager::AddedCallbackData addCallback(std::string cluster_name, + std::string callback_name) { + return cdm_.addCallback( + std::move(cluster_name), + std::make_unique( + [this, callback_name = std::move(callback_name)](ClusterDiscoveryStatus) { + // we ignore the status, it's a thing that always comes from outside the manager + bool is_self = callback_name == "self"; + called_callbacks_.push_back(std::move(callback_name)); + if (is_self && self_callback_) { + self_callback_(); + } + })); + } + + void processClusterName(std::string name) { + auto cluster = NiceMock(); + cluster.cluster_.info_->name_ = std::move(name); + lifecycle_handler_.invokeClusterAdded(cluster); + } + + void useInvoker(ClusterDiscoveryManager::CallbackInvoker& invoker) { + invoker.invokeCallback(ClusterDiscoveryStatus::Available); + } + + TestClusterLifecycleCallbackHandler lifecycle_handler_; + ClusterDiscoveryManager cdm_; + std::vector called_callbacks_; + ClusterDiscoveryManager::AddedCallbackData previous_, self_, next_, last_, other_; + std::vector new_; + std::function self_callback_; +}; + +class ActionExecutorTest : public testing::TestWithParam { +public: + void runTest() { + auto& [actions, expected_result, other_actions_execution] = GetParam(); + + ASSERT_FALSE(actions.empty()); + + switch (other_actions_execution) { + case OtherActionsExecution::AfterFirstAction: + for (auto action : actions) { + executor_.execute(action); + } + break; + + case OtherActionsExecution::WithinFirstAction: + executor_.setSelfCallback([this, begin = actions.begin() + 1, end = actions.end()]() { + for (auto it = begin; it != end; ++it) { + executor_.execute(*it); + } + }); + executor_.execute(actions.front()); + break; + } + + EXPECT_EQ(executor_.called_callbacks_, expected_result); + } + + ActionExecutor executor_; +}; + +std::vector all_actions = { + // invoke self twice in a row; expect it to be called once + ActionsParameter({Action::InvokeSelf, Action::InvokeSelf}, {"self"}, + OtherActionsExecution::AfterFirstAction), + // invoke self then other; expect them to be called normally + ActionsParameter({Action::InvokeSelf, Action::InvokeOther}, {"self", "other"}, + OtherActionsExecution::AfterFirstAction), + // invoke self then process foo; since self was already called, processing foo should not call + // it again + ActionsParameter({Action::InvokeSelf, Action::ProcessFoo}, {"self", "previous", "next", "last"}, + OtherActionsExecution::AfterFirstAction), + // invoke self then process bar; expect them to be called normally + ActionsParameter({Action::InvokeSelf, Action::ProcessBar}, {"self", "other"}, + OtherActionsExecution::AfterFirstAction), + // invoke self then destroy self; expect destroying to be a noop instead of corrupting things + // (this is mostly for address sanitizer) + ActionsParameter({Action::InvokeSelf, Action::DestroySelf}, {"self"}, + OtherActionsExecution::AfterFirstAction), + // process foo then invoke self; since self was called as a part of processing foo, invoke + // should be a noop + ActionsParameter({Action::ProcessFoo, Action::InvokeSelf}, {"previous", "self", "next", "last"}, + OtherActionsExecution::AfterFirstAction), + // process foo twice; expect the callbacks to be called once + ActionsParameter({Action::ProcessFoo, Action::ProcessFoo}, {"previous", "self", "next", "last"}, + OtherActionsExecution::AfterFirstAction), + // process foo then bar; expect the callbacks to be called normally + ActionsParameter({Action::ProcessFoo, Action::ProcessBar}, + {"previous", "self", "next", "last", "other"}, + OtherActionsExecution::AfterFirstAction), + // process foo then destroy self; expect destroying to be a noop instead of corrupting things + // (this is mostly for address sanitizer) + ActionsParameter({Action::ProcessFoo, Action::DestroySelf}, + {"previous", "self", "next", "last"}, OtherActionsExecution::AfterFirstAction), + // destroy self then invoke self; expect the invoke to be a noop + ActionsParameter({Action::DestroySelf, Action::InvokeSelf}, {}, + OtherActionsExecution::AfterFirstAction), + // destroy self then process foo; expect all callbacks but self to be invoked + ActionsParameter({Action::DestroySelf, Action::ProcessFoo}, {"previous", "next", "last"}, + OtherActionsExecution::AfterFirstAction), + // destroy self twice; expect the second destroying to be a noop instead of corrupting things + // (this is mostly for address sanitizer) + ActionsParameter({Action::DestroySelf, Action::DestroySelf}, {}, + OtherActionsExecution::AfterFirstAction), + + // when invoking self, invoke self; expect the second invoke to be a noop + ActionsParameter({Action::InvokeSelf, Action::InvokeSelf}, {"self"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, destroy self; expect the second destroying to be a noop instead of + // corrupting things (this is mostly for address sanitizer) + ActionsParameter({Action::InvokeSelf, Action::DestroySelf}, {"self"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, process foo; expect all callbacks but self to be invoked, since self was + // already invoked + ActionsParameter({Action::InvokeSelf, Action::ProcessFoo}, {"self", "previous", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, process bar; expect the callbacks to be called normally + ActionsParameter({Action::InvokeSelf, Action::ProcessBar}, {"self", "other"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, invoke previous; expect the invoke to be a noop, because previous has + // already been called and done + ActionsParameter({Action::ProcessFoo, Action::InvokePrevious}, + {"previous", "self", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, invoke self; expect the invoke to be a noop, because self is being + // called right now + ActionsParameter({Action::ProcessFoo, Action::InvokeSelf}, {"previous", "self", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, invoke next; expect next to be called once (with the invoke), while + // calling it during the process should become a noop + ActionsParameter({Action::ProcessFoo, Action::InvokeNext}, {"previous", "self", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, invoke last; expect last to be called out of order + ActionsParameter({Action::ProcessFoo, Action::InvokeLast}, {"previous", "self", "last", "next"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, process foo; expect the second process to be a noop + ActionsParameter({Action::ProcessFoo, Action::ProcessFoo}, {"previous", "self", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, process bar; expect the callbacks to be called normally, but bar + // callbacks should be called before the rest of foo callbacks + ActionsParameter({Action::ProcessFoo, Action::ProcessBar}, + {"previous", "self", "other", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, destroy self; expect the second destroying to be a noop (since self is + // being called at the moment), instead of corrupting things (this is mostly for address + // sanitizer) + ActionsParameter({Action::ProcessFoo, Action::DestroySelf}, + {"previous", "self", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, destroy next; expect next callback to be skipped + ActionsParameter({Action::ProcessFoo, Action::DestroyNext}, {"previous", "self", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, add a new callback to foo; expect the new callback to be not invoked + // (could be invoked with a follow-up process) + ActionsParameter({Action::ProcessFoo, Action::AddNewToFoo}, + {"previous", "self", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + + // when invoking self, invoke previous and process foo; expect the process to call only next and + // last + ActionsParameter({Action::InvokeSelf, Action::InvokePrevious, Action::ProcessFoo}, + {"self", "previous", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, invoke next and process foo; expect process to call only previous and + // last + ActionsParameter({Action::InvokeSelf, Action::InvokeNext, Action::ProcessFoo}, + {"self", "next", "previous", "last"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, invoke other invoke last, and process bar; expect the process to be a + // noop (invoking last for visibility of the noop) + ActionsParameter( + {Action::InvokeSelf, Action::InvokeOther, Action::InvokeLast, Action::ProcessBar}, + {"self", "other", "last"}, OtherActionsExecution::WithinFirstAction), + // when invoking self, process foo then invoke previous; expect the process to skip self (as + // it's being called at the moment) and invoking previous to be a noop (called during the + // process) + ActionsParameter({Action::InvokeSelf, Action::ProcessFoo, Action::InvokePrevious}, + {"self", "previous", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, process foo then invoke previous; expect the process to skip self (as + // it's being called at the moment) and invoking previous to be a noop (called during the + // process) + ActionsParameter({Action::InvokeSelf, Action::ProcessFoo, Action::DestroyPrevious}, + {"self", "previous", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, destroy previous and process foo; expect self and previous to be skipped + // when processing + ActionsParameter({Action::InvokeSelf, Action::DestroyPrevious, Action::ProcessFoo}, + {"self", "next", "last"}, OtherActionsExecution::WithinFirstAction), + // when invoking self, destroy other and process bar; expect the process to be a noop + ActionsParameter({Action::InvokeSelf, Action::DestroyOther, Action::ProcessBar}, {"self"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, add new callback to foo and process foo; expect self to be skipped, but + // new to be called along with the rest of the callbacks + ActionsParameter({Action::InvokeSelf, Action::AddNewToFoo, Action::ProcessFoo}, + {"self", "previous", "next", "last", "new0"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, add new callback to foo, process foo then invoke other; expect the + // second process to call only the new callback, then first process to resume with the rest of + // the callbacks (the other callback is added to see the split between two processes) + ActionsParameter( + {Action::ProcessFoo, Action::AddNewToFoo, Action::ProcessFoo, Action::InvokeOther}, + {"previous", "self", "new0", "other", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, add new to foo and destroy next; expect the new callback to be not + // called, same for the next callback + ActionsParameter({Action::ProcessFoo, Action::AddNewToFoo, Action::DestroyNext}, + {"previous", "self", "last"}, OtherActionsExecution::WithinFirstAction), + // when processing foo, destroy next and try to invoke next; + // expect the invoke to be noop, and processing to not call the + // next callback + ActionsParameter({Action::ProcessFoo, Action::DestroyNext, Action::InvokeNext}, + {"previous", "self", "last"}, OtherActionsExecution::WithinFirstAction), +}; + +class ClusterDiscoveryTest : public ActionExecutorTest {}; + +INSTANTIATE_TEST_SUITE_P(ClusterDiscoveryTestActions, ClusterDiscoveryTest, + testing::ValuesIn(all_actions)); + +TEST_P(ClusterDiscoveryTest, TestActions) { runTest(); } + +class ClusterDiscoveryManagerMiscTest : public testing::Test { +public: + ClusterDiscoveryManagerMiscTest() = default; + + ActionExecutor executor_; +}; + +// Test the the discovery in progress value is correct. +TEST_F(ClusterDiscoveryManagerMiscTest, TestDiscoveryInProgressValue) { + // previous is first callback added to foo + EXPECT_FALSE(executor_.previous_.discovery_in_progress_); + // self, next and last callbacks are follow-up callbacks in foo + EXPECT_TRUE(executor_.self_.discovery_in_progress_); + EXPECT_TRUE(executor_.next_.discovery_in_progress_); + EXPECT_TRUE(executor_.last_.discovery_in_progress_); + // other is first callback added to bar + EXPECT_FALSE(executor_.other_.discovery_in_progress_); +} + +} // namespace +} // namespace Upstream +} // namespace Envoy diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 054e51b9e818..c36a9c4eec97 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -4,6 +4,7 @@ #include "envoy/config/cluster/v3/cluster.pb.validate.h" #include "envoy/config/core/v3/base.pb.h" +#include "source/common/config/xds_resource.h" #include "source/common/network/raw_buffer_socket.h" #include "source/common/network/resolver_impl.h" #include "source/common/router/context_impl.h" @@ -14,12 +15,14 @@ #include "test/config/v2_link_hacks.h" #include "test/mocks/http/conn_pool.h" #include "test/mocks/matcher/mocks.h" +#include "test/mocks/protobuf/mocks.h" #include "test/mocks/upstream/cds_api.h" #include "test/mocks/upstream/cluster_priority_set.h" #include "test/mocks/upstream/cluster_real_priority_set.h" #include "test/mocks/upstream/cluster_update_callbacks.h" #include "test/mocks/upstream/health_checker.h" #include "test/mocks/upstream/load_balancer_context.h" +#include "test/mocks/upstream/od_cds_api.h" #include "test/mocks/upstream/thread_aware_load_balancer.h" #include "test/test_common/test_runtime.h" #include "test/test_common/utility.h" @@ -203,6 +206,255 @@ envoy::config::bootstrap::v3::Bootstrap defaultConfig() { return parseBootstrapFromV3Yaml(yaml); } +class ODCDTest : public ClusterManagerImplTest { +public: + void SetUp() override { + create(defaultConfig()); + odcds_ = MockOdCdsApi::create(); + odcds_handle_ = cluster_manager_->createOdCdsApiHandle(odcds_); + } + + void TearDown() override { + odcds_.reset(); + odcds_handle_.reset(); + factory_.tls_.shutdownThread(); + } + + ClusterDiscoveryCallbackPtr createCallback() { + return std::make_unique( + [this](ClusterDiscoveryStatus cluster_status) { + UNREFERENCED_PARAMETER(cluster_status); + ++callback_call_count_; + }); + } + + ClusterDiscoveryCallbackPtr createCallback(ClusterDiscoveryStatus expected_cluster_status) { + return std::make_unique( + [this, expected_cluster_status](ClusterDiscoveryStatus cluster_status) { + EXPECT_EQ(expected_cluster_status, cluster_status); + ++callback_call_count_; + }); + } + + MockOdCdsApiSharedPtr odcds_; + OdCdsApiHandlePtr odcds_handle_; + std::chrono::milliseconds timeout_ = std::chrono::milliseconds(5000); + unsigned callback_call_count_ = 0u; +}; + +// Check that we create a valid handle for valid config source and null resource locator. +TEST_F(ODCDTest, TestAllocate) { + envoy::config::core::v3::ConfigSource config; + OptRef locator; + ProtobufMessage::MockValidationVisitor mock_visitor; + + config.mutable_api_config_source()->set_api_type( + envoy::config::core::v3::ApiConfigSource::DELTA_GRPC); + config.mutable_api_config_source()->set_transport_api_version(envoy::config::core::v3::V3); + config.mutable_api_config_source()->mutable_refresh_delay()->set_seconds(1); + config.mutable_api_config_source()->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name( + "static_cluster"); + + auto handle = cluster_manager_->allocateOdCdsApi(config, locator, mock_visitor); + EXPECT_NE(handle, nullptr); +} + +// Check that we create a valid handle for valid config source and resource locator. +TEST_F(ODCDTest, TestAllocateWithLocator) { + envoy::config::core::v3::ConfigSource config; + ProtobufMessage::MockValidationVisitor mock_visitor; + + config.mutable_api_config_source()->set_api_type( + envoy::config::core::v3::ApiConfigSource::DELTA_GRPC); + config.mutable_api_config_source()->set_transport_api_version(envoy::config::core::v3::V3); + config.mutable_api_config_source()->mutable_refresh_delay()->set_seconds(1); + config.mutable_api_config_source()->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name( + "static_cluster"); + + auto locator = + Config::XdsResourceIdentifier::decodeUrl("xdstp://foo/envoy.config.cluster.v3.Cluster/bar"); + auto handle = cluster_manager_->allocateOdCdsApi(config, locator, mock_visitor); + EXPECT_NE(handle, nullptr); +} + +// Check if requesting for an unknown cluster calls into ODCDS instead of invoking the callback. +TEST_F(ODCDTest, TestRequest) { + auto cb = createCallback(); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + EXPECT_EQ(callback_call_count_, 0); +} + +// Check if repeatedly requesting for an unknown cluster calls only once into ODCDS instead of +// invoking the callbacks. +TEST_F(ODCDTest, TestRequestRepeated) { + auto cb1 = createCallback(); + auto cb2 = createCallback(); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle1 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb1), timeout_); + auto handle2 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb2), timeout_); + EXPECT_EQ(callback_call_count_, 0); +} + +// Check if requesting an unknown cluster calls into ODCDS, even after the successful discovery of +// the cluster and its following expiration (removal). Also make sure that the callback is called on +// the successful discovery. +TEST_F(ODCDTest, TestClusterRediscovered) { + auto cb = createCallback(ClusterDiscoveryStatus::Available); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")).Times(2); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo"), "version1"); + EXPECT_EQ(callback_call_count_, 1); + handle.reset(); + cluster_manager_->removeCluster("cluster_foo"); + cb = createCallback(); + handle = odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + EXPECT_EQ(callback_call_count_, 1); +} + +// Check if requesting an unknown cluster calls into ODCDS, even after the expired discovery of the +// cluster. Also make sure that the callback is called on the expired discovery. +TEST_F(ODCDTest, TestClusterRediscoveredAfterExpiration) { + auto cb = createCallback(ClusterDiscoveryStatus::Timeout); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")).Times(2); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + cluster_manager_->notifyExpiredDiscovery("cluster_foo"); + EXPECT_EQ(callback_call_count_, 1); + handle.reset(); + cb = createCallback(); + handle = odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + EXPECT_EQ(callback_call_count_, 1); +} + +// Check if requesting an unknown cluster calls into ODCDS, even after +// the discovery found out that the cluster is missing in the +// management server. Also make sure that the callback is called on +// the failed discovery. +TEST_F(ODCDTest, TestClusterRediscoveredAfterMissing) { + auto cb = createCallback(ClusterDiscoveryStatus::Missing); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")).Times(2); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + cluster_manager_->notifyMissingCluster("cluster_foo"); + EXPECT_EQ(callback_call_count_, 1); + handle.reset(); + cb = createCallback(); + handle = odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + EXPECT_EQ(callback_call_count_, 1); +} + +// Check that we do nothing if we get a notification about irrelevant +// missing cluster. +TEST_F(ODCDTest, TestIrrelevantNotifyMissingCluster) { + auto cb = createCallback(ClusterDiscoveryStatus::Timeout); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + cluster_manager_->notifyMissingCluster("cluster_bar"); + EXPECT_EQ(callback_call_count_, 0); +} + +// Check that the callback is not called when some other cluster is added. +TEST_F(ODCDTest, TestDiscoveryManagerIgnoresIrrelevantClusters) { + auto cb = std::make_unique([](ClusterDiscoveryStatus) { + ADD_FAILURE() << "The callback should not be called for irrelevant clusters"; + }); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_irrelevant"), "version1"); +} + +// Start a couple of discoveries and drop the discovery handles in different order, make sure no +// callbacks are invoked when discoveries are done. +TEST_F(ODCDTest, TestDroppingHandles) { + auto cb1 = std::make_unique( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 1 should not be called"; }); + auto cb2 = std::make_unique( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 2 should not be called"; }); + auto cb3 = std::make_unique( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 3 should not be called"; }); + auto cb4 = std::make_unique( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 4 should not be called"; }); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo1")); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo2")); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo3")); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo4")); + auto handle1 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo1", std::move(cb1), timeout_); + auto handle2 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo2", std::move(cb2), timeout_); + auto handle3 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo3", std::move(cb3), timeout_); + auto handle4 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo4", std::move(cb4), timeout_); + + handle2.reset(); + handle3.reset(); + handle1.reset(); + handle4.reset(); + + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo1"), "version1"); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo2"), "version1"); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo3"), "version1"); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo4"), "version1"); +} + +// Checks that dropping discovery handles will result in callbacks not being invoked. +TEST_F(ODCDTest, TestHandles) { + auto cb1 = createCallback(ClusterDiscoveryStatus::Available); + auto cb2 = std::make_unique( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 2 should not be called"; }); + auto cb3 = std::make_unique( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 3 should not be called"; }); + auto cb4 = createCallback(ClusterDiscoveryStatus::Available); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle1 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb1), timeout_); + auto handle2 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb2), timeout_); + auto handle3 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb3), timeout_); + auto handle4 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb4), timeout_); + + // handle1 and handle4 are left intact, so their respective callbacks will be invoked. + handle2.reset(); + handle3.reset(); + + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo"), "version1"); + EXPECT_EQ(callback_call_count_, 2); +} + +// Check if callback is invoked when trying to discover a cluster we already know about. It should +// not call into ODCDS in such case. +TEST_F(ODCDTest, TestCallbackWithExistingCluster) { + auto cb = createCallback(ClusterDiscoveryStatus::Available); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo"), "version1"); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")).Times(0); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + EXPECT_EQ(callback_call_count_, 1); +} + +// Checks that the cluster manager detects that a thread has requested a cluster that some other +// thread already did earlier, so it does not start another discovery process. +TEST_F(ODCDTest, TestMainThreadDiscoveryInProgressDetection) { + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto cb1 = createCallback(); + auto cb2 = createCallback(); + auto handle1 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb1), timeout_); + auto cdm = cluster_manager_->createAndSwapClusterDiscoveryManager("another_fake_thread"); + auto handle2 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb2), timeout_); +} + class AlpnSocketFactory : public Network::RawBufferSocketFactory { public: bool supportsAlpn() const override { return true; } diff --git a/test/common/upstream/od_cds_api_impl_test.cc b/test/common/upstream/od_cds_api_impl_test.cc new file mode 100644 index 000000000000..aed57101edef --- /dev/null +++ b/test/common/upstream/od_cds_api_impl_test.cc @@ -0,0 +1,189 @@ +#include "envoy/config/core/v3/config_source.pb.h" +#include "envoy/config/subscription.h" + +#include "source/common/stats/isolated_store_impl.h" +#include "source/common/upstream/od_cds_api_impl.h" + +#include "test/mocks/protobuf/mocks.h" +#include "test/mocks/upstream/cluster_manager.h" +#include "test/mocks/upstream/missing_cluster_notifier.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { +namespace { + +using ::testing::ElementsAre; +using ::testing::InSequence; +using ::testing::UnorderedElementsAre; + +class OdCdsApiImplTest : public testing::Test { +public: + void SetUp() override { + envoy::config::core::v3::ConfigSource odcds_config; + OptRef null_locator; + odcds_ = OdCdsApiImpl::create(odcds_config, null_locator, cm_, notifier_, store_, + validation_visitor_); + odcds_callbacks_ = cm_.subscription_factory_.callbacks_; + } + + NiceMock cm_; + Stats::IsolatedStoreImpl store_; + MockMissingClusterNotifier notifier_; + OdCdsApiSharedPtr odcds_; + Config::SubscriptionCallbacks* odcds_callbacks_ = nullptr; + NiceMock validation_visitor_; +}; + +// Check that the subscription is started on the first (initial) request. +TEST_F(OdCdsApiImplTest, FirstUpdateStarts) { + InSequence s; + + EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(ElementsAre("fake_cluster"))); + odcds_->updateOnDemand("fake_cluster"); +} + +// Check that the cluster names are added to the awaiting list, when we still wait for the response +// for the initial request. +TEST_F(OdCdsApiImplTest, FollowingClusterNamesHitAwaitingList) { + InSequence s; + + EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(ElementsAre("fake_cluster"))); + EXPECT_CALL(*cm_.subscription_factory_.subscription_, requestOnDemandUpdate(_)).Times(0); + odcds_->updateOnDemand("fake_cluster"); + odcds_->updateOnDemand("another_cluster"); +} + +// Check that the awaiting list is processed when we receive a successful response for the initial +// request. +TEST_F(OdCdsApiImplTest, AwaitingListIsProcessedOnConfigUpdate) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + odcds_->updateOnDemand("another_cluster_1"); + odcds_->updateOnDemand("another_cluster_2"); + + envoy::config::cluster::v3::Cluster cluster; + cluster.set_name("fake_cluster"); + const auto decoded_resources = TestUtility::decodeResources({cluster}); + EXPECT_CALL( + *cm_.subscription_factory_.subscription_, + requestOnDemandUpdate(UnorderedElementsAre("another_cluster_1", "another_cluster_2"))); + odcds_callbacks_->onConfigUpdate(decoded_resources.refvec_, {}, "0"); +} + +// Check that the awaiting list is processed when we receive a failure response for the initial +// request. +TEST_F(OdCdsApiImplTest, AwaitingListIsProcessedOnConfigUpdateFailed) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + odcds_->updateOnDemand("another_cluster_1"); + odcds_->updateOnDemand("another_cluster_2"); + + EXPECT_CALL( + *cm_.subscription_factory_.subscription_, + requestOnDemandUpdate(UnorderedElementsAre("another_cluster_1", "another_cluster_2"))); + odcds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, + nullptr); +} + +// Check that the awaiting list is processed only once, so on the first config update or config +// update failed. +TEST_F(OdCdsApiImplTest, AwaitingListIsProcessedOnceOnly) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + odcds_->updateOnDemand("another_cluster_1"); + odcds_->updateOnDemand("another_cluster_2"); + + EXPECT_CALL( + *cm_.subscription_factory_.subscription_, + requestOnDemandUpdate(UnorderedElementsAre("another_cluster_1", "another_cluster_2"))); + odcds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, + nullptr); + odcds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, + nullptr); +} + +// Check that we don't do extra request if there's nothing on the awaiting list. +TEST_F(OdCdsApiImplTest, NothingIsRequestedOnEmptyAwaitingList) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + + EXPECT_CALL(*cm_.subscription_factory_.subscription_, requestOnDemandUpdate(_)).Times(0); + odcds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, + nullptr); +} + +// Check that we send the requests for clusters after receiving the initial response instead of +// putting the names into the awaiting list. +TEST_F(OdCdsApiImplTest, OnDemandUpdateIsRequestedAfterInitialFetch) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + envoy::config::cluster::v3::Cluster cluster; + cluster.set_name("fake_cluster"); + const auto decoded_resources = TestUtility::decodeResources({cluster}); + odcds_callbacks_->onConfigUpdate(decoded_resources.refvec_, {}, "0"); + EXPECT_CALL(*cm_.subscription_factory_.subscription_, + requestOnDemandUpdate(UnorderedElementsAre("another_cluster"))); + odcds_->updateOnDemand("another_cluster"); +} + +// Check that we report an error when we received a duplicated cluster. +TEST_F(OdCdsApiImplTest, ValidateDuplicateClusters) { + InSequence s; + + envoy::config::cluster::v3::Cluster cluster_1; + cluster_1.set_name("duplicate_cluster"); + const auto decoded_resources = TestUtility::decodeResources({cluster_1, cluster_1}); + + EXPECT_THROW_WITH_MESSAGE(odcds_callbacks_->onConfigUpdate(decoded_resources.refvec_, {}, ""), + EnvoyException, + "Error adding/updating cluster(s) duplicate_cluster: duplicate cluster " + "duplicate_cluster found"); +} + +// Check that notifier gets a message about potentially missing cluster. +TEST_F(OdCdsApiImplTest, NotifierGetsUsed) { + InSequence s; + + odcds_->updateOnDemand("cluster"); + EXPECT_CALL(notifier_, notifyMissingCluster("missing_cluster")); + std::vector v{"missing_cluster"}; + Protobuf::RepeatedPtrField removed(v.begin(), v.end()); + odcds_callbacks_->onConfigUpdate({}, removed, ""); +} + +// Check that notifier won't be used for a requested cluster that did +// not appear in the response. +TEST_F(OdCdsApiImplTest, NotifierNotUsed) { + InSequence s; + + envoy::config::cluster::v3::Cluster cluster; + cluster.set_name("some_cluster"); + const auto some_cluster_resource = TestUtility::decodeResources({cluster}); + cluster.set_name("some_cluster2"); + const auto some_cluster2_resource = TestUtility::decodeResources({cluster}); + + std::vector v{"another_cluster"}; + Protobuf::RepeatedPtrField removed(v.begin(), v.end()); + std::vector v2{"another_cluster2"}; + Protobuf::RepeatedPtrField removed2(v2.begin(), v2.end()); + + odcds_->updateOnDemand("cluster"); + EXPECT_CALL(notifier_, notifyMissingCluster(_)).Times(2); + EXPECT_CALL(notifier_, notifyMissingCluster("cluster")).Times(0); + odcds_callbacks_->onConfigUpdate(some_cluster_resource.refvec_, {}, ""); + odcds_callbacks_->onConfigUpdate({}, removed, ""); + odcds_callbacks_->onConfigUpdate({}, {}, ""); + odcds_callbacks_->onConfigUpdate(some_cluster2_resource.refvec_, removed2, ""); +} + +} // namespace +} // namespace Upstream +} // namespace Envoy diff --git a/test/common/upstream/test_cluster_manager.h b/test/common/upstream/test_cluster_manager.h index 8bc91e7ac877..63cd5a122567 100644 --- a/test/common/upstream/test_cluster_manager.h +++ b/test/common/upstream/test_cluster_manager.h @@ -188,6 +188,18 @@ class TestClusterManagerImpl : public ClusterManagerImpl { } return clusters; } + + OdCdsApiHandlePtr createOdCdsApiHandle(OdCdsApiSharedPtr odcds) { + return ClusterManagerImpl::OdCdsApiHandleImpl::create(*this, std::move(odcds)); + } + + void notifyExpiredDiscovery(absl::string_view name) { + ClusterManagerImpl::notifyExpiredDiscovery(name); + } + + ClusterDiscoveryManager createAndSwapClusterDiscoveryManager(std::string thread_name) { + return ClusterManagerImpl::createAndSwapClusterDiscoveryManager(std::move(thread_name)); + } }; // Override postThreadLocalClusterUpdate so we can test that merged updates calls diff --git a/test/mocks/upstream/BUILD b/test/mocks/upstream/BUILD index 85eadc255f91..624a336837ac 100644 --- a/test/mocks/upstream/BUILD +++ b/test/mocks/upstream/BUILD @@ -72,6 +72,7 @@ envoy_cc_mock( deps = [ ":basic_resource_limit_mocks", ":cds_api_mocks", + ":cluster_discovery_callback_handle_mocks", ":cluster_info_factory_mocks", ":cluster_manager_factory_mocks", ":cluster_manager_mocks", @@ -85,6 +86,9 @@ envoy_cc_mock( ":host_set_mocks", ":load_balancer_context_mock", ":load_balancer_mocks", + ":missing_cluster_notifier_mocks", + ":od_cds_api_handle_mocks", + ":od_cds_api_mocks", ":priority_set_mocks", ":retry_host_predicate_mocks", ":retry_priority_factory_mocks", @@ -260,6 +264,7 @@ envoy_cc_mock( "//test/mocks/http:http_mocks", "//test/mocks/tcp:tcp_mocks", "//test/mocks/upstream:cluster_manager_factory_mocks", + "//test/mocks/upstream:od_cds_api_handle_mocks", "//test/mocks/upstream:thread_local_cluster_mocks", ], ) @@ -291,6 +296,43 @@ envoy_cc_mock( ], ) +envoy_cc_mock( + name = "missing_cluster_notifier_mocks", + srcs = ["missing_cluster_notifier.cc"], + hdrs = ["missing_cluster_notifier.h"], + deps = [ + "//source/common/upstream:od_cds_api_lib", + ], +) + +envoy_cc_mock( + name = "od_cds_api_mocks", + srcs = ["od_cds_api.cc"], + hdrs = ["od_cds_api.h"], + deps = [ + "//source/common/upstream:od_cds_api_lib", + ], +) + +envoy_cc_mock( + name = "od_cds_api_handle_mocks", + srcs = ["od_cds_api_handle.cc"], + hdrs = ["od_cds_api_handle.h"], + deps = [ + ":cluster_discovery_callback_handle_mocks", + "//envoy/upstream:cluster_manager_interface", + ], +) + +envoy_cc_mock( + name = "cluster_discovery_callback_handle_mocks", + srcs = ["cluster_discovery_callback_handle.cc"], + hdrs = ["cluster_discovery_callback_handle.h"], + deps = [ + "//envoy/upstream:cluster_manager_interface", + ], +) + envoy_cc_mock( name = "cluster_update_callbacks_mocks", srcs = ["cluster_update_callbacks.cc"], diff --git a/test/mocks/upstream/cluster_discovery_callback_handle.cc b/test/mocks/upstream/cluster_discovery_callback_handle.cc new file mode 100644 index 000000000000..05513c714e1b --- /dev/null +++ b/test/mocks/upstream/cluster_discovery_callback_handle.cc @@ -0,0 +1,10 @@ +#include "cluster_discovery_callback_handle.h" + +namespace Envoy { +namespace Upstream { + +MockClusterDiscoveryCallbackHandle::MockClusterDiscoveryCallbackHandle() = default; +MockClusterDiscoveryCallbackHandle::~MockClusterDiscoveryCallbackHandle() = default; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/cluster_discovery_callback_handle.h b/test/mocks/upstream/cluster_discovery_callback_handle.h new file mode 100644 index 000000000000..d7568d10dd3c --- /dev/null +++ b/test/mocks/upstream/cluster_discovery_callback_handle.h @@ -0,0 +1,18 @@ +#pragma once + +#include "envoy/upstream/cluster_manager.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { + +class MockClusterDiscoveryCallbackHandle : public ClusterDiscoveryCallbackHandle { +public: + MockClusterDiscoveryCallbackHandle(); + ~MockClusterDiscoveryCallbackHandle() override; +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/cluster_manager.cc b/test/mocks/upstream/cluster_manager.cc index 7e17320ba63d..05c5afcd8aac 100644 --- a/test/mocks/upstream/cluster_manager.cc +++ b/test/mocks/upstream/cluster_manager.cc @@ -24,6 +24,12 @@ MockClusterManager::MockClusterManager() ON_CALL(*this, grpcAsyncClientManager()).WillByDefault(ReturnRef(async_client_manager_)); ON_CALL(*this, localClusterName()).WillByDefault((ReturnRef(local_cluster_name_))); ON_CALL(*this, subscriptionFactory()).WillByDefault(ReturnRef(subscription_factory_)); + ON_CALL(*this, allocateOdCdsApi(_, _, _)) + .WillByDefault(Invoke([](const envoy::config::core::v3::ConfigSource&, + OptRef, + ProtobufMessage::ValidationVisitor&) -> OdCdsApiHandlePtr { + return MockOdCdsApiHandle::create(); + })); } MockClusterManager::~MockClusterManager() = default; diff --git a/test/mocks/upstream/cluster_manager.h b/test/mocks/upstream/cluster_manager.h index 08f4c1c56328..69f75b443804 100644 --- a/test/mocks/upstream/cluster_manager.h +++ b/test/mocks/upstream/cluster_manager.h @@ -10,6 +10,7 @@ #include "cluster_manager_factory.h" #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "od_cds_api_handle.h" #include "thread_local_cluster.h" namespace Envoy { @@ -71,6 +72,10 @@ class MockClusterManager : public ClusterManager { MOCK_METHOD(void, drainConnections, (const std::string& cluster)); MOCK_METHOD(void, drainConnections, ()); MOCK_METHOD(void, checkActiveStaticCluster, (const std::string& cluster)); + MOCK_METHOD(OdCdsApiHandlePtr, allocateOdCdsApi, + (const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, + ProtobufMessage::ValidationVisitor& validation_visitor)); NiceMock thread_local_cluster_; envoy::config::core::v3::BindConfig bind_config_; @@ -89,5 +94,4 @@ class MockClusterManager : public ClusterManager { ClusterTimeoutBudgetStatNames cluster_timeout_budget_stat_names_; }; } // namespace Upstream - } // namespace Envoy diff --git a/test/mocks/upstream/missing_cluster_notifier.cc b/test/mocks/upstream/missing_cluster_notifier.cc new file mode 100644 index 000000000000..2c15a4321230 --- /dev/null +++ b/test/mocks/upstream/missing_cluster_notifier.cc @@ -0,0 +1,10 @@ +#include "missing_cluster_notifier.h" + +namespace Envoy { +namespace Upstream { + +MockMissingClusterNotifier::MockMissingClusterNotifier() = default; +MockMissingClusterNotifier::~MockMissingClusterNotifier() = default; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/missing_cluster_notifier.h b/test/mocks/upstream/missing_cluster_notifier.h new file mode 100644 index 000000000000..da37fadef838 --- /dev/null +++ b/test/mocks/upstream/missing_cluster_notifier.h @@ -0,0 +1,20 @@ +#pragma once + +#include "source/common/upstream/od_cds_api_impl.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { + +class MockMissingClusterNotifier : public MissingClusterNotifier { +public: + MockMissingClusterNotifier(); + ~MockMissingClusterNotifier() override; + + MOCK_METHOD(void, notifyMissingCluster, (absl::string_view name)); +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/mocks.h b/test/mocks/upstream/mocks.h index 433aec61f06e..f256737945d0 100644 --- a/test/mocks/upstream/mocks.h +++ b/test/mocks/upstream/mocks.h @@ -27,6 +27,7 @@ #include "test/mocks/upstream/basic_resource_limit.h" #include "test/mocks/upstream/cds_api.h" #include "test/mocks/upstream/cluster.h" +#include "test/mocks/upstream/cluster_discovery_callback_handle.h" #include "test/mocks/upstream/cluster_info.h" #include "test/mocks/upstream/cluster_info_factory.h" #include "test/mocks/upstream/cluster_manager.h" @@ -40,6 +41,8 @@ #include "test/mocks/upstream/host_set.h" #include "test/mocks/upstream/load_balancer.h" #include "test/mocks/upstream/load_balancer_context.h" +#include "test/mocks/upstream/od_cds_api.h" +#include "test/mocks/upstream/od_cds_api_handle.h" #include "test/mocks/upstream/priority_set.h" #include "test/mocks/upstream/retry_host_predicate.h" #include "test/mocks/upstream/retry_priority.h" diff --git a/test/mocks/upstream/od_cds_api.cc b/test/mocks/upstream/od_cds_api.cc new file mode 100644 index 000000000000..c5438817977a --- /dev/null +++ b/test/mocks/upstream/od_cds_api.cc @@ -0,0 +1,13 @@ +#include "od_cds_api.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { + +MockOdCdsApi::MockOdCdsApi() = default; +MockOdCdsApi::~MockOdCdsApi() = default; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/od_cds_api.h b/test/mocks/upstream/od_cds_api.h new file mode 100644 index 000000000000..4f76c5324c7e --- /dev/null +++ b/test/mocks/upstream/od_cds_api.h @@ -0,0 +1,28 @@ +#pragma once + +#include +#include + +#include "source/common/upstream/od_cds_api_impl.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { + +class MockOdCdsApi; +using MockOdCdsApiSharedPtr = std::shared_ptr; + +class MockOdCdsApi : public OdCdsApi { +public: + static MockOdCdsApiSharedPtr create() { return std::make_shared(); } + + MockOdCdsApi(); + ~MockOdCdsApi() override; + + MOCK_METHOD(void, updateOnDemand, (std::string cluster_name)); +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/od_cds_api_handle.cc b/test/mocks/upstream/od_cds_api_handle.cc new file mode 100644 index 000000000000..3f09cbb2e222 --- /dev/null +++ b/test/mocks/upstream/od_cds_api_handle.cc @@ -0,0 +1,23 @@ +#include "od_cds_api_handle.h" + +#include "cluster_discovery_callback_handle.h" + +namespace Envoy { +namespace Upstream { + +using ::testing::_; +using ::testing::Invoke; +using ::testing::NiceMock; + +MockOdCdsApiHandle::MockOdCdsApiHandle() { + ON_CALL(*this, requestOnDemandClusterDiscovery(_, _, _)) + .WillByDefault(Invoke([](absl::string_view, ClusterDiscoveryCallbackPtr, + std::chrono::milliseconds) -> ClusterDiscoveryCallbackHandlePtr { + return std::make_unique>(); + })); +} + +MockOdCdsApiHandle::~MockOdCdsApiHandle() = default; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/od_cds_api_handle.h b/test/mocks/upstream/od_cds_api_handle.h new file mode 100644 index 000000000000..49a96e4223f5 --- /dev/null +++ b/test/mocks/upstream/od_cds_api_handle.h @@ -0,0 +1,26 @@ +#pragma once + +#include "envoy/upstream/cluster_manager.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { + +class MockOdCdsApiHandle; +using MockOdCdsApiHandlePtr = std::unique_ptr; + +class MockOdCdsApiHandle : public OdCdsApiHandle { +public: + static MockOdCdsApiHandlePtr create() { return std::make_unique(); } + MockOdCdsApiHandle(); + ~MockOdCdsApiHandle() override; + + MOCK_METHOD(ClusterDiscoveryCallbackHandlePtr, requestOnDemandClusterDiscovery, + (absl::string_view name, ClusterDiscoveryCallbackPtr callback, + std::chrono::milliseconds timeout)); +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 80223b742b94..087ab849ec5b 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -238,6 +238,8 @@ Nilsson Nonhashable Oauth OCSP +OD +ODCDS OID OK OOM @@ -756,6 +758,7 @@ interpretable intra ints invariance +invoker iovec iovecs ips