diff --git a/api/envoy/extensions/filters/http/on_demand/v3/BUILD b/api/envoy/extensions/filters/http/on_demand/v3/BUILD index 7d1afef788cc..5479c90294f4 100644 --- a/api/envoy/extensions/filters/http/on_demand/v3/BUILD +++ b/api/envoy/extensions/filters/http/on_demand/v3/BUILD @@ -6,6 +6,7 @@ licenses(["notice"]) # Apache 2 api_proto_package( deps = [ + "//envoy/config/core/v3:pkg", "//envoy/config/filter/http/on_demand/v2:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", ], diff --git a/api/envoy/extensions/filters/http/on_demand/v3/on_demand.proto b/api/envoy/extensions/filters/http/on_demand/v3/on_demand.proto index 27e709f7a8d6..665ffea388bf 100644 --- a/api/envoy/extensions/filters/http/on_demand/v3/on_demand.proto +++ b/api/envoy/extensions/filters/http/on_demand/v3/on_demand.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package envoy.extensions.filters.http.on_demand.v3; +import "envoy/config/core/v3/config_source.proto"; + import "udpa/annotations/status.proto"; import "udpa/annotations/versioning.proto"; @@ -10,11 +12,31 @@ option java_outer_classname = "OnDemandProto"; option java_multiple_files = true; option (udpa.annotations.file_status).package_version_status = ACTIVE; -// [#protodoc-title: OnDemand] -// IP tagging :ref:`configuration overview `. +// [#protodoc-title: On Demand Discovery] +// On Demand Discovery :ref:`configuration overview `. // [#extension: envoy.filters.http.on_demand] +// On Demand Discovery filter config. message OnDemand { option (udpa.annotations.versioning).previous_message_type = "envoy.config.filter.http.on_demand.v2.OnDemand"; + + // An optional configuration for on-demand cluster discovery + // service. If not specified, the on-demand cluster discovery will + // be disabled. When it's specified, the filter will pause a request + // to an unknown cluster and will begin a cluster discovery + // process. When the discovery is finished (successfully or not), + // the request will be resumed for further processing. + config.core.v3.ConfigSource odcds_config = 1; +} + +// Per-route configuration for On Demand Discovery. +message PerRouteConfig { + // An optional configuration for on-demand cluster discovery + // service. If not specified, the on-demand cluster discovery will + // be disabled. When it's specified, the filter will pause a request + // to an unknown cluster and will begin a cluster discovery + // process. When the discovery is finished (successfully or not), + // the request will be resumed for further processing. + config.core.v3.ConfigSource odcds_config = 1; } diff --git a/api/envoy/extensions/filters/http/on_demand/v4alpha/BUILD b/api/envoy/extensions/filters/http/on_demand/v4alpha/BUILD new file mode 100644 index 000000000000..52f1451f0f4d --- /dev/null +++ b/api/envoy/extensions/filters/http/on_demand/v4alpha/BUILD @@ -0,0 +1,13 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/core/v4alpha:pkg", + "//envoy/extensions/filters/http/on_demand/v3:pkg", + "@com_github_cncf_udpa//udpa/annotations:pkg", + ], +) diff --git a/api/envoy/extensions/filters/http/on_demand/v4alpha/on_demand.proto b/api/envoy/extensions/filters/http/on_demand/v4alpha/on_demand.proto new file mode 100644 index 000000000000..f9a9dd60341c --- /dev/null +++ b/api/envoy/extensions/filters/http/on_demand/v4alpha/on_demand.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +package envoy.extensions.filters.http.on_demand.v4alpha; + +import "envoy/config/core/v4alpha/config_source.proto"; + +import "udpa/annotations/status.proto"; +import "udpa/annotations/versioning.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.http.on_demand.v4alpha"; +option java_outer_classname = "OnDemandProto"; +option java_multiple_files = true; +option (udpa.annotations.file_status).package_version_status = NEXT_MAJOR_VERSION_CANDIDATE; + +// [#protodoc-title: On Demand Discovery] +// On Demand Discovery :ref:`configuration overview `. +// [#extension: envoy.filters.http.on_demand] + +// On Demand Discovery filter config. +message OnDemand { + option (udpa.annotations.versioning).previous_message_type = + "envoy.extensions.filters.http.on_demand.v3.OnDemand"; + + // An optional configuration for on-demand cluster discovery + // service. If not specified, the on-demand cluster discovery will + // be disabled. When it's specified, the filter will pause a request + // to an unknown cluster and will begin a cluster discovery + // process. When the discovery is finished (successfully or not), + // the request will be resumed for further processing. + config.core.v4alpha.ConfigSource odcds_config = 1; +} + +// Per-route configuration for On Demand Discovery. +message PerRouteConfig { + option (udpa.annotations.versioning).previous_message_type = + "envoy.extensions.filters.http.on_demand.v3.PerRouteConfig"; + + // An optional configuration for on-demand cluster discovery + // service. If not specified, the on-demand cluster discovery will + // be disabled. When it's specified, the filter will pause a request + // to an unknown cluster and will begin a cluster discovery + // process. When the discovery is finished (successfully or not), + // the request will be resumed for further processing. + config.core.v4alpha.ConfigSource odcds_config = 1; +} diff --git a/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3/BUILD b/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3/BUILD index 7d1afef788cc..5479c90294f4 100644 --- a/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3/BUILD +++ b/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3/BUILD @@ -6,6 +6,7 @@ licenses(["notice"]) # Apache 2 api_proto_package( deps = [ + "//envoy/config/core/v3:pkg", "//envoy/config/filter/http/on_demand/v2:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", ], diff --git a/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3/on_demand.proto b/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3/on_demand.proto index 27e709f7a8d6..665ffea388bf 100644 --- a/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3/on_demand.proto +++ b/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3/on_demand.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package envoy.extensions.filters.http.on_demand.v3; +import "envoy/config/core/v3/config_source.proto"; + import "udpa/annotations/status.proto"; import "udpa/annotations/versioning.proto"; @@ -10,11 +12,31 @@ option java_outer_classname = "OnDemandProto"; option java_multiple_files = true; option (udpa.annotations.file_status).package_version_status = ACTIVE; -// [#protodoc-title: OnDemand] -// IP tagging :ref:`configuration overview `. +// [#protodoc-title: On Demand Discovery] +// On Demand Discovery :ref:`configuration overview `. // [#extension: envoy.filters.http.on_demand] +// On Demand Discovery filter config. message OnDemand { option (udpa.annotations.versioning).previous_message_type = "envoy.config.filter.http.on_demand.v2.OnDemand"; + + // An optional configuration for on-demand cluster discovery + // service. If not specified, the on-demand cluster discovery will + // be disabled. When it's specified, the filter will pause a request + // to an unknown cluster and will begin a cluster discovery + // process. When the discovery is finished (successfully or not), + // the request will be resumed for further processing. + config.core.v3.ConfigSource odcds_config = 1; +} + +// Per-route configuration for On Demand Discovery. +message PerRouteConfig { + // An optional configuration for on-demand cluster discovery + // service. If not specified, the on-demand cluster discovery will + // be disabled. When it's specified, the filter will pause a request + // to an unknown cluster and will begin a cluster discovery + // process. When the discovery is finished (successfully or not), + // the request will be resumed for further processing. + config.core.v3.ConfigSource odcds_config = 1; } diff --git a/generated_api_shadow/envoy/extensions/filters/http/on_demand/v4alpha/BUILD b/generated_api_shadow/envoy/extensions/filters/http/on_demand/v4alpha/BUILD new file mode 100644 index 000000000000..52f1451f0f4d --- /dev/null +++ b/generated_api_shadow/envoy/extensions/filters/http/on_demand/v4alpha/BUILD @@ -0,0 +1,13 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/core/v4alpha:pkg", + "//envoy/extensions/filters/http/on_demand/v3:pkg", + "@com_github_cncf_udpa//udpa/annotations:pkg", + ], +) diff --git a/generated_api_shadow/envoy/extensions/filters/http/on_demand/v4alpha/on_demand.proto b/generated_api_shadow/envoy/extensions/filters/http/on_demand/v4alpha/on_demand.proto new file mode 100644 index 000000000000..f9a9dd60341c --- /dev/null +++ b/generated_api_shadow/envoy/extensions/filters/http/on_demand/v4alpha/on_demand.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +package envoy.extensions.filters.http.on_demand.v4alpha; + +import "envoy/config/core/v4alpha/config_source.proto"; + +import "udpa/annotations/status.proto"; +import "udpa/annotations/versioning.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.http.on_demand.v4alpha"; +option java_outer_classname = "OnDemandProto"; +option java_multiple_files = true; +option (udpa.annotations.file_status).package_version_status = NEXT_MAJOR_VERSION_CANDIDATE; + +// [#protodoc-title: On Demand Discovery] +// On Demand Discovery :ref:`configuration overview `. +// [#extension: envoy.filters.http.on_demand] + +// On Demand Discovery filter config. +message OnDemand { + option (udpa.annotations.versioning).previous_message_type = + "envoy.extensions.filters.http.on_demand.v3.OnDemand"; + + // An optional configuration for on-demand cluster discovery + // service. If not specified, the on-demand cluster discovery will + // be disabled. When it's specified, the filter will pause a request + // to an unknown cluster and will begin a cluster discovery + // process. When the discovery is finished (successfully or not), + // the request will be resumed for further processing. + config.core.v4alpha.ConfigSource odcds_config = 1; +} + +// Per-route configuration for On Demand Discovery. +message PerRouteConfig { + option (udpa.annotations.versioning).previous_message_type = + "envoy.extensions.filters.http.on_demand.v3.PerRouteConfig"; + + // An optional configuration for on-demand cluster discovery + // service. If not specified, the on-demand cluster discovery will + // be disabled. When it's specified, the filter will pause a request + // to an unknown cluster and will begin a cluster discovery + // process. When the discovery is finished (successfully or not), + // the request will be resumed for further processing. + config.core.v4alpha.ConfigSource odcds_config = 1; +} diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 813476b4fdc4..2c0e02cbe208 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -39,7 +39,7 @@ namespace Envoy { namespace Upstream { /** - * ClusterUpdateCallbacks provide a way to exposes Cluster lifecycle events in the + * ClusterUpdateCallbacks provide a way to expose Cluster lifecycle events in the * ClusterManager. */ class ClusterUpdateCallbacks { @@ -72,6 +72,39 @@ class ClusterUpdateCallbacksHandle { using ClusterUpdateCallbacksHandlePtr = std::unique_ptr; +/** + * Status enum for the result of an attempted cluster discovery. + */ +enum class ClusterDiscoveryStatus { + /** + * Cluster was not found during the discovery process. + */ + Missing, + /** + * Cluster found and currently available through ClusterManager. + */ + Available, +}; + +/** + * ClusterDiscoveryCallback is a callback called at the end of the on-demand cluster discovery + * process. The status of the discovery is sent as a parameter. + */ +using ClusterDiscoveryCallback = std::function; +using ClusterDiscoveryCallbackWeakPtr = std::weak_ptr; +using ClusterDiscoveryCallbackSharedPtr = std::shared_ptr; + +/** + * ClusterDiscoveryCallbackHandle is a RAII wrapper for a ClusterDiscoveryCallback. Deleting the + * ClusterDiscoveryCallbackHandle will remove the callbacks from ClusterManager. + */ +class ClusterDiscoveryCallbackHandle { +public: + virtual ~ClusterDiscoveryCallbackHandle() = default; +}; + +using ClusterDiscoveryCallbackHandlePtr = std::unique_ptr; + class ClusterManagerFactory; // These are per-cluster per-thread, so not "global" stats. @@ -119,6 +152,9 @@ struct ClusterConnectivityState { int64_t connecting_stream_capacity_{}; }; +class OdCdsApi; +using OdCdsApiSharedPtr = std::shared_ptr; + /** * Manages connection pools and load balancing for upstream clusters. The cluster manager is * persistent and shared among multiple ongoing requests/connections. @@ -309,6 +345,24 @@ class ClusterManager { virtual const ClusterRequestResponseSizeStatNames& clusterRequestResponseSizeStatNames() const PURE; virtual const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const PURE; + + /** + * Request an on-demand discovery of a cluster with a passed name. Passed ODCDS may be used to + * perform the discovery process in the main thread if there is no discovery going on for this + * cluster. The passed callback will be invoked when the cluster is added and warmed up. It is + * expected that the callback will be destroyed when it is invoked. To cancel the discovery, + * destroy the returned handle and the callback. + * + * This function is thread-safe. + * + * @param odcds is a pointer to ODCDS used for discovery. Must not be a nullptr. + * @param name is the name of the cluster to be discovered. + * @param callback will be called when the discovery is finished. + * @return ClusterDiscoveryCallbackHandlePtr the discovery process handle. + */ + virtual ClusterDiscoveryCallbackHandlePtr + requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, const std::string& name, + ClusterDiscoveryCallbackWeakPtr callback) PURE; }; using ClusterManagerPtr = std::unique_ptr; @@ -339,6 +393,21 @@ class CdsApi { using CdsApiPtr = std::unique_ptr; +/** + * Abstract interface for a On-Demand CDS API provider. + */ +class OdCdsApi { +public: + virtual ~OdCdsApi() = default; + + /** + * File an on-demand request for a cluster. + */ + virtual void updateOnDemand(const std::string& cluster_name) PURE; +}; + +using OdCdsApiPtr = std::unique_ptr; + /** * Factory for objects needed during cluster manager operation. */ diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD index 850e6c5eaadf..661aecadea4f 100644 --- a/source/common/upstream/BUILD +++ b/source/common/upstream/BUILD @@ -44,6 +44,25 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "odcds_api_lib", + srcs = ["odcds_api_impl.cc"], + hdrs = ["odcds_api_impl.h"], + deps = [ + ":cds_api_helper_lib", + "//include/envoy/config:subscription_interface", + "//include/envoy/protobuf:message_validator_interface", + "//include/envoy/stats:stats_interface", + "//include/envoy/upstream:cluster_manager_interface", + "//source/common/common:minimal_logger_lib", + "//source/common/config:subscription_base_interface", + "//source/common/grpc:common_lib", + "//source/common/protobuf", + "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) + envoy_cc_library( name = "cluster_manager_lib", srcs = ["cluster_manager_impl.cc"], @@ -52,6 +71,7 @@ envoy_cc_library( ":cds_api_lib", ":load_balancer_lib", ":load_stats_reporter_lib", + ":odcds_api_lib", ":ring_hash_lb_lib", ":subset_lb_lib", "//include/envoy/api:api_interface", diff --git a/source/common/upstream/cds_api_helper.h b/source/common/upstream/cds_api_helper.h index a7a91c4ebe46..ef88afddaf3c 100644 --- a/source/common/upstream/cds_api_helper.h +++ b/source/common/upstream/cds_api_helper.h @@ -37,7 +37,7 @@ class CdsApiHelper : Logger::Loggable { private: ClusterManager& cm_; - std::string name_; + const std::string name_; std::string system_version_info_; }; diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index a2f97ff9afc1..1ecea8f129e9 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -957,6 +957,7 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_ per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor(); } + pending_cluster_creations_.erase(cm_cluster.cluster().info()->name()); tls_.runOnAllThreads( [info = cm_cluster.cluster().info(), params = std::move(params), add_or_update_cluster, load_balancer_factory](OptRef cluster_manager) { @@ -1027,6 +1028,185 @@ ClusterManagerImpl::addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks& return std::make_unique(cb, cluster_manager.update_callbacks_); } +namespace { + +using ClusterAddedCb = std::function; + +class ClusterCallbacks : public ClusterUpdateCallbacks { +public: + ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {} + + void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override { cb_(cluster); }; + + void onClusterRemoval(const std::string&) override {} + +private: + ClusterAddedCb cb_; +}; + +} // namespace + +ClusterManagerImpl::ClusterDiscoveryManager::ClusterDiscoveryManager( + ThreadLocalClusterManagerImpl& parent) + : parent_(parent) {} + +void ClusterManagerImpl::ClusterDiscoveryManager::ensureCallbacksAreInstalled() { + if (callbacks_handle_) { + return; + } + auto cb = ClusterAddedCb([this](ThreadLocalCluster& cluster) { + processClusterName(cluster.info()->name(), ClusterDiscoveryStatus::Available); + }); + callbacks_ = std::make_unique(cb); + callbacks_handle_ = parent_.parent_.addThreadLocalClusterUpdateCallbacks(*callbacks_); +} + +void ClusterManagerImpl::ClusterDiscoveryManager::processClusterName( + const std::string& name, ClusterDiscoveryStatus cluster_status) { + auto map_node_handle = pending_clusters_.extract(name); + if (map_node_handle.empty()) { + return; + } + for (const auto& weak_callback : map_node_handle.mapped()) { + auto callback = weak_callback.lock(); + if (callback != nullptr) { + (*callback)(cluster_status); + } + } + maybePostResetCallbacks(); +} + +ClusterManagerImpl::ClusterDiscoveryManager::Pair +ClusterManagerImpl::ClusterDiscoveryManager::addCallback( + const std::string& name, ClusterDiscoveryCallbackWeakPtr weak_callback) { + auto& callbacks_deque = pending_clusters_[name]; + callbacks_deque.emplace_back(weak_callback); + auto handle = + std::make_unique(*this, std::move(weak_callback), name); + auto discovery_in_progress = (callbacks_deque.size() > 1); + return {std::move(handle), discovery_in_progress}; +} + +void ClusterManagerImpl::ClusterDiscoveryManager::erase(const std::string& name, + ClusterDiscoveryCallbackWeakPtr cb) { + const bool drop_deque = eraseFromDeque(name, cb); + if (drop_deque) { + pending_clusters_.erase(name); + } + maybePostResetCallbacks(); +} + +bool ClusterManagerImpl::ClusterDiscoveryManager::eraseFromDeque( + const std::string& name, ClusterDiscoveryCallbackWeakPtr cb) { + auto it = pending_clusters_.find(name); + if (it == pending_clusters_.end()) { + return false; + } + auto& deque = it->second; + // Could use std::erase_if, but it's only in C++20. + auto it2 = std::remove_if(deque.begin(), deque.end(), + [cb](ClusterDiscoveryCallbackWeakPtr const& weak_ptr) { + if (cb.owner_before(weak_ptr)) { + return false; + } + if (weak_ptr.owner_before(cb)) { + return false; + } + return true; + }); + deque.erase(it2, deque.end()); + return deque.empty(); +} + +void ClusterManagerImpl::ClusterDiscoveryManager::maybePostResetCallbacks() { + if (!callbacks_handle_cleanup_posted_ && pending_clusters_.empty()) { + parent_.thread_local_dispatcher_.post([this] { + // Something might got added in the meantime, so check the + // map again. + if (pending_clusters_.empty()) { + callbacks_handle_.reset(); + callbacks_.reset(); + } + callbacks_handle_cleanup_posted_ = false; + }); + callbacks_handle_cleanup_posted_ = true; + } +} + +ClusterDiscoveryCallbackHandlePtr +ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, + const std::string& name, + ClusterDiscoveryCallbackWeakPtr weak_callback) { + ThreadLocalClusterManagerImpl& cluster_manager = *tls_; + + cluster_manager.cdm_.ensureCallbacksAreInstalled(); + + auto [handle, discovery_in_progress] = cluster_manager.cdm_.addCallback(name, weak_callback); + if (discovery_in_progress) { + // We have already started a discovery process for a cluster with + // this name, so nothing more left to do here. + return std::move(handle); + } + dispatcher_.post([this, odcds = std::move(odcds), weak_callback, name, + &thread_local_dispatcher = cluster_manager.thread_local_dispatcher_] { + // Check for the cluster here too. It might have been added + // between the time when this callback was posted and when it is + // being executed. + if (getThreadLocalCluster(name) != nullptr) { + if (weak_callback.expired()) { + // Not only the cluster was added, but it was also already + // handled, so don't bother with posting the callback back to + // the worker thread. + return; + } + thread_local_dispatcher.post([weak_callback] { + if (auto callback = weak_callback.lock(); callback != nullptr) { + // If this gets called here, it means that we requested a + // discovery of a cluster without checking if that cluster + // is already known by cluster manager. + (*callback)(ClusterDiscoveryStatus::Available); + } + }); + return; + } + + auto it = pending_cluster_creations_.find(name); + if (it != pending_cluster_creations_.end()) { + // We already began the discovery process for this cluster, + // nothing to do. + return; + } + odcds->updateOnDemand(name); + auto timer_cb = Event::TimerCb([this, name] { notifyExpiredDiscovery(name); }); + auto timer = dispatcher_.createTimer(timer_cb); + timer->enableTimer(std::chrono::milliseconds(5000)); + // Keep odcds alive for the duration of the discovery process. + pending_cluster_creations_.insert( + {std::move(name), ClusterCreation{std::move(odcds), std::move(timer)}}); + }); + + return std::move(handle); +} + +void ClusterManagerImpl::notifyExpiredDiscovery(const std::string& name) { + auto map_node_handle = pending_cluster_creations_.extract(name); + if (map_node_handle.empty()) { + return; + } + // Defer destroying the timer, so it's not destroyed during its + // callback. TimerPtr is a unique_ptr, which is not copyable, but + // std::function is copyable, we turn a move-only unique_ptr into a + // copyable shared_ptr and pass that to the std::function. + dispatcher_.post([timer = std::shared_ptr( + std::move(map_node_handle.mapped().expiration_timer_))] {}); + tls_.runOnAllThreads([name](OptRef cluster_manager) { + if (!cluster_manager.has_value()) { + return; + } + cluster_manager->cdm_.processClusterName(name, ClusterDiscoveryStatus::Missing); + }); +} + ProtobufTypes::MessagePtr ClusterManagerImpl::dumpClusterConfigs() { auto config_dump = std::make_unique(); config_dump->set_version_info(cds_api_ != nullptr ? cds_api_->versionInfo() : ""); @@ -1061,7 +1241,7 @@ ProtobufTypes::MessagePtr ClusterManagerImpl::dumpClusterConfigs() { ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl( ClusterManagerImpl& parent, Event::Dispatcher& dispatcher, const absl::optional& local_cluster_params) - : parent_(parent), thread_local_dispatcher_(dispatcher) { + : parent_(parent), thread_local_dispatcher_(dispatcher), cdm_(*this) { // If local cluster is defined then we need to initialize it first. if (local_cluster_params.has_value()) { const auto& local_cluster_name = local_cluster_params->info_->name(); diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index ec75f0af9467..a2ad7ce2fbe3 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -285,6 +285,10 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable> pending_clusters_; + ClusterUpdateCallbacksHandlePtr callbacks_handle_; + std::unique_ptr callbacks_; + bool callbacks_handle_cleanup_posted_ = false; + }; + + class ClusterDiscoveryCallbackHandleImpl : public ClusterDiscoveryCallbackHandle { + public: + ClusterDiscoveryCallbackHandleImpl(ClusterDiscoveryManager& parent, + ClusterDiscoveryCallbackWeakPtr cb, std::string name) + : parent_(parent), cb_(std::move(cb)), name_(std::move(name)) {} + + ~ClusterDiscoveryCallbackHandleImpl() override { parent_.erase(name_, cb_); } + + private: + ClusterDiscoveryManager& parent_; + ClusterDiscoveryCallbackWeakPtr cb_; + std::string name_; + }; + /** * Thread local cached cluster data. Each thread local cluster gets updates from the parent * central dynamic cluster (if applicable). It maintains load balancer state and any created @@ -465,6 +512,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable update_callbacks_; const PrioritySet* local_priority_set_{}; bool destroying_{}; + ClusterDiscoveryManager cdm_; }; struct ClusterData : public ClusterManagerCluster { @@ -574,10 +622,27 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable preconnect_pool); + struct ClusterCreation { + OdCdsApiSharedPtr odcds_; + Event::TimerPtr expiration_timer_; + }; + + using ClusterCreationsMap = absl::flat_hash_map; + +protected: + /** + * Notifies cluster discovery managers in each worker thread that + * the discovery process for the cluster with a passed name has + * timed out. + */ + void notifyExpiredDiscovery(const std::string& name); + +private: ClusterManagerFactory& factory_; Runtime::Loader& runtime_; Stats::Store& stats_; ThreadLocal::TypedSlot tls_; + ClusterCreationsMap pending_cluster_creations_; Random::RandomGenerator& random_; protected: diff --git a/source/common/upstream/odcds_api_impl.cc b/source/common/upstream/odcds_api_impl.cc new file mode 100644 index 000000000000..2d35fb99d398 --- /dev/null +++ b/source/common/upstream/odcds_api_impl.cc @@ -0,0 +1,85 @@ +#include "common/upstream/odcds_api_impl.h" + +#include "common/common/assert.h" +#include "common/grpc/common.h" + +#include "absl/strings/str_join.h" + +namespace Envoy { +namespace Upstream { + +OdCdsApiPtr OdCdsApiImpl::create(const envoy::config::core::v3::ConfigSource& cds_config, + ClusterManager& cm, Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor) { + return OdCdsApiPtr{new OdCdsApiImpl(cds_config, cm, scope, validation_visitor)}; +} + +OdCdsApiImpl::OdCdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config, + ClusterManager& cm, Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor) + : Envoy::Config::SubscriptionBase( + cds_config.resource_api_version(), validation_visitor, "name"), + helper_(cm, "odcds"), cm_(cm), scope_(scope.createScope("odcds.")), + status_(StartStatus::NotStarted) { + const auto resource_name = getResourceName(); + subscription_ = cm_.subscriptionFactory().subscriptionFromConfigSource( + cds_config, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {}); +} + +void OdCdsApiImpl::onConfigUpdate(const std::vector& resources, + const std::string& version_info) { + UNREFERENCED_PARAMETER(resources); + UNREFERENCED_PARAMETER(version_info); + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; +} + +void OdCdsApiImpl::onConfigUpdate(const std::vector& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& system_version_info) { + auto exception_msgs = + helper_.onConfigUpdate(added_resources, removed_resources, system_version_info); + sendAwaiting(); + if (!exception_msgs.empty()) { + throw EnvoyException( + fmt::format("Error adding/updating cluster(s) {}", absl::StrJoin(exception_msgs, ", "))); + } +} + +void OdCdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason, + const EnvoyException*) { + ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason); + sendAwaiting(); +} + +void OdCdsApiImpl::sendAwaiting() { + if (status_ != StartStatus::Started) { + return; + } + status_ = StartStatus::InitialFetchDone; + if (awaiting_names_.empty()) { + return; + } + subscription_->requestOnDemandUpdate(awaiting_names_); + awaiting_names_.clear(); +} + +void OdCdsApiImpl::updateOnDemand(const std::string& cluster_name) { + switch (status_) { + case StartStatus::NotStarted: + status_ = StartStatus::Started; + subscription_->start({cluster_name}); + return; + + case StartStatus::Started: + awaiting_names_.insert(cluster_name); + return; + + case StartStatus::InitialFetchDone: + subscription_->requestOnDemandUpdate({cluster_name}); + return; + } + NOT_REACHED_GCOVR_EXCL_LINE; +} + +} // namespace Upstream +} // namespace Envoy diff --git a/source/common/upstream/odcds_api_impl.h b/source/common/upstream/odcds_api_impl.h new file mode 100644 index 000000000000..50814eb253d2 --- /dev/null +++ b/source/common/upstream/odcds_api_impl.h @@ -0,0 +1,67 @@ +#pragma once + +#include +#include + +#include "envoy/config/cluster/v3/cluster.pb.h" +#include "envoy/config/cluster/v3/cluster.pb.validate.h" +#include "envoy/config/core/v3/config_source.pb.h" +#include "envoy/config/subscription.h" +#include "envoy/protobuf/message_validator.h" +#include "envoy/stats/scope.h" +#include "envoy/upstream/cluster_manager.h" + +#include "common/config/subscription_base.h" +#include "common/protobuf/protobuf.h" +#include "common/upstream/cds_api_helper.h" + +namespace Envoy { +namespace Upstream { + +enum class StartStatus { + // No initial fetch started. + NotStarted, + // Initial fetch started. + Started, + // Initial fetch arrived. + InitialFetchDone, +}; + +/** + * ODCDS API implementation that fetches via Subscription. + */ +class OdCdsApiImpl : public OdCdsApi, + Envoy::Config::SubscriptionBase, + Logger::Loggable { +public: + static OdCdsApiPtr create(const envoy::config::core::v3::ConfigSource& cds_config, + ClusterManager& cm, Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor); + + // Upstream::OdCdsApi + void updateOnDemand(const std::string& cluster_name) override; + +private: + // Config::SubscriptionCallbacks + void onConfigUpdate(const std::vector& resources, + const std::string& version_info) override; + void onConfigUpdate(const std::vector& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& system_version_info) override; + void onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason, + const EnvoyException* e) override; + + OdCdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config, ClusterManager& cm, + Stats::Scope& scope, ProtobufMessage::ValidationVisitor& validation_visitor); + void sendAwaiting(); + + CdsApiHelper helper_; + ClusterManager& cm_; + Stats::ScopePtr scope_; + StartStatus status_; + absl::flat_hash_set awaiting_names_; + Config::SubscriptionPtr subscription_; +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/source/extensions/filters/http/on_demand/BUILD b/source/extensions/filters/http/on_demand/BUILD index 72c5f6b33d56..dae3ad1d6383 100644 --- a/source/extensions/filters/http/on_demand/BUILD +++ b/source/extensions/filters/http/on_demand/BUILD @@ -19,10 +19,14 @@ envoy_cc_library( "//include/envoy/event:dispatcher_interface", "//include/envoy/http:filter_interface", "//include/envoy/server:filter_config_interface", + "//include/envoy/upstream:cluster_manager_interface", "//source/common/common:assert_lib", "//source/common/common:enum_to_int", "//source/common/http:codes_lib", "//source/common/http:header_map_lib", + "//source/common/upstream:odcds_api_lib", + "//source/extensions/filters/http:well_known_names", + "@envoy_api//envoy/extensions/filters/http/on_demand/v3:pkg_cc_proto", ], ) diff --git a/source/extensions/filters/http/on_demand/config.cc b/source/extensions/filters/http/on_demand/config.cc index 42d07ef4dca6..04e766fae8b3 100644 --- a/source/extensions/filters/http/on_demand/config.cc +++ b/source/extensions/filters/http/on_demand/config.cc @@ -1,7 +1,5 @@ #include "extensions/filters/http/on_demand/config.h" -#include "envoy/extensions/filters/http/on_demand/v3/on_demand.pb.validate.h" - #include "extensions/filters/http/on_demand/on_demand_update.h" namespace Envoy { @@ -10,14 +8,24 @@ namespace HttpFilters { namespace OnDemand { Http::FilterFactoryCb OnDemandFilterFactory::createFilterFactoryFromProtoTyped( - const envoy::extensions::filters::http::on_demand::v3::OnDemand&, const std::string&, - Server::Configuration::FactoryContext&) { - return [](Http::FilterChainFactoryCallbacks& callbacks) -> void { - callbacks.addStreamDecoderFilter( - std::make_shared()); + const envoy::extensions::filters::http::on_demand::v3::OnDemand& proto_config, + const std::string&, Server::Configuration::FactoryContext& context) { + OnDemandFilterConfigSharedPtr config = std::make_shared( + proto_config, context.clusterManager(), context.scope(), context.messageValidationVisitor()); + return [config](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamDecoderFilter(std::make_shared(config)); }; } +Router::RouteSpecificFilterConfigConstSharedPtr +OnDemandFilterFactory::createRouteSpecificFilterConfigTyped( + const envoy::extensions::filters::http::on_demand::v3::PerRouteConfig& proto_config, + Server::Configuration::ServerFactoryContext& context, + ProtobufMessage::ValidationVisitor& validation_visitor) { + return std::make_shared(proto_config, context.clusterManager(), + context.scope(), validation_visitor); +} + /** * Static registration for the on-demand filter. @see RegisterFactory. */ diff --git a/source/extensions/filters/http/on_demand/config.h b/source/extensions/filters/http/on_demand/config.h index eb48654ab34c..d07701c5d2db 100644 --- a/source/extensions/filters/http/on_demand/config.h +++ b/source/extensions/filters/http/on_demand/config.h @@ -15,7 +15,8 @@ namespace OnDemand { * Config registration for the OnDemand filter. @see NamedHttpFilterConfigFactory. */ class OnDemandFilterFactory - : public Common::FactoryBase { + : public Common::FactoryBase { public: OnDemandFilterFactory() : FactoryBase(HttpFilterNames::get().OnDemand) {} @@ -23,6 +24,11 @@ class OnDemandFilterFactory Http::FilterFactoryCb createFilterFactoryFromProtoTyped( const envoy::extensions::filters::http::on_demand::v3::OnDemand& proto_config, const std::string&, Server::Configuration::FactoryContext& context) override; + + Router::RouteSpecificFilterConfigConstSharedPtr createRouteSpecificFilterConfigTyped( + const envoy::extensions::filters::http::on_demand::v3::PerRouteConfig& config, + Server::Configuration::ServerFactoryContext& context, + ProtobufMessage::ValidationVisitor& visitor) override; }; } // namespace OnDemand diff --git a/source/extensions/filters/http/on_demand/on_demand_update.cc b/source/extensions/filters/http/on_demand/on_demand_update.cc index 2e392846f3b0..7a226075e6fb 100644 --- a/source/extensions/filters/http/on_demand/on_demand_update.cc +++ b/source/extensions/filters/http/on_demand/on_demand_update.cc @@ -4,16 +4,49 @@ #include "common/common/enum_to_int.h" #include "common/common/logger.h" #include "common/http/codes.h" +#include "common/upstream/odcds_api_impl.h" + +#include "extensions/filters/http/well_known_names.h" namespace Envoy { namespace Extensions { namespace HttpFilters { namespace OnDemand { +namespace { + +template +Upstream::OdCdsApiSharedPtr createOdCdsApi(const ProtoConfig& proto_config, + Upstream::ClusterManager& cm, Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor) { + if (!proto_config.has_odcds_config()) { + return nullptr; + } + return Upstream::OdCdsApiImpl::create(proto_config.odcds_config(), cm, scope, validation_visitor); +} + +} // namespace + +OnDemandFilterConfig::OnDemandFilterConfig(Upstream::ClusterManager& cm, + Upstream::OdCdsApiSharedPtr odcds) + : cm_(cm), odcds_(std::move(odcds)) {} + +OnDemandFilterConfig::OnDemandFilterConfig( + const envoy::extensions::filters::http::on_demand::v3::OnDemand& proto_config, + Upstream::ClusterManager& cm, Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor) + : OnDemandFilterConfig(cm, createOdCdsApi(proto_config, cm, scope, validation_visitor)) {} + +OnDemandFilterConfig::OnDemandFilterConfig( + const envoy::extensions::filters::http::on_demand::v3::PerRouteConfig& proto_config, + Upstream::ClusterManager& cm, Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor) + : OnDemandFilterConfig(cm, createOdCdsApi(proto_config, cm, scope, validation_visitor)) {} + Http::FilterHeadersStatus OnDemandRouteUpdate::decodeHeaders(Http::RequestHeaderMap&, bool) { - if (callbacks_->route() != nullptr) { - filter_iteration_state_ = Http::FilterHeadersStatus::Continue; + if (auto route = callbacks_->route(); route != nullptr) { + handleOnDemandCDS(*route); return filter_iteration_state_; } // decodeHeaders() is interrupted. @@ -28,6 +61,55 @@ Http::FilterHeadersStatus OnDemandRouteUpdate::decodeHeaders(Http::RequestHeader return filter_iteration_state_; } +void OnDemandRouteUpdate::handleOnDemandCDS(const Router::Route& route) { + auto entry = route.routeEntry(); + if (entry == nullptr) { + // No entry? Nothing we can do here. + filter_iteration_state_ = Http::FilterHeadersStatus::Continue; + return; + } + auto config = getConfig(*entry); + if (config == nullptr) { + filter_iteration_state_ = Http::FilterHeadersStatus::Continue; + return; + } + auto odcds = config->odcds(); + if (odcds == nullptr) { + // No ODCDS? It means that on-demand discovery is disabled. + filter_iteration_state_ = Http::FilterHeadersStatus::Continue; + return; + } + auto& cm = config->clusterManager(); + if (callbacks_->clusterInfo() != nullptr) { + // Cluster already exists, so nothing to do here. + filter_iteration_state_ = Http::FilterHeadersStatus::Continue; + return; + } + auto cluster_name = entry->clusterName(); + if (cluster_name.empty()) { + // Empty cluster name may be a result of a missing HTTP header + // used for getting the cluster name. Nothing we can do here. + filter_iteration_state_ = Http::FilterHeadersStatus::Continue; + return; + } + filter_iteration_state_ = Http::FilterHeadersStatus::StopIteration; + cluster_discovery_callback_ = std::make_shared( + [this](Upstream::ClusterDiscoveryStatus cluster_status) { + onClusterDiscoveryCompletion(cluster_status); + }); + cluster_discovery_handle_ = + cm.requestOnDemandClusterDiscovery(odcds, cluster_name, cluster_discovery_callback_); +} + +const OnDemandFilterConfig* OnDemandRouteUpdate::getConfig(const Router::RouteEntry& entry) { + auto config = + entry.mostSpecificPerFilterConfigTyped(HttpFilterNames::get().OnDemand); + if (config != nullptr) { + return config; + } + return config_.get(); +} + Http::FilterDataStatus OnDemandRouteUpdate::decodeData(Buffer::Instance&, bool) { return filter_iteration_state_ == Http::FilterHeadersStatus::StopIteration ? Http::FilterDataStatus::StopIterationAndWatermark @@ -42,10 +124,15 @@ void OnDemandRouteUpdate::setDecoderFilterCallbacks(Http::StreamDecoderFilterCal callbacks_ = &callbacks; } -// A weak_ptr copy of the route_config_updated_callback_ is kept by RdsRouteConfigProviderImpl -// in config_update_callbacks_. By resetting the pointer in onDestroy() callback we ensure -// that this filter/filter-chain will not be resumed if the corresponding has been closed -void OnDemandRouteUpdate::onDestroy() { route_config_updated_callback_.reset(); } +// A weak_ptr copy of the route_config_updated_callback_ is kept by RdsRouteConfigProviderImpl in +// config_update_callbacks_. Same about cluster_discovery_callback kept by ClusterDiscoveryManager +// in pending_clusters_. By resetting the pointers in onDestroy() callback we ensure that this +// filter/filter-chain will not be resumed if the corresponding has been closed. +void OnDemandRouteUpdate::onDestroy() { + route_config_updated_callback_.reset(); + cluster_discovery_callback_.reset(); + cluster_discovery_handle_.reset(); +} // This is the callback which is called when an update requested in requestRouteConfigUpdate() // has been propagated to workers, at which point the request processing is restarted from the @@ -70,6 +157,25 @@ void OnDemandRouteUpdate::onRouteConfigUpdateCompletion(bool route_exists) { callbacks_->continueDecoding(); } +void OnDemandRouteUpdate::onClusterDiscoveryCompletion( + Upstream::ClusterDiscoveryStatus cluster_status) { + filter_iteration_state_ = Http::FilterHeadersStatus::Continue; + cluster_discovery_callback_.reset(); + cluster_discovery_handle_.reset(); + if (cluster_status == Upstream::ClusterDiscoveryStatus::Available && + !callbacks_->decodingBuffer()) { // Redirects with body not yet supported. + const Http::ResponseHeaderMap* headers = nullptr; + if (callbacks_->recreateStream(headers)) { + callbacks_->clearRouteCache(); + return; + } + } + + // Cluster still does not exist or we failed to recreate the + // stream. Either way, continue with the filter-chain. + callbacks_->continueDecoding(); +} + } // namespace OnDemand } // namespace HttpFilters } // namespace Extensions diff --git a/source/extensions/filters/http/on_demand/on_demand_update.h b/source/extensions/filters/http/on_demand/on_demand_update.h index 55c9a382453c..79972b9fa095 100644 --- a/source/extensions/filters/http/on_demand/on_demand_update.h +++ b/source/extensions/filters/http/on_demand/on_demand_update.h @@ -1,17 +1,45 @@ #pragma once +#include "envoy/extensions/filters/http/on_demand/v3/on_demand.pb.h" +#include "envoy/extensions/filters/http/on_demand/v3/on_demand.pb.validate.h" #include "envoy/http/filter.h" +#include "envoy/upstream/cluster_manager.h" namespace Envoy { namespace Extensions { namespace HttpFilters { namespace OnDemand { +class OnDemandFilterConfig : public Router::RouteSpecificFilterConfig { +public: + OnDemandFilterConfig(Upstream::ClusterManager& cm, Upstream::OdCdsApiSharedPtr odcds); + OnDemandFilterConfig( + const envoy::extensions::filters::http::on_demand::v3::OnDemand& proto_config, + Upstream::ClusterManager& cm, Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor); + OnDemandFilterConfig( + const envoy::extensions::filters::http::on_demand::v3::PerRouteConfig& proto_config, + Upstream::ClusterManager& cm, Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor); + + Upstream::ClusterManager& clusterManager() const { return cm_; } + Upstream::OdCdsApiSharedPtr odcds() const { return odcds_; } + +private: + Upstream::ClusterManager& cm_; + Upstream::OdCdsApiSharedPtr odcds_; +}; + +using OnDemandFilterConfigSharedPtr = std::shared_ptr; + class OnDemandRouteUpdate : public Http::StreamDecoderFilter { public: - OnDemandRouteUpdate() = default; + OnDemandRouteUpdate(OnDemandFilterConfigSharedPtr config) : config_(std::move(config)) {} void onRouteConfigUpdateCompletion(bool route_exists); + void onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus cluster_status); + void handleOnDemandCDS(const Router::Route& route); + const OnDemandFilterConfig* getConfig(const Router::RouteEntry& entry); void setFilterIterationState(Envoy::Http::FilterHeadersStatus status) { filter_iteration_state_ = status; @@ -30,8 +58,11 @@ class OnDemandRouteUpdate : public Http::StreamDecoderFilter { void onDestroy() override; private: + OnDemandFilterConfigSharedPtr config_; Http::StreamDecoderFilterCallbacks* callbacks_{}; Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_callback_; + Upstream::ClusterDiscoveryCallbackHandlePtr cluster_discovery_handle_; + Upstream::ClusterDiscoveryCallbackSharedPtr cluster_discovery_callback_; Envoy::Http::FilterHeadersStatus filter_iteration_state_{Http::FilterHeadersStatus::Continue}; bool decode_headers_active_{false}; }; diff --git a/test/common/upstream/BUILD b/test/common/upstream/BUILD index e4b4b0948812..fe335e0c4bb5 100644 --- a/test/common/upstream/BUILD +++ b/test/common/upstream/BUILD @@ -14,6 +14,19 @@ licenses(["notice"]) # Apache 2 envoy_package() +envoy_cc_test( + name = "odcds_api_impl_test", + srcs = ["odcds_api_impl_test.cc"], + deps = [ + "//include/envoy/config:subscription_interface", + "//source/common/stats:isolated_store_lib", + "//source/common/upstream:odcds_api_lib", + "//test/mocks/protobuf:protobuf_mocks", + "//test/mocks/upstream:cluster_manager_mocks", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) + envoy_cc_test( name = "cds_api_impl_test", srcs = ["cds_api_impl_test.cc"], @@ -48,6 +61,7 @@ envoy_cc_test( "//test/mocks/upstream:cluster_update_callbacks_mocks", "//test/mocks/upstream:health_checker_mocks", "//test/mocks/upstream:load_balancer_context_mock", + "//test/mocks/upstream:odcds_api_mocks", "//test/mocks/upstream:thread_aware_load_balancer_mocks", "//test/test_common:test_runtime_lib", "@envoy_api//envoy/admin/v3:pkg_cc_proto", diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index de00bb302046..2da90078e514 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -16,6 +16,7 @@ #include "test/mocks/upstream/cluster_update_callbacks.h" #include "test/mocks/upstream/health_checker.h" #include "test/mocks/upstream/load_balancer_context.h" +#include "test/mocks/upstream/odcds_api.h" #include "test/mocks/upstream/thread_aware_load_balancer.h" #include "test/test_common/test_runtime.h" @@ -164,6 +165,140 @@ envoy::config::bootstrap::v3::Bootstrap defaultConfig() { return parseBootstrapFromV3Yaml(yaml); } +class ODCDTest : public ClusterManagerImplTest { +public: + void SetUp() override { + create(defaultConfig()); + odcds_ = MockOdCdsApi::createShared(); + // These tmp things are kept to keep the cluster lifecycle callback, + // because mock would cause the callback to be removed while + // iterating the list of those callbacks. + tmp_cb_ = std::make_shared([](ClusterDiscoveryStatus) {}); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_tmp")); + tmp_handle_ = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_tmp", tmp_cb_); + } + + void TearDown() override { + tmp_cb_.reset(); + tmp_handle_.reset(); + factory_.tls_.shutdownThread(); + } + + ClusterDiscoveryCallbackSharedPtr createCallback() { + return std::make_shared( + [this](ClusterDiscoveryStatus cluster_status) { + UNREFERENCED_PARAMETER(cluster_status); + this->testFunction(); + }); + } + + ClusterDiscoveryCallbackSharedPtr createCallback(ClusterDiscoveryStatus expected_cluster_status) { + return std::make_shared( + [this, expected_cluster_status](ClusterDiscoveryStatus cluster_status) { + EXPECT_EQ(expected_cluster_status, cluster_status); + this->testFunction(); + }); + } + + MOCK_METHOD(void, testFunction, ()); + + MockOdCdsApiSharedPtr odcds_; + ClusterDiscoveryCallbackSharedPtr tmp_cb_; + ClusterDiscoveryCallbackHandlePtr tmp_handle_; +}; + +TEST_F(ODCDTest, TestRequest) { + auto cb = createCallback(); + EXPECT_CALL(*this, testFunction()).Times(0); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo", cb); +} + +TEST_F(ODCDTest, TestRequestRepeated) { + auto cb1 = createCallback(); + auto cb2 = createCallback(); + EXPECT_CALL(*this, testFunction()).Times(0); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle1 = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo", cb1); + auto handle2 = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo", cb2); +} + +TEST_F(ODCDTest, TestClusterRediscovered) { + auto cb = createCallback(ClusterDiscoveryStatus::Available); + EXPECT_CALL(*this, testFunction()); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")).Times(2); + auto handle = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo", cb); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo"), "version1"); + handle.reset(); + cluster_manager_->removeCluster("cluster_foo"); + cb = createCallback(); + handle = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo", cb); +} + +TEST_F(ODCDTest, TestClusterRediscoveredAfterFail) { + auto cb = createCallback(ClusterDiscoveryStatus::Missing); + EXPECT_CALL(*this, testFunction()); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")).Times(2); + auto handle = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo", cb); + cluster_manager_->notifyExpiredDiscovery("cluster_foo"); + handle.reset(); + cb = createCallback(); + handle = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo", cb); +} + +TEST_F(ODCDTest, TestDiscoveryManagerIgnoresIrrelevantClusters) { + auto cb = std::make_shared([](ClusterDiscoveryStatus) { + ADD_FAILURE() << "The callback should not be called for irrelevant clusters"; + }); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo", cb); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_irrelevant"), "version1"); +} + +TEST_F(ODCDTest, TestDroppingHandles) { + auto cb1 = std::make_shared( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 1 should not be called"; }); + auto cb2 = std::make_shared( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 2 should not be called"; }); + auto cb3 = std::make_shared( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 3 should not be called"; }); + auto cb4 = std::make_shared( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 4 should not be called"; }); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo1")); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo2")); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo3")); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo4")); + auto handle1 = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo1", cb1); + auto handle2 = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo2", cb2); + auto handle3 = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo3", cb3); + auto handle4 = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo4", cb4); + + handle2.reset(); + handle3.reset(); + handle1.reset(); + handle4.reset(); + + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo1"), "version1"); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo2"), "version1"); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo3"), "version1"); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo4"), "version1"); +} + +TEST_F(ODCDTest, TestCallbackWithExistingCluster) { + auto cb = createCallback(ClusterDiscoveryStatus::Available); + EXPECT_CALL(*this, testFunction()); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo"), "version1"); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")).Times(0); + auto handle = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo", cb); +} + +TEST_F(ODCDTest, TestExpiredCallbackWithExistingCluster) { + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo"), "version1"); + ClusterDiscoveryCallbackSharedPtr null_cb; + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")).Times(0); + auto handle = cluster_manager_->requestOnDemandClusterDiscovery(odcds_, "cluster_foo", null_cb); +} + class AlpnSocketFactory : public Network::RawBufferSocketFactory { public: bool supportsAlpn() const override { return true; } diff --git a/test/common/upstream/odcds_api_impl_test.cc b/test/common/upstream/odcds_api_impl_test.cc new file mode 100644 index 000000000000..0fcef10e9043 --- /dev/null +++ b/test/common/upstream/odcds_api_impl_test.cc @@ -0,0 +1,136 @@ +#include "envoy/config/core/v3/config_source.pb.h" +#include "envoy/config/subscription.h" + +#include "common/stats/isolated_store_impl.h" +#include "common/upstream/odcds_api_impl.h" + +#include "test/mocks/protobuf/mocks.h" +#include "test/mocks/upstream/cluster_manager.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { +namespace { + +using ::testing::ElementsAre; +using ::testing::InSequence; +using ::testing::UnorderedElementsAre; + +class OdCdsApiImplTest : public testing::Test { +public: + void SetUp() override { + envoy::config::core::v3::ConfigSource odcds_config; + odcds_ = OdCdsApiImpl::create(odcds_config, cm_, store_, validation_visitor_); + odcds_callbacks_ = cm_.subscription_factory_.callbacks_; + } + + NiceMock cm_; + Stats::IsolatedStoreImpl store_; + OdCdsApiPtr odcds_; + Config::SubscriptionCallbacks* odcds_callbacks_ = nullptr; + NiceMock validation_visitor_; +}; + +TEST_F(OdCdsApiImplTest, FirstUpdateStarts) { + InSequence s; + + EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(ElementsAre("fake_cluster"))); + odcds_->updateOnDemand("fake_cluster"); +} + +TEST_F(OdCdsApiImplTest, FollowingClusterNamesHitAwaitingList) { + InSequence s; + + EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(ElementsAre("fake_cluster"))); + EXPECT_CALL(*cm_.subscription_factory_.subscription_, requestOnDemandUpdate(_)).Times(0); + odcds_->updateOnDemand("fake_cluster"); + odcds_->updateOnDemand("another_cluster"); +} + +TEST_F(OdCdsApiImplTest, AwaitingListIsProcessedOnConfigUpdate) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + odcds_->updateOnDemand("another_cluster_1"); + odcds_->updateOnDemand("another_cluster_2"); + + envoy::config::cluster::v3::Cluster cluster; + cluster.set_name("fake_cluster"); + const auto decoded_resources = TestUtility::decodeResources({cluster}); + EXPECT_CALL( + *cm_.subscription_factory_.subscription_, + requestOnDemandUpdate(UnorderedElementsAre("another_cluster_1", "another_cluster_2"))); + odcds_callbacks_->onConfigUpdate(decoded_resources.refvec_, {}, "0"); +} + +TEST_F(OdCdsApiImplTest, AwaitingListIsProcessedOnConfigUpdateFailed) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + odcds_->updateOnDemand("another_cluster_1"); + odcds_->updateOnDemand("another_cluster_2"); + + EXPECT_CALL( + *cm_.subscription_factory_.subscription_, + requestOnDemandUpdate(UnorderedElementsAre("another_cluster_1", "another_cluster_2"))); + odcds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, + nullptr); +} + +TEST_F(OdCdsApiImplTest, AwaitingListIsProcessedOnceOnly) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + odcds_->updateOnDemand("another_cluster_1"); + odcds_->updateOnDemand("another_cluster_2"); + + EXPECT_CALL( + *cm_.subscription_factory_.subscription_, + requestOnDemandUpdate(UnorderedElementsAre("another_cluster_1", "another_cluster_2"))); + odcds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, + nullptr); + odcds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, + nullptr); +} + +TEST_F(OdCdsApiImplTest, NothingIsRequestedOnEmptyAwaitingList) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + + EXPECT_CALL(*cm_.subscription_factory_.subscription_, requestOnDemandUpdate(_)).Times(0); + odcds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, + nullptr); +} + +TEST_F(OdCdsApiImplTest, OnDemandUpdateIsRequestedAfterInitialFetch) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + envoy::config::cluster::v3::Cluster cluster; + cluster.set_name("fake_cluster"); + const auto decoded_resources = TestUtility::decodeResources({cluster}); + odcds_callbacks_->onConfigUpdate(decoded_resources.refvec_, {}, "0"); + EXPECT_CALL(*cm_.subscription_factory_.subscription_, + requestOnDemandUpdate(UnorderedElementsAre("another_cluster"))); + odcds_->updateOnDemand("another_cluster"); +} + +TEST_F(OdCdsApiImplTest, ValidateDuplicateClusters) { + InSequence s; + + envoy::config::cluster::v3::Cluster cluster_1; + cluster_1.set_name("duplicate_cluster"); + const auto decoded_resources = TestUtility::decodeResources({cluster_1, cluster_1}); + + EXPECT_THROW_WITH_MESSAGE(odcds_callbacks_->onConfigUpdate(decoded_resources.refvec_, {}, ""), + EnvoyException, + "Error adding/updating cluster(s) duplicate_cluster: duplicate cluster " + "duplicate_cluster found"); +} + +} // namespace +} // namespace Upstream +} // namespace Envoy diff --git a/test/common/upstream/test_cluster_manager.h b/test/common/upstream/test_cluster_manager.h index f57acc673889..b11344c24409 100644 --- a/test/common/upstream/test_cluster_manager.h +++ b/test/common/upstream/test_cluster_manager.h @@ -183,6 +183,10 @@ class TestClusterManagerImpl : public ClusterManagerImpl { } return clusters; } + + void notifyExpiredDiscovery(const std::string& name) { + ClusterManagerImpl::notifyExpiredDiscovery(name); + } }; // Override postThreadLocalClusterUpdate so we can test that merged updates calls diff --git a/test/extensions/filters/http/on_demand/BUILD b/test/extensions/filters/http/on_demand/BUILD index d9412a137039..ab34f115e503 100644 --- a/test/extensions/filters/http/on_demand/BUILD +++ b/test/extensions/filters/http/on_demand/BUILD @@ -20,7 +20,9 @@ envoy_extension_cc_test( "//source/common/protobuf:utility_lib", "//source/extensions/filters/http/on_demand:on_demand_update_lib", "//test/mocks/http:http_mocks", + "//test/mocks/router:router_mocks", "//test/mocks/runtime:runtime_mocks", + "//test/mocks/upstream:upstream_mocks", "//test/test_common:utility_lib", ], ) diff --git a/test/extensions/filters/http/on_demand/on_demand_filter_test.cc b/test/extensions/filters/http/on_demand/on_demand_filter_test.cc index d2978064b2d9..44307acf3677 100644 --- a/test/extensions/filters/http/on_demand/on_demand_filter_test.cc +++ b/test/extensions/filters/http/on_demand/on_demand_filter_test.cc @@ -5,7 +5,9 @@ #include "extensions/filters/http/on_demand/on_demand_update.h" #include "test/mocks/http/mocks.h" +#include "test/mocks/router/mocks.h" #include "test/mocks/runtime/mocks.h" +#include "test/mocks/upstream/mocks.h" #include "test/test_common/utility.h" #include "gmock/gmock.h" @@ -21,33 +23,66 @@ namespace OnDemand { class OnDemandFilterTest : public testing::Test { public: void SetUp() override { - filter_ = std::make_unique(); + auto config = std::make_shared(cm_, nullptr); + setupWithConfig(std::move(config)); + } + + void setupWithCDS() { + auto odcds = std::make_shared(); + auto config = std::make_shared(cm_, odcds); + setupWithConfig(std::move(config)); + } + + void setupWithConfig(OnDemandFilterConfigSharedPtr config) { + filter_ = std::make_unique(std::move(config)); filter_->setDecoderFilterCallbacks(decoder_callbacks_); } + NiceMock cm_; std::unique_ptr filter_; NiceMock decoder_callbacks_; }; -// tests decodeHeaders() when no cached route is available and vhds is configured -TEST_F(OnDemandFilterTest, TestDecodeHeaders) { +TEST_F(OnDemandFilterTest, TestDecodeHeadersWhenRouteAvailableButHasNoEntry) { Http::TestRequestHeaderMapImpl headers; - std::shared_ptr route_config_ptr{new NiceMock()}; - EXPECT_CALL(decoder_callbacks_, route()).WillOnce(Return(nullptr)); - EXPECT_CALL(decoder_callbacks_, requestRouteConfigUpdate(_)); + EXPECT_CALL(*decoder_callbacks_.route_, routeEntry()).WillOnce(Return(nullptr)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(headers, true)); +} + +TEST_F(OnDemandFilterTest, TestDecodeHeadersWhenRouteAvailableAndConfigIsNull) { + setupWithConfig(nullptr); + Http::TestRequestHeaderMapImpl headers; + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(headers, true)); +} + +TEST_F(OnDemandFilterTest, TestDecodeHeadersWhenRouteAvailableButODCDSIsDisabled) { + Http::TestRequestHeaderMapImpl headers; + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(headers, true)); +} + +TEST_F(OnDemandFilterTest, TestDecodeHeadersWhenRouteAvailableAndClusterIsAvailable) { + setupWithCDS(); + Http::TestRequestHeaderMapImpl headers; + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(headers, true)); +} + +TEST_F(OnDemandFilterTest, TestDecodeHeadersWhenRouteAvailableButClusterIsNotAvailable) { + setupWithCDS(); + Http::TestRequestHeaderMapImpl headers; + EXPECT_CALL(decoder_callbacks_, clusterInfo()).WillOnce(Return(nullptr)); + EXPECT_CALL(cm_, requestOnDemandClusterDiscovery(_, _, _)); EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, filter_->decodeHeaders(headers, true)); } -// tests decodeHeaders() when no cached route is available -TEST_F(OnDemandFilterTest, TestDecodeHeadersWhenRouteAvailable) { +TEST_F(OnDemandFilterTest, TestDecodeHeadersWhenRouteAvailableButClusterNameIsEmpty) { + setupWithCDS(); Http::TestRequestHeaderMapImpl headers; + decoder_callbacks_.route_->route_entry_.cluster_name_ = ""; EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(headers, true)); } -// tests decodeHeaders() when no route configuration is available -TEST_F(OnDemandFilterTest, TestDecodeHeadersWhenRouteConfigIsNotAvailable) { +TEST_F(OnDemandFilterTest, TestDecodeHeadersWhenRouteIsNotAvailable) { Http::TestRequestHeaderMapImpl headers; - std::shared_ptr route_config_ptr{new NiceMock()}; EXPECT_CALL(decoder_callbacks_, route()).WillOnce(Return(nullptr)); EXPECT_CALL(decoder_callbacks_, requestRouteConfigUpdate(_)); EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, filter_->decodeHeaders(headers, true)); @@ -102,6 +137,36 @@ TEST_F(OnDemandFilterTest, OnRouteConfigUpdateCompletionRestartsActiveStream) { filter_->onRouteConfigUpdateCompletion(true); } +TEST_F(OnDemandFilterTest, OnClusterDiscoveryCompletionClusterNotFound) { + EXPECT_CALL(decoder_callbacks_, clearRouteCache()).Times(0); + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + filter_->onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus::Missing); +} + +TEST_F(OnDemandFilterTest, OnClusterDiscoveryCompletionClusterFound) { + EXPECT_CALL(decoder_callbacks_, continueDecoding()).Times(0); + EXPECT_CALL(decoder_callbacks_, clearRouteCache()); + EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillOnce(Return(nullptr)); + EXPECT_CALL(decoder_callbacks_, recreateStream(_)).WillOnce(Return(true)); + filter_->onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus::Available); +} + +TEST_F(OnDemandFilterTest, OnClusterDiscoveryCompletionClusterFoundRecreateStreamFailed) { + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + EXPECT_CALL(decoder_callbacks_, clearRouteCache()).Times(0); + EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillOnce(Return(nullptr)); + EXPECT_CALL(decoder_callbacks_, recreateStream(_)).WillOnce(Return(false)); + filter_->onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus::Available); +} + +TEST_F(OnDemandFilterTest, OnClusterDiscoveryCompletionClusterFoundRedirectWithBody) { + Buffer::OwnedImpl buffer; + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + EXPECT_CALL(decoder_callbacks_, clearRouteCache()).Times(0); + EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillOnce(Return(&buffer)); + filter_->onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus::Available); +} + } // namespace OnDemand } // namespace HttpFilters } // namespace Extensions diff --git a/test/integration/BUILD b/test/integration/BUILD index 40639457921e..63e0292c2ac4 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -220,6 +220,23 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "odcds_integration_test", + srcs = ["odcds_integration_test.cc"], + data = [ + "//test/config/integration/certs", + ], + deps = [ + ":fake_upstream_lib", + ":http_integration_lib", + "//source/common/common:macros", + "//test/common/grpc:grpc_client_integration_lib", + "//test/test_common:resources_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", + ], +) + envoy_cc_test( name = "drain_close_integration_test", srcs = [ diff --git a/test/integration/odcds_integration_test.cc b/test/integration/odcds_integration_test.cc new file mode 100644 index 000000000000..b4d7753129d4 --- /dev/null +++ b/test/integration/odcds_integration_test.cc @@ -0,0 +1,291 @@ +#include + +#include "envoy/common/platform.h" +#include "envoy/config/cluster/v3/cluster.pb.h" + +#include "common/common/fmt.h" +#include "common/common/macros.h" + +#include "test/common/grpc/grpc_client_integration.h" +#include "test/integration/fake_upstream.h" +#include "test/integration/http_integration.h" +#include "test/test_common/resources.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace { + +const std::string& config() { + CONSTRUCT_ON_FIRST_USE(std::string, fmt::format(R"EOF( +admin: + access_log: + - name: envoy.access_loggers.file + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: "{}" + address: + socket_address: + address: 127.0.0.1 + port_value: 0 +static_resources: + clusters: + - name: xds_cluster + type: STATIC + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: {{}} + load_assignment: + cluster_name: xds_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + listeners: + - name: http + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + filter_chains: + - filters: + - name: http + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: config_test + http_filters: + - name: envoy.filters.http.on_demand + - name: envoy.filters.http.router + codec_type: HTTP2 + route_config: + name: local_route + virtual_hosts: + - name: local_service + domains: ["*"] + typed_per_filter_config: + envoy.filters.http.on_demand: + "@type": type.googleapis.com/envoy.extensions.filters.http.on_demand.v3.PerRouteConfig + odcds_config: + resource_api_version: V3 + api_config_source: + api_type: DELTA_GRPC + transport_api_version: V3 + grpc_services: + envoy_grpc: + cluster_name: xds_cluster + routes: + - match: {{ prefix: "/" }} + route: + cluster_header: "Pick-This-Cluster" +)EOF", + Platform::null_device_path)); +} + +class OdCdsIntegrationTestBase : public HttpIntegrationTest, + public Grpc::GrpcClientIntegrationParamTest { +public: + OdCdsIntegrationTestBase(const std::string& config) + : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, ipVersion(), config) { + use_lds_ = false; + } + + void TearDown() override { + if (doCleanUpXdsConnection_) { + cleanUpXdsConnection(); + } + } + + void initialize() override { + // Controls how many addFakeUpstream() will happen in + // BaseIntegrationTest::createUpstreams() (which is part of initialize()). + // Make sure this number matches the size of the 'clusters' repeated field in the bootstrap + // config that you use! + setUpstreamCount(1); // The xDS cluster + setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); // CDS uses gRPC uses HTTP2. + + // BaseIntegrationTest::initialize() does many things: + // 1) It appends to fake_upstreams_ as many as you asked for via setUpstreamCount(). + // 2) It updates your bootstrap config with the ports your fake upstreams are actually listening + // on (since you're supposed to leave them as 0). + // 3) It creates and starts an IntegrationTestServer - the thing that wraps the almost-actual + // Envoy used in the tests. + // 4) Bringing up the server usually entails waiting to ensure that any listeners specified in + // the bootstrap config have come up, and registering them in a port map (see lookupPort()). + HttpIntegrationTest::initialize(); + + addFakeUpstream(FakeHttpConnection::Type::HTTP2); + new_cluster_ = ConfigHelper::buildStaticCluster( + "new_cluster", fake_upstreams_[1]->localAddress()->ip()->port(), + Network::Test::getLoopbackAddressString(ipVersion())); + + test_server_->waitUntilListenersReady(); + registerTestServerPorts({"http"}); + } + + FakeStreamPtr odcds_stream_; + envoy::config::cluster::v3::Cluster new_cluster_; + bool doCleanUpXdsConnection_ = true; +}; + +class OdCdsIntegrationTest : public OdCdsIntegrationTestBase { +public: + OdCdsIntegrationTest() : OdCdsIntegrationTestBase(config()) {} +}; + +INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, OdCdsIntegrationTest, + GRPC_CLIENT_INTEGRATION_PARAMS); + +// tests a scenario when: +// - making a request to an unknown cluster +// - odcds initiates a connection with a request for the cluster +// - a response contains the cluster +// - request is resumed +TEST_P(OdCdsIntegrationTest, OnDemandClusterDiscoveryWorksWithClusterHeader) { + initialize(); + codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http"))); + Http::TestRequestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "vhost.first"}, + {"Pick-This-Cluster", "new_cluster"}}; + IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(request_headers); + + auto result = // xds_connection_ is filled with the new FakeHttpConnection. + fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForNewStream(*dispatcher_, odcds_stream_); + RELEASE_ASSERT(result, result.message()); + odcds_stream_->startGrpcStream(); + + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {"new_cluster"}, {}, + odcds_stream_)); + sendDeltaDiscoveryResponse( + Config::TypeUrl::get().Cluster, {new_cluster_}, {}, "1", odcds_stream_); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {}, {}, odcds_stream_)); + + waitForNextUpstreamRequest(1); + // Send response headers, and end_stream if there is no response body. + upstream_request_->encodeHeaders(default_response_headers_, true); + + response->waitForHeaders(); + EXPECT_EQ("200", response->headers().getStatusValue()); + + cleanupUpstreamAndDownstream(); +} + +const std::string& configODCDSVhostEnabledRouteDisabled() { + CONSTRUCT_ON_FIRST_USE(std::string, fmt::format(R"EOF( +admin: + access_log: + - name: envoy.access_loggers.file + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: "{}" + address: + socket_address: + address: 127.0.0.1 + port_value: 0 +static_resources: + clusters: + - name: xds_cluster + type: STATIC + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: {{}} + load_assignment: + cluster_name: xds_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + listeners: + - name: http + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + filter_chains: + - filters: + - name: http + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: config_test + http_filters: + - name: envoy.filters.http.on_demand + - name: envoy.filters.http.router + codec_type: HTTP2 + route_config: + name: local_route + virtual_hosts: + - name: local_service + domains: ["*"] + typed_per_filter_config: + envoy.filters.http.on_demand: + "@type": type.googleapis.com/envoy.extensions.filters.http.on_demand.v3.PerRouteConfig + odcds_config: + resource_api_version: V3 + api_config_source: + api_type: DELTA_GRPC + transport_api_version: V3 + grpc_services: + envoy_grpc: + cluster_name: xds_cluster + routes: + - match: {{ prefix: "/" }} + route: + cluster_header: "Pick-This-Cluster" + typed_per_filter_config: + envoy.filters.http.on_demand: + "@type": type.googleapis.com/envoy.extensions.filters.http.on_demand.v3.PerRouteConfig +)EOF", + Platform::null_device_path)); +} + +class OdCdsIntegrationTestDisabled : public OdCdsIntegrationTestBase { +public: + OdCdsIntegrationTestDisabled() + : OdCdsIntegrationTestBase(configODCDSVhostEnabledRouteDisabled()) { + doCleanUpXdsConnection_ = false; + } +}; + +INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, OdCdsIntegrationTestDisabled, + GRPC_CLIENT_INTEGRATION_PARAMS); + +// tests a scenario when: +// - ODCDS is enabled at a virtual host level +// - ODCDS is disabled at a route level +// - making a request to an unknown cluster +// - request fails +TEST_P(OdCdsIntegrationTestDisabled, DisablingODCDSAtRouteLevelWorks) { + initialize(); + codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http"))); + Http::TestRequestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "vhost.first"}, + {"Pick-This-Cluster", "new_cluster"}}; + IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(request_headers); + + EXPECT_FALSE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, xds_connection_, + std::chrono::milliseconds(1000))); + + response->waitForHeaders(); + EXPECT_EQ("503", response->headers().getStatusValue()); + + cleanupUpstreamAndDownstream(); +} + +} // namespace +} // namespace Envoy diff --git a/test/mocks/upstream/BUILD b/test/mocks/upstream/BUILD index 5ab7a4d10109..aec759b4a556 100644 --- a/test/mocks/upstream/BUILD +++ b/test/mocks/upstream/BUILD @@ -85,6 +85,7 @@ envoy_cc_mock( ":host_set_mocks", ":load_balancer_context_mock", ":load_balancer_mocks", + ":odcds_api_mocks", ":priority_set_mocks", ":retry_host_predicate_mocks", ":retry_priority_factory_mocks", @@ -279,6 +280,15 @@ envoy_cc_mock( ], ) +envoy_cc_mock( + name = "odcds_api_mocks", + srcs = ["odcds_api.cc"], + hdrs = ["odcds_api.h"], + deps = [ + "//include/envoy/upstream:cluster_manager_interface", + ], +) + envoy_cc_mock( name = "cluster_update_callbacks_mocks", srcs = ["cluster_update_callbacks.cc"], diff --git a/test/mocks/upstream/cluster_manager.h b/test/mocks/upstream/cluster_manager.h index ff5649caf517..dcaf232e4d35 100644 --- a/test/mocks/upstream/cluster_manager.h +++ b/test/mocks/upstream/cluster_manager.h @@ -68,6 +68,9 @@ class MockClusterManager : public ClusterManager { const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const override { return cluster_timeout_budget_stat_names_; } + MOCK_METHOD(ClusterDiscoveryCallbackHandlePtr, requestOnDemandClusterDiscovery, + (OdCdsApiSharedPtr odcds, const std::string& name, + ClusterDiscoveryCallbackWeakPtr callback)); NiceMock thread_local_cluster_; envoy::config::core::v3::BindConfig bind_config_; diff --git a/test/mocks/upstream/mocks.h b/test/mocks/upstream/mocks.h index 879280b0aef1..4a39119653f4 100644 --- a/test/mocks/upstream/mocks.h +++ b/test/mocks/upstream/mocks.h @@ -40,6 +40,7 @@ #include "test/mocks/upstream/host_set.h" #include "test/mocks/upstream/load_balancer.h" #include "test/mocks/upstream/load_balancer_context.h" +#include "test/mocks/upstream/odcds_api.h" #include "test/mocks/upstream/priority_set.h" #include "test/mocks/upstream/retry_host_predicate.h" #include "test/mocks/upstream/retry_priority.h" diff --git a/test/mocks/upstream/odcds_api.cc b/test/mocks/upstream/odcds_api.cc new file mode 100644 index 000000000000..41a1ff4de87a --- /dev/null +++ b/test/mocks/upstream/odcds_api.cc @@ -0,0 +1,13 @@ +#include "odcds_api.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { + +MockOdCdsApi::MockOdCdsApi() = default; +MockOdCdsApi::~MockOdCdsApi() = default; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/odcds_api.h b/test/mocks/upstream/odcds_api.h new file mode 100644 index 000000000000..b6b2a414e9da --- /dev/null +++ b/test/mocks/upstream/odcds_api.h @@ -0,0 +1,30 @@ +#pragma once + +#include + +#include "envoy/upstream/cluster_manager.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { + +class MockOdCdsApi; +using MockOdCdsApiPtr = std::unique_ptr; +using MockOdCdsApiSharedPtr = std::shared_ptr; + +class MockOdCdsApi : public OdCdsApi { +public: + static MockOdCdsApiPtr create() { return std::make_unique(); } + + static MockOdCdsApiSharedPtr createShared() { return create(); } + + MockOdCdsApi(); + ~MockOdCdsApi() override; + + MOCK_METHOD(void, updateOnDemand, (const std::string& cluster_name)); +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 52b0f16228c8..da6bde64b798 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -220,6 +220,8 @@ Nilsson Nonhashable Oauth OCSP +OD +ODCDS OID OK OOM