diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index 0e462af4a4c2..e0f154ebc174 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -120,6 +120,7 @@ def envoy_api_deps(skip_targets): "bootstrap", "discovery", "cds", + "discovery", "eds", "health_check", "lds", diff --git a/include/envoy/config/BUILD b/include/envoy/config/BUILD index da1056353367..a1e52966c99f 100644 --- a/include/envoy/config/BUILD +++ b/include/envoy/config/BUILD @@ -9,8 +9,8 @@ load( envoy_package() envoy_cc_library( - name = "ads_interface", - hdrs = ["ads.h"], + name = "grpc_mux_interface", + hdrs = ["grpc_mux.h"], deps = [ "//source/common/protobuf", ], diff --git a/include/envoy/config/ads.h b/include/envoy/config/ads.h deleted file mode 100644 index ca6c0cba0fe7..000000000000 --- a/include/envoy/config/ads.h +++ /dev/null @@ -1,65 +0,0 @@ -#pragma once - -#include "envoy/common/exception.h" -#include "envoy/common/pure.h" - -#include "common/protobuf/protobuf.h" - -namespace Envoy { -namespace Config { - -class AdsCallbacks { -public: - virtual ~AdsCallbacks() {} - - /** - * Called when a configuration update is received. - * @param resources vector of fetched resources corresponding to the configuration update. - * @throw EnvoyException with reason if the configuration is rejected. Otherwise the configuration - * is accepted. Accepted configurations have their version_info reflected in subsequent - * requests. - */ - virtual void onConfigUpdate(const Protobuf::RepeatedPtrField& resources) PURE; - - /** - * Called when either the subscription is unable to fetch a config update or when onConfigUpdate - * invokes an exception. - * @param e supplies any exception data on why the fetch failed. May be nullptr. - */ - virtual void onConfigUpdateFailed(const EnvoyException* e) PURE; -}; - -/** - * Handle on an ADS subscription. The subscription is canceled on destruction. - */ -class AdsWatch { -public: - virtual ~AdsWatch() {} -}; - -typedef std::unique_ptr AdsWatchPtr; - -/** - * Aggregated Discovery Service interface. - */ -class AdsApi { -public: - virtual ~AdsApi() {} - - /** - * Start a configuration subscription asynchronously for some API type and resources. - * @param type_url type URL corresponding to xDS API, e.g. - * type.googleapis.com/envoy.api.v2.Cluster. - * @param resources vector of resource names to fetch. - * @param callbacks the callbacks to be notified of configuration updates. These must be valid - * until AdsWatch::cancel() is invoked. - * @return AdsWatchPtr a handle to cancel the subscription with. E.g. when a cluster goes away, - * its EDS updates should be cancelled. Ownership of the AdsWatch belongs to the AdsApi object. - */ - virtual AdsWatchPtr subscribe(const std::string& type_url, - const std::vector& resources, - AdsCallbacks& calllbacks) PURE; -}; - -} // namespace Config -} // namespace Envoy diff --git a/include/envoy/config/grpc_mux.h b/include/envoy/config/grpc_mux.h new file mode 100644 index 000000000000..4b0c27c0d375 --- /dev/null +++ b/include/envoy/config/grpc_mux.h @@ -0,0 +1,76 @@ +#pragma once + +#include "envoy/common/exception.h" +#include "envoy/common/pure.h" + +#include "common/protobuf/protobuf.h" + +namespace Envoy { +namespace Config { + +class GrpcMuxCallbacks { +public: + virtual ~GrpcMuxCallbacks() {} + + /** + * Called when a configuration update is received. + * @param resources vector of fetched resources corresponding to the configuration update. + * @param version_info update version. + * @throw EnvoyException with reason if the configuration is rejected. Otherwise the configuration + * is accepted. Accepted configurations have their version_info reflected in subsequent + * requests. + */ + virtual void onConfigUpdate(const Protobuf::RepeatedPtrField& resources, + const std::string& version_info) PURE; + + /** + * Called when either the subscription is unable to fetch a config update or when onConfigUpdate + * invokes an exception. + * @param e supplies any exception data on why the fetch failed. May be nullptr. + */ + virtual void onConfigUpdateFailed(const EnvoyException* e) PURE; +}; + +/** + * Handle on an muxed gRPC subscription. The subscription is canceled on destruction. + */ +class GrpcMuxWatch { +public: + virtual ~GrpcMuxWatch() {} +}; + +typedef std::unique_ptr GrpcMuxWatchPtr; + +/** + * Manage one or more gRPC subscriptions on a single stream to management server. This can be used + * for a single xDS API, e.g. EDS, or to combined multiple xDS APIs for ADS. + */ +class GrpcMux { +public: + virtual ~GrpcMux() {} + + /** + * Initiate stream with management server. + */ + virtual void start() PURE; + + /** + * Start a configuration subscription asynchronously for some API type and resources. + * @param type_url type URL corresponding to xDS API, e.g. + * type.googleapis.com/envoy.api.v2.Cluster. + * @param resources vector of resource names to watch for. If this is empty, then all + * resources for type_url will result in callbacks. + * @param callbacks the callbacks to be notified of configuration updates. These must be valid + * until GrpcMuxWatch is destroyed. + * @return GrpcMuxWatchPtr a handle to cancel the subscription with. E.g. when a cluster goes + * away, its EDS updates should be cancelled by destroying the GrpcMuxWatchPtr. + */ + virtual GrpcMuxWatchPtr subscribe(const std::string& type_url, + const std::vector& resources, + GrpcMuxCallbacks& callbacks) PURE; +}; + +typedef std::unique_ptr GrpcMuxPtr; + +} // namespace Config +} // namespace Envoy diff --git a/include/envoy/upstream/BUILD b/include/envoy/upstream/BUILD index 36f6865817c5..82ee80d3919b 100644 --- a/include/envoy/upstream/BUILD +++ b/include/envoy/upstream/BUILD @@ -20,7 +20,7 @@ envoy_cc_library( ":thread_local_cluster_interface", ":upstream_interface", "//include/envoy/access_log:access_log_interface", - "//include/envoy/config:ads_interface", + "//include/envoy/config:grpc_mux_interface", "//include/envoy/http:async_client_interface", "//include/envoy/http:conn_pool_interface", "//include/envoy/local_info:local_info_interface", diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index fb48040e0782..6db3be950d64 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -7,7 +7,7 @@ #include #include "envoy/access_log/access_log.h" -#include "envoy/config/ads.h" +#include "envoy/config/grpc_mux.h" #include "envoy/http/async_client.h" #include "envoy/http/conn_pool.h" #include "envoy/local_info/local_info.h" @@ -123,9 +123,9 @@ class ClusterManager { * the management of clusters but instead is required early in ClusterManager/server * initialization and in various sites that need ClusterManager for xDS API interfacing. * - * @return AdsApi& ADS API provider referencee. + * @return GrpcMux& ADS API provider referencee. */ - virtual Config::AdsApi& adsProvider() PURE; + virtual Config::GrpcMux& adsMux() PURE; }; typedef std::unique_ptr ClusterManagerPtr; diff --git a/source/common/config/BUILD b/source/common/config/BUILD index feac39d0f8fc..4f3895d25897 100644 --- a/source/common/config/BUILD +++ b/source/common/config/BUILD @@ -20,31 +20,6 @@ envoy_cc_library( ], ) -envoy_cc_library( - name = "ads_api_lib", - hdrs = ["ads_api_impl.h"], - external_deps = ["envoy_discovery"], - deps = [ - "//include/envoy/config:ads_interface", - "//include/envoy/config:subscription_interface", - "//include/envoy/upstream:cluster_manager_interface", - ], -) - -envoy_cc_library( - name = "ads_subscription_lib", - hdrs = ["ads_subscription_impl.h"], - external_deps = ["envoy_discovery"], - deps = [ - "//include/envoy/config:ads_interface", - "//include/envoy/config:subscription_interface", - "//source/common/common:assert_lib", - "//source/common/common:logger_lib", - "//source/common/grpc:common_lib", - "//source/common/protobuf", - ], -) - envoy_cc_library( name = "bootstrap_json_lib", srcs = ["bootstrap_json.cc"], @@ -129,18 +104,47 @@ envoy_cc_library( ) envoy_cc_library( - name = "grpc_subscription_lib", - hdrs = ["grpc_subscription_impl.h"], - external_deps = ["envoy_base"], + name = "grpc_mux_lib", + srcs = ["grpc_mux_impl.cc"], + hdrs = ["grpc_mux_impl.h"], + external_deps = ["envoy_discovery"], deps = [ + ":utility_lib", + "//include/envoy/config:grpc_mux_interface", "//include/envoy/config:subscription_interface", + "//include/envoy/upstream:cluster_manager_interface", "//source/common/common:logger_lib", - "//source/common/config:utility_lib", "//source/common/grpc:async_client_lib", "//source/common/protobuf", ], ) +envoy_cc_library( + name = "grpc_mux_subscription_lib", + hdrs = ["grpc_mux_subscription_impl.h"], + external_deps = ["envoy_discovery"], + deps = [ + "//include/envoy/config:grpc_mux_interface", + "//include/envoy/config:subscription_interface", + "//source/common/common:assert_lib", + "//source/common/common:logger_lib", + "//source/common/grpc:common_lib", + "//source/common/protobuf", + ], +) + +envoy_cc_library( + name = "grpc_subscription_lib", + hdrs = ["grpc_subscription_impl.h"], + external_deps = ["envoy_base"], + deps = [ + ":grpc_mux_lib", + ":grpc_mux_subscription_lib", + "//include/envoy/config:subscription_interface", + "//include/envoy/event:dispatcher_interface", + ], +) + envoy_cc_library( name = "http_subscription_lib", hdrs = ["http_subscription_impl.h"], @@ -209,6 +213,12 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "resources_lib", + hdrs = ["resources.h"], + deps = ["//source/common/common:singleton"], +) + envoy_cc_library( name = "rds_json_lib", srcs = ["rds_json.cc"], @@ -230,8 +240,8 @@ envoy_cc_library( hdrs = ["subscription_factory.h"], external_deps = ["envoy_base"], deps = [ - ":ads_subscription_lib", ":filesystem_subscription_lib", + ":grpc_mux_subscription_lib", ":grpc_subscription_lib", ":http_subscription_lib", ":utility_lib", @@ -258,10 +268,16 @@ envoy_cc_library( hdrs = ["utility.h"], external_deps = [ "envoy_base", + "envoy_cds", + "envoy_eds", + "envoy_lds", + "envoy_rds", "envoy_filter_http_connection_manager", ], deps = [ ":json_utility_lib", + ":resources_lib", + "//include/envoy/config:grpc_mux_interface", "//include/envoy/config:subscription_interface", "//include/envoy/local_info:local_info_interface", "//include/envoy/registry", @@ -270,6 +286,7 @@ envoy_cc_library( "//source/common/common:hash_lib", "//source/common/common:hex_lib", "//source/common/common:singleton", + "//source/common/grpc:common_lib", "//source/common/json:config_schemas_lib", "//source/common/protobuf", "//source/common/protobuf:utility_lib", diff --git a/source/common/config/ads_api_impl.h b/source/common/config/ads_api_impl.h deleted file mode 100644 index 1c2aacfc8cb7..000000000000 --- a/source/common/config/ads_api_impl.h +++ /dev/null @@ -1,37 +0,0 @@ -#pragma once - -#include "envoy/config/ads.h" -#include "envoy/config/subscription.h" -#include "envoy/upstream/cluster_manager.h" - -#include "common/common/logger.h" - -#include "api/discovery.pb.h" - -namespace Envoy { -namespace Config { - -/** - * ADS API implementation that fetches via gRPC. - * TODO(htuch): Implement ADS. This should look similar to GrpcSubscriptionImpl, except it manages - * multiple in-flight DiscoveryRequests, one per type URL. - */ -class AdsApiImpl : public AdsApi, Logger::Loggable { -public: - AdsApiImpl(const envoy::api::v2::ApiConfigSource& ads_config, - Upstream::ClusterManager& cluster_manager) { - UNREFERENCED_PARAMETER(ads_config); - UNREFERENCED_PARAMETER(cluster_manager); - } - - AdsWatchPtr subscribe(const std::string& type_url, const std::vector& resources, - AdsCallbacks& callbacks) override { - UNREFERENCED_PARAMETER(type_url); - UNREFERENCED_PARAMETER(resources); - UNREFERENCED_PARAMETER(callbacks); - return nullptr; - } -}; - -} // namespace Config -} // namespace Envoy diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc new file mode 100644 index 000000000000..0451419f4299 --- /dev/null +++ b/source/common/config/grpc_mux_impl.cc @@ -0,0 +1,181 @@ +#include "common/config/grpc_mux_impl.h" + +#include "common/config/utility.h" +#include "common/protobuf/protobuf.h" + +namespace Envoy { +namespace Config { + +GrpcMuxImpl::GrpcMuxImpl( + const envoy::api::v2::Node& node, + std::unique_ptr< + Grpc::AsyncClient> + async_client, + Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method) + : node_(node), async_client_(std::move(async_client)), service_method_(service_method) { + retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); }); +} + +GrpcMuxImpl::GrpcMuxImpl(const envoy::api::v2::Node& node, + Upstream::ClusterManager& cluster_manager, + const std::string& remote_cluster_name, Event::Dispatcher& dispatcher, + const Protobuf::MethodDescriptor& service_method) + : GrpcMuxImpl(node, + std::unique_ptr>( + new Grpc::AsyncClientImpl( + cluster_manager, remote_cluster_name)), + dispatcher, service_method) {} + +GrpcMuxImpl::~GrpcMuxImpl() { + for (auto watches : watches_) { + for (auto watch : watches.second) { + watch->inserted_ = false; + } + } +} + +void GrpcMuxImpl::start() { establishNewStream(); } + +void GrpcMuxImpl::setRetryTimer() { + retry_timer_->enableTimer(std::chrono::milliseconds(RETRY_DELAY_MS)); +} + +void GrpcMuxImpl::establishNewStream() { + ENVOY_LOG(debug, "Establishing new gRPC bidi stream for {}", service_method_.DebugString()); + stream_ = async_client_->start(service_method_, *this); + if (stream_ == nullptr) { + ENVOY_LOG(warn, "Unable to establish new stream"); + handleFailure(); + return; + } + + for (const auto type_url : subscriptions_) { + sendDiscoveryRequest(type_url); + } +} + +void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) { + if (stream_ == nullptr) { + return; + } + + auto& request = requests_[type_url]; + request.mutable_resource_names()->Clear(); + + // Maintain a set to avoid dupes. + std::unordered_set resources; + for (const auto* watch : watches_[type_url]) { + for (const std::string& resource : watch->resources_) { + if (resources.count(resource) == 0) { + resources.emplace(resource); + request.add_resource_names(resource); + } + } + } + + ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url, request.DebugString()); + stream_->sendMessage(request, false); +} + +void GrpcMuxImpl::handleFailure() { + for (auto watches : watches_) { + for (auto watch : watches.second) { + watch->callbacks_.onConfigUpdateFailed(nullptr); + } + } + setRetryTimer(); +} + +GrpcMuxWatchPtr GrpcMuxImpl::subscribe(const std::string& type_url, + const std::vector& resources, + GrpcMuxCallbacks& callbacks) { + auto watch = + std::unique_ptr(new GrpcMuxWatchImpl(resources, callbacks, type_url, *this)); + ENVOY_LOG(debug, "gRPC mux subscribe for " + type_url); + + // Lazily kick off the requests based on first subscription. This has the + // convenient side-effect that we order messages on the channel based on + // Envoy's internal dependency ordering. + if (requests_.count(type_url) == 0) { + requests_[type_url].set_type_url(type_url); + requests_[type_url].mutable_node()->MergeFrom(node_); + subscriptions_.emplace_back(type_url); + } + + // This will send an updated request on each subscription. + // TODO(htuch): For RDS/EDS, this will generate a new DiscoveryRequest on each resource we added. + // Consider in the future adding some kind of collation/batching during CDS/LDS updates so that we + // only send a single RDS/EDS update after the CDS/LDS update. + sendDiscoveryRequest(type_url); + + return watch; +} + +void GrpcMuxImpl::onCreateInitialMetadata(Http::HeaderMap& metadata) { + UNREFERENCED_PARAMETER(metadata); +} + +void GrpcMuxImpl::onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) { + UNREFERENCED_PARAMETER(metadata); +} + +void GrpcMuxImpl::onReceiveMessage(std::unique_ptr&& message) { + const std::string& type_url = message->type_url(); + ENVOY_LOG(debug, "Received gRPC message for {} at version {}", type_url, message->version_info()); + if (requests_.count(type_url) == 0) { + ENVOY_LOG(warn, "Ignoring unknown type URL {}", type_url); + return; + } + try { + // To avoid O(n^2) explosion (e.g. when we have 1000s of EDS watches), we + // build a map here from resource name to resource and then walk watches_. + // We have to walk all watches (and need an efficient map as a result) to + // ensure we deliver empty config updates when a resource is dropped. + std::unordered_map resources; + for (const auto& resource : message->resources()) { + if (type_url != resource.type_url()) { + throw EnvoyException(fmt::format("{} does not match {} type URL is DiscoveryResponse {}", + resource.type_url(), type_url, message->DebugString())); + } + const std::string resource_name = Utility::resourceName(resource); + resources.emplace(resource_name, resource); + } + for (auto watch : watches_[type_url]) { + if (watch->resources_.empty()) { + watch->callbacks_.onConfigUpdate(message->resources(), message->version_info()); + continue; + } + Protobuf::RepeatedPtrField found_resources; + for (auto watched_resource_name : watch->resources_) { + auto it = resources.find(watched_resource_name); + if (it != resources.end()) { + found_resources.Add()->MergeFrom(it->second); + } + } + watch->callbacks_.onConfigUpdate(found_resources, message->version_info()); + } + requests_[type_url].set_version_info(message->version_info()); + } catch (const EnvoyException& e) { + ENVOY_LOG(warn, "gRPC config for {} update rejected: {}", message->type_url(), e.what()); + for (auto watch : watches_[type_url]) { + watch->callbacks_.onConfigUpdateFailed(&e); + } + } + requests_[type_url].set_response_nonce(message->nonce()); + sendDiscoveryRequest(type_url); +} + +void GrpcMuxImpl::onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) { + UNREFERENCED_PARAMETER(metadata); +} + +void GrpcMuxImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) { + ENVOY_LOG(warn, "gRPC config stream closed: {}, {}", status, message); + stream_ = nullptr; + handleFailure(); +} + +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/grpc_mux_impl.h b/source/common/config/grpc_mux_impl.h new file mode 100644 index 000000000000..5628684d7887 --- /dev/null +++ b/source/common/config/grpc_mux_impl.h @@ -0,0 +1,97 @@ +#pragma once + +#include "envoy/config/grpc_mux.h" +#include "envoy/config/subscription.h" +#include "envoy/event/dispatcher.h" +#include "envoy/upstream/cluster_manager.h" + +#include "common/common/logger.h" +#include "common/grpc/async_client_impl.h" + +#include "api/discovery.pb.h" + +namespace Envoy { +namespace Config { + +/** + * ADS API implementation that fetches via gRPC. + */ +class GrpcMuxImpl : public GrpcMux, + Grpc::AsyncStreamCallbacks, + Logger::Loggable { +public: + GrpcMuxImpl(const envoy::api::v2::Node& node, Upstream::ClusterManager& cluster_manager, + const std::string& remote_cluster_name, Event::Dispatcher& dispatcher, + const Protobuf::MethodDescriptor& service_method); + GrpcMuxImpl(const envoy::api::v2::Node& node, + std::unique_ptr> + async_client, + Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method); + ~GrpcMuxImpl(); + + void start() override; + GrpcMuxWatchPtr subscribe(const std::string& type_url, const std::vector& resources, + GrpcMuxCallbacks& callbacks) override; + + // Grpc::AsyncStreamCallbacks + void onCreateInitialMetadata(Http::HeaderMap& metadata) override; + void onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) override; + void onReceiveMessage(std::unique_ptr&& message) override; + void onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) override; + void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override; + + // TODO(htuch): Make this configurable or some static. + const uint32_t RETRY_DELAY_MS = 5000; + +private: + void setRetryTimer(); + void establishNewStream(); + void sendDiscoveryRequest(const std::string& type_url); + void handleFailure(); + + struct GrpcMuxWatchImpl : public GrpcMuxWatch { + GrpcMuxWatchImpl(const std::vector& resources, GrpcMuxCallbacks& callbacks, + const std::string& type_url, GrpcMuxImpl& parent) + : resources_(resources), callbacks_(callbacks), type_url_(type_url), parent_(parent), + inserted_(true) { + entry_ = parent.watches_[type_url].emplace(parent.watches_[type_url].begin(), this); + } + ~GrpcMuxWatchImpl() override { + if (inserted_) { + parent_.watches_[type_url_].erase(entry_); + parent_.sendDiscoveryRequest(type_url_); + } + } + std::vector resources_; + GrpcMuxCallbacks& callbacks_; + const std::string type_url_; + GrpcMuxImpl& parent_; + std::list::iterator entry_; + bool inserted_; + }; + + envoy::api::v2::Node node_; + std::unique_ptr< + Grpc::AsyncClient> + async_client_; + Grpc::AsyncStream* stream_{}; + const Protobuf::MethodDescriptor& service_method_; + std::unordered_map> watches_; + std::unordered_map requests_; + // Envoy's dependendency ordering. + std::list subscriptions_; + Event::TimerPtr retry_timer_; +}; + +class NullGrpcMuxImpl : public GrpcMux { +public: + void start() {} + GrpcMuxWatchPtr subscribe(const std::string&, const std::vector&, + GrpcMuxCallbacks&) { + throw EnvoyException("ADS must be configured to support an ADS config source"); + } +}; + +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/ads_subscription_impl.h b/source/common/config/grpc_mux_subscription_impl.h similarity index 53% rename from source/common/config/ads_subscription_impl.h rename to source/common/config/grpc_mux_subscription_impl.h index 4c97d6f8bd26..32322631a97f 100644 --- a/source/common/config/ads_subscription_impl.h +++ b/source/common/config/grpc_mux_subscription_impl.h @@ -1,12 +1,13 @@ #pragma once -#include "envoy/config/ads.h" +#include "envoy/config/grpc_mux.h" #include "envoy/config/subscription.h" #include "common/common/assert.h" #include "common/common/logger.h" #include "common/grpc/common.h" #include "common/protobuf/protobuf.h" +#include "common/protobuf/utility.h" #include "api/discovery.pb.h" @@ -14,22 +15,22 @@ namespace Envoy { namespace Config { /** - * Adapter from typed Subscription to untyped AdsApi. Also handles per-xDS API stats/logging. + * Adapter from typed Subscription to untyped GrpcMux. Also handles per-xDS API stats/logging. */ template -class AdsSubscriptionImpl : public Subscription, - AdsCallbacks, - Logger::Loggable { +class GrpcMuxSubscriptionImpl : public Subscription, + GrpcMuxCallbacks, + Logger::Loggable { public: - AdsSubscriptionImpl(AdsApi& ads_api, SubscriptionStats stats) - : ads_api_(ads_api), stats_(stats), + GrpcMuxSubscriptionImpl(GrpcMux& grpc_mux, SubscriptionStats stats) + : grpc_mux_(grpc_mux), stats_(stats), type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())) {} // Config::Subscription void start(const std::vector& resources, SubscriptionCallbacks& callbacks) override { callbacks_ = &callbacks; - watch_ = ads_api_.subscribe(type_url_, resources, *this); + watch_ = grpc_mux_.subscribe(type_url_, resources, *this); // The attempt stat here is maintained for the purposes of having consistency between ADS and // gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an // "attempt" for a given xDS API combined by ADS is not really that meaningful. @@ -37,41 +38,49 @@ class AdsSubscriptionImpl : public Subscription, } void updateResources(const std::vector& resources) override { - watch_ = ads_api_.subscribe(type_url_, resources, *this); + watch_ = grpc_mux_.subscribe(type_url_, resources, *this); stats_.update_attempt_.inc(); } - const std::string versionInfo() const override { NOT_IMPLEMENTED; } + const std::string versionInfo() const override { return version_info_; } - // Config::AdsCallbacks - void onConfigUpdate(const Protobuf::RepeatedPtrField& resources) override { + // Config::GrpcMuxCallbacks + void onConfigUpdate(const Protobuf::RepeatedPtrField& resources, + const std::string& version_info) override { Protobuf::RepeatedPtrField typed_resources; std::transform(resources.cbegin(), resources.cend(), Protobuf::RepeatedPtrFieldBackInserter(&typed_resources), - [](const ProtobufWkt::Any& resource) { - ResourceType typed_resource; - resource.UnpackTo(&typed_resource); - return typed_resource; - }); + MessageUtil::anyConvert); callbacks_->onConfigUpdate(typed_resources); stats_.update_success_.inc(); stats_.update_attempt_.inc(); - ENVOY_LOG(debug, "ADS config for {} accepted", type_url_); + version_info_ = version_info; + ENVOY_LOG(debug, "gRPC config for {} accepted with {} resources", type_url_, resources.size()); + for (const auto resource : typed_resources) { + ENVOY_LOG(debug, "- {}", resource.DebugString()); + } } void onConfigUpdateFailed(const EnvoyException* e) override { - stats_.update_rejected_.inc(); + // TODO(htuch): Less fragile signal that this is failure vs. reject. + if (e == nullptr) { + stats_.update_failure_.inc(); + ENVOY_LOG(warn, "gRPC update for {} failed", type_url_); + } else { + stats_.update_rejected_.inc(); + ENVOY_LOG(warn, "gRPC config for {} rejected: {}", type_url_, e->what()); + } stats_.update_attempt_.inc(); - ENVOY_LOG(warn, "ADS config for {} rejected: {}", type_url_, e->what()); callbacks_->onConfigUpdateFailed(e); } private: - AdsApi& ads_api_; + GrpcMux& grpc_mux_; SubscriptionStats stats_; const std::string type_url_; SubscriptionCallbacks* callbacks_{}; - AdsWatchPtr watch_{}; + GrpcMuxWatchPtr watch_{}; + std::string version_info_; }; } // namespace Config diff --git a/source/common/config/grpc_subscription_impl.h b/source/common/config/grpc_subscription_impl.h index 4c7ab3b89da0..e20125793025 100644 --- a/source/common/config/grpc_subscription_impl.h +++ b/source/common/config/grpc_subscription_impl.h @@ -3,10 +3,8 @@ #include "envoy/config/subscription.h" #include "envoy/event/dispatcher.h" -#include "common/common/logger.h" -#include "common/config/utility.h" -#include "common/grpc/async_client_impl.h" -#include "common/protobuf/protobuf.h" +#include "common/config/grpc_mux_impl.h" +#include "common/config/grpc_mux_subscription_impl.h" #include "api/base.pb.h" @@ -14,9 +12,7 @@ namespace Envoy { namespace Config { template -class GrpcSubscriptionImpl : public Config::Subscription, - Grpc::AsyncStreamCallbacks, - Logger::Loggable { +class GrpcSubscriptionImpl : public Config::Subscription { public: GrpcSubscriptionImpl(const envoy::api::v2::Node& node, Upstream::ClusterManager& cm, const std::string& remote_cluster_name, Event::Dispatcher& dispatcher, @@ -36,114 +32,28 @@ class GrpcSubscriptionImpl : public Config::Subscription, async_client, Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats) - : async_client_(std::move(async_client)), service_method_(service_method), - retry_timer_(dispatcher.createTimer([this]() -> void { establishNewStream(); })), - stats_(stats) { - request_.mutable_node()->CopyFrom(node); - } - - void setRetryTimer() { retry_timer_->enableTimer(std::chrono::milliseconds(RETRY_DELAY_MS)); } - - void establishNewStream() { - ENVOY_LOG(debug, "Establishing new gRPC bidi stream for {}", service_method_.DebugString()); - stats_.update_attempt_.inc(); - stream_ = async_client_->start(service_method_, *this); - if (stream_ == nullptr) { - ENVOY_LOG(warn, "Unable to establish new stream"); - handleFailure(); - return; - } - sendDiscoveryRequest(); - } - - void sendDiscoveryRequest() { - if (stream_ == nullptr) { - ENVOY_LOG(trace, "Unable to sendDiscoveryRequest() on null stream"); - return; - } - ENVOY_LOG(trace, "sendDiscoveryRequest: {}", request_.DebugString()); - stream_->sendMessage(request_, false); - } + : grpc_mux_(node, std::move(async_client), dispatcher, service_method), + grpc_mux_subscription_(grpc_mux_, stats) {} // Config::Subscription void start(const std::vector& resources, Config::SubscriptionCallbacks& callbacks) override { - ASSERT(callbacks_ == nullptr); - Protobuf::RepeatedPtrField resources_vector(resources.begin(), - resources.end()); - request_.mutable_resource_names()->Swap(&resources_vector); - callbacks_ = &callbacks; - establishNewStream(); + // Subscribe first, so we get failure callbacks if grpc_mux_.start() fails. + grpc_mux_subscription_.start(resources, callbacks); + grpc_mux_.start(); } void updateResources(const std::vector& resources) override { - Protobuf::RepeatedPtrField resources_vector(resources.begin(), - resources.end()); - request_.mutable_resource_names()->Swap(&resources_vector); - sendDiscoveryRequest(); - stats_.update_attempt_.inc(); - } - - const std::string versionInfo() const override { return request_.version_info(); } - - // Grpc::AsyncStreamCallbacks - void onCreateInitialMetadata(Http::HeaderMap& metadata) override { - UNREFERENCED_PARAMETER(metadata); + grpc_mux_subscription_.updateResources(resources); } - void onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) override { - UNREFERENCED_PARAMETER(metadata); - } + const std::string versionInfo() const override { return grpc_mux_subscription_.versionInfo(); } - void onReceiveMessage(std::unique_ptr&& message) override { - const auto typed_resources = Config::Utility::getTypedResources(*message); - try { - callbacks_->onConfigUpdate(typed_resources); - request_.set_version_info(message->version_info()); - stats_.update_success_.inc(); - ENVOY_LOG(debug, "gRPC config update accepted: {}", message->DebugString()); - } catch (const EnvoyException& e) { - ENVOY_LOG(warn, "gRPC config update rejected: {}", e.what()); - stats_.update_rejected_.inc(); - callbacks_->onConfigUpdateFailed(&e); - } - // This effectively ACK/NACKs the accepted configuration. - ENVOY_LOG(debug, "Sending version update: {}", message->version_info()); - stats_.update_attempt_.inc(); - request_.set_response_nonce(message->nonce()); - sendDiscoveryRequest(); - } - - void onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) override { - UNREFERENCED_PARAMETER(metadata); - } - - void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override { - ENVOY_LOG(warn, "gRPC config stream closed: {}, {}", status, message); - handleFailure(); - stream_ = nullptr; - } - - // TODO(htuch): Make this configurable or some static. - const uint32_t RETRY_DELAY_MS = 5000; + GrpcMuxImpl& grpcMux() { return grpc_mux_; } private: - void handleFailure() { - stats_.update_failure_.inc(); - callbacks_->onConfigUpdateFailed(nullptr); - setRetryTimer(); - } - - std::unique_ptr< - Grpc::AsyncClient> - async_client_; - const Protobuf::MethodDescriptor& service_method_; - Event::TimerPtr retry_timer_; - Protobuf::RepeatedPtrField resources_; - Config::SubscriptionCallbacks* callbacks_{}; - Grpc::AsyncStream* stream_{}; - envoy::api::v2::DiscoveryRequest request_; - SubscriptionStats stats_; + GrpcMuxImpl grpc_mux_; + GrpcMuxSubscriptionImpl grpc_mux_subscription_; }; } // namespace Config diff --git a/source/common/config/resources.h b/source/common/config/resources.h new file mode 100644 index 000000000000..ae8fff8d1715 --- /dev/null +++ b/source/common/config/resources.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +#include "common/common/singleton.h" + +namespace Envoy { +namespace Config { + +/** + * Constant Type URLs. + */ +class TypeUrlValues { +public: + const std::string Listener{"type.googleapis.com/envoy.api.v2.Listener"}; + const std::string Cluster{"type.googleapis.com/envoy.api.v2.Cluster"}; + const std::string ClusterLoadAssignment{"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"}; + const std::string RouteConfiguration{"type.googleapis.com/envoy.api.v2.RouteConfiguration"}; +}; + +typedef ConstSingleton TypeUrl; + +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/subscription_factory.h b/source/common/config/subscription_factory.h index 3ba6686571ec..97ba4ea10abb 100644 --- a/source/common/config/subscription_factory.h +++ b/source/common/config/subscription_factory.h @@ -4,8 +4,8 @@ #include "envoy/config/subscription.h" -#include "common/config/ads_subscription_impl.h" #include "common/config/filesystem_subscription_impl.h" +#include "common/config/grpc_mux_subscription_impl.h" #include "common/config/grpc_subscription_impl.h" #include "common/config/http_subscription_impl.h" #include "common/config/utility.h" @@ -75,7 +75,7 @@ class SubscriptionFactory { break; } case envoy::api::v2::ConfigSource::kAds: { - result.reset(new AdsSubscriptionImpl(cm.adsProvider(), stats)); + result.reset(new GrpcMuxSubscriptionImpl(cm.adsMux(), stats)); break; } default: diff --git a/source/common/config/utility.cc b/source/common/config/utility.cc index ce91b4b8f380..fcd1497a3a29 100644 --- a/source/common/config/utility.cc +++ b/source/common/config/utility.cc @@ -4,8 +4,10 @@ #include "common/common/hex.h" #include "common/common/utility.h" #include "common/config/json_utility.h" +#include "common/config/resources.h" #include "common/json/config_schemas.h" #include "common/protobuf/protobuf.h" +#include "common/protobuf/utility.h" #include "fmt/format.h" @@ -100,5 +102,22 @@ void Utility::translateLdsConfig(const Json::Object& json_lds, *lds_config.mutable_api_config_source()); } +std::string Utility::resourceName(const ProtobufWkt::Any& resource) { + if (resource.type_url() == Config::TypeUrl::get().Listener) { + return MessageUtil::anyConvert(resource).name(); + } + if (resource.type_url() == Config::TypeUrl::get().RouteConfiguration) { + return MessageUtil::anyConvert(resource).name(); + } + if (resource.type_url() == Config::TypeUrl::get().Cluster) { + return MessageUtil::anyConvert(resource).name(); + } + if (resource.type_url() == Config::TypeUrl::get().ClusterLoadAssignment) { + return MessageUtil::anyConvert(resource).cluster_name(); + } + throw EnvoyException( + fmt::format("Unknown type URL {} in DiscoveryResponse", resource.type_url())); +} + } // namespace Config } // namespace Envoy diff --git a/source/common/config/utility.h b/source/common/config/utility.h index fb3f40a84fb0..ab66a1d447e7 100644 --- a/source/common/config/utility.h +++ b/source/common/config/utility.h @@ -1,19 +1,26 @@ #pragma once +#include "envoy/config/grpc_mux.h" #include "envoy/config/subscription.h" #include "envoy/json/json_object.h" #include "envoy/local_info/local_info.h" #include "envoy/registry/registry.h" #include "envoy/upstream/cluster_manager.h" +#include "common/common/assert.h" #include "common/common/hash.h" #include "common/common/hex.h" #include "common/common/singleton.h" +#include "common/grpc/common.h" #include "common/protobuf/protobuf.h" #include "common/protobuf/utility.h" #include "api/base.pb.h" +#include "api/cds.pb.h" +#include "api/eds.pb.h" #include "api/filter/http_connection_manager.pb.h" +#include "api/lds.pb.h" +#include "api/rds.pb.h" namespace Envoy { namespace Config { @@ -44,7 +51,7 @@ class Utility { static Protobuf::RepeatedPtrField getTypedResources(const envoy::api::v2::DiscoveryResponse& response) { Protobuf::RepeatedPtrField typed_resources; - for (auto& resource : response.resources()) { + for (const auto& resource : response.resources()) { auto* typed_resource = typed_resources.Add(); resource.UnpackTo(typed_resource); } @@ -190,6 +197,16 @@ class Utility { return config; } + + /** + * Obtain the "name" of a v2 API resource in a google.protobuf.Any, e.g. the route config name for + * a Routeconfiguration, based on the underlying resource type. + * TODO(htuch): This is kind of a hack. If we had a better support for resource names as first + * class in the API, this would not be necessary. + * @param resource google.protobuf.Any v2 API resource. + * @return std::string resource name. + */ + static std::string resourceName(const ProtobufWkt::Any& resource); }; } // namespace Config diff --git a/source/common/grpc/codec.h b/source/common/grpc/codec.h index 6089d5798c45..eaaac0a3d1d1 100644 --- a/source/common/grpc/codec.h +++ b/source/common/grpc/codec.h @@ -45,6 +45,11 @@ class Decoder { // @return bool whether the decoding succeeded or not. bool decode(Buffer::Instance& input, std::vector& output); + // Determine the length of the current frame being decoded. This is useful when supplying a + // partial frame to decode() and wanting to know how many more bytes need to be read to complete + // the frame. + uint32_t length() const { return frame_.length_; } + private: // Wire format (http://www.grpc.io/docs/guides/wire.html) of GRPC data frame // header: diff --git a/source/common/protobuf/utility.cc b/source/common/protobuf/utility.cc index 13e4eac68fb7..19cc51a38742 100644 --- a/source/common/protobuf/utility.cc +++ b/source/common/protobuf/utility.cc @@ -21,6 +21,11 @@ void MessageUtil::loadFromJson(const std::string& json, Protobuf::Message& messa } } +void MessageUtil::loadFromYaml(const std::string& yaml, Protobuf::Message& message) { + const std::string json = Json::Factory::loadFromYamlString(yaml)->asJsonString(); + loadFromJson(json, message); +} + void MessageUtil::loadFromFile(const std::string& path, Protobuf::Message& message) { const std::string contents = Filesystem::fileReadToEnd(path); // If the filename ends with .pb, attempt to parse it as a binary proto. @@ -41,8 +46,7 @@ void MessageUtil::loadFromFile(const std::string& path, Protobuf::Message& messa message.GetTypeName() + ")"); } if (StringUtil::endsWith(path, ".yaml")) { - const std::string json = Json::Factory::loadFromYamlString(contents)->asJsonString(); - loadFromJson(json, message); + loadFromYaml(contents, message); } else { loadFromJson(contents, message); } diff --git a/source/common/protobuf/utility.h b/source/common/protobuf/utility.h index 31d5f9491d80..f49305a44c5d 100644 --- a/source/common/protobuf/utility.h +++ b/source/common/protobuf/utility.h @@ -66,8 +66,23 @@ class MessageUtil { } static void loadFromJson(const std::string& json, Protobuf::Message& message); + static void loadFromYaml(const std::string& yaml, Protobuf::Message& message); static void loadFromFile(const std::string& path, Protobuf::Message& message); + /** + * Convert from google.protobuf.Any to a typed message. + * @param message source google.protobuf.Any message. + * @return MessageType the typed message inside the Any. + */ + template + static inline MessageType anyConvert(const ProtobufWkt::Any& message) { + MessageType typed_message; + if (!message.UnpackTo(&typed_message)) { + throw EnvoyException("Unable to unpack " + message.DebugString()); + } + return typed_message; + }; + /** * Convert between two protobufs via a JSON round-trip. This is used to translate arbitrary * messages to/from google.protobuf.Struct. diff --git a/source/common/router/rds_subscription.cc b/source/common/router/rds_subscription.cc index 33f83ec97e72..313b90d982c9 100644 --- a/source/common/router/rds_subscription.cc +++ b/source/common/router/rds_subscription.cc @@ -16,9 +16,10 @@ RdsSubscription::RdsSubscription(Envoy::Config::SubscriptionStats stats, Upstream::ClusterManager& cm, Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random, const LocalInfo::LocalInfo& local_info) - : RestApiFetcher( - cm, rds.config_source().api_config_source().cluster_name()[0], dispatcher, random, - Config::Utility::apiConfigSourceRefreshDelay(rds.config_source().api_config_source())), + : RestApiFetcher(cm, rds.config_source().api_config_source().cluster_name()[0], dispatcher, + random, + Envoy::Config::Utility::apiConfigSourceRefreshDelay( + rds.config_source().api_config_source())), local_info_(local_info), stats_(stats) { const auto& api_config_source = rds.config_source().api_config_source(); UNREFERENCED_PARAMETER(api_config_source); diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD index ec38706a50bc..b8c2445bead3 100644 --- a/source/common/upstream/BUILD +++ b/source/common/upstream/BUILD @@ -64,8 +64,8 @@ envoy_cc_library( "//include/envoy/upstream:cluster_manager_interface", "//source/common/common:enum_to_int", "//source/common/common:utility_lib", - "//source/common/config:ads_api_lib", "//source/common/config:cds_json_lib", + "//source/common/config:grpc_mux_lib", "//source/common/config:utility_lib", "//source/common/http:async_client_lib", "//source/common/http/http1:conn_pool_lib", @@ -210,7 +210,7 @@ envoy_cc_library( deps = [ ":sds_subscription_lib", ":upstream_includes", - "//include/envoy/config:ads_interface", + "//include/envoy/config:grpc_mux_interface", "//include/envoy/config:subscription_interface", "//include/envoy/local_info:local_info_interface", "//source/common/config:metadata_lib", diff --git a/source/common/upstream/cds_api_impl.cc b/source/common/upstream/cds_api_impl.cc index ce121f237686..3c5682e65106 100644 --- a/source/common/upstream/cds_api_impl.cc +++ b/source/common/upstream/cds_api_impl.cc @@ -48,8 +48,9 @@ void CdsApiImpl::onConfigUpdate(const ResourceVector& resources) { } for (auto cluster : clusters_to_remove) { - if (cm_.removePrimaryCluster(cluster.first)) { - ENVOY_LOG(info, "cds: remove cluster '{}'", cluster.first); + const std::string cluster_name = cluster.first; + if (cm_.removePrimaryCluster(cluster_name)) { + ENVOY_LOG(info, "cds: remove cluster '{}'", cluster_name); } } diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 9d1f8dcea062..01763da49171 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -161,10 +161,26 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::api::v2::Bootstrap& bootstra ThreadLocal::SlotAllocator& tls, Runtime::Loader& runtime, Runtime::RandomGenerator& random, const LocalInfo::LocalInfo& local_info, - AccessLog::AccessLogManager& log_manager) + AccessLog::AccessLogManager& log_manager, + Event::Dispatcher& primary_dispatcher) : factory_(factory), runtime_(runtime), stats_(stats), tls_(tls.allocateSlot()), - random_(random), local_info_(local_info), cm_stats_(generateStats(stats)), - ads_api_(bootstrap.dynamic_resources().ads_config(), *this) { + random_(random), local_info_(local_info), cm_stats_(generateStats(stats)) { + const auto& ads_config = bootstrap.dynamic_resources().ads_config(); + if (ads_config.cluster_name().empty()) { + ENVOY_LOG(debug, "No ADS clusters defined, ADS will not be initialized."); + ads_mux_.reset(new Config::NullGrpcMuxImpl()); + } else { + if (ads_config.cluster_name().size() != 1) { + // TODO(htuch): Add support for multiple clusters, #1170. + throw EnvoyException( + "envoy::api::v2::ApiConfigSource must have a singleton cluster name specified"); + } + ads_mux_.reset(new Config::GrpcMuxImpl( + bootstrap.node(), *this, ads_config.cluster_name()[0], primary_dispatcher, + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.api.v2.AggregatedDiscoveryService.StreamAggregatedResources"))); + } + const auto& cm_config = bootstrap.cluster_manager(); if (cm_config.has_outlier_detection()) { const std::string event_log_file_path = cm_config.outlier_detection().event_log_path(); @@ -220,6 +236,7 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::api::v2::Bootstrap& bootstra } init_helper_.onStaticLoadComplete(); + ads_mux_->start(); } ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) { @@ -585,7 +602,7 @@ ClusterManagerPtr ProdClusterManagerFactory::clusterManagerFromProto( Runtime::Loader& runtime, Runtime::RandomGenerator& random, const LocalInfo::LocalInfo& local_info, AccessLog::AccessLogManager& log_manager) { return ClusterManagerPtr{new ClusterManagerImpl(bootstrap, *this, stats, tls, runtime, random, - local_info, log_manager)}; + local_info, log_manager, primary_dispatcher_)}; } Http::ConnectionPool::InstancePtr diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 7e43be271efa..d033d0dfa3f7 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -16,7 +16,7 @@ #include "envoy/thread_local/thread_local.h" #include "envoy/upstream/cluster_manager.h" -#include "common/config/ads_api_impl.h" +#include "common/config/grpc_mux_impl.h" #include "common/http/async_client_impl.h" #include "common/upstream/upstream_impl.h" @@ -36,8 +36,8 @@ class ProdClusterManagerFactory : public ClusterManagerFactory { Ssl::ContextManager& ssl_context_manager, Event::Dispatcher& primary_dispatcher, const LocalInfo::LocalInfo& local_info) - : runtime_(runtime), stats_(stats), tls_(tls), random_(random), dns_resolver_(dns_resolver), - ssl_context_manager_(ssl_context_manager), primary_dispatcher_(primary_dispatcher), + : primary_dispatcher_(primary_dispatcher), runtime_(runtime), stats_(stats), tls_(tls), + random_(random), dns_resolver_(dns_resolver), ssl_context_manager_(ssl_context_manager), local_info_(local_info) {} // Upstream::ClusterManagerFactory @@ -57,6 +57,9 @@ class ProdClusterManagerFactory : public ClusterManagerFactory { const Optional& eds_config, ClusterManager& cm) override; +protected: + Event::Dispatcher& primary_dispatcher_; + private: Runtime::Loader& runtime_; Stats::Store& stats_; @@ -64,7 +67,6 @@ class ProdClusterManagerFactory : public ClusterManagerFactory { Runtime::RandomGenerator& random_; Network::DnsResolverSharedPtr dns_resolver_; Ssl::ContextManager& ssl_context_manager_; - Event::Dispatcher& primary_dispatcher_; const LocalInfo::LocalInfo& local_info_; }; @@ -126,7 +128,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable(drain_time_completed_.count()) > + return static_cast(drain_time_completed_.load()) > (server_.random().random() % server_.options().drainTime().count()); } void DrainManagerImpl::drainSequenceTick() { - ENVOY_LOG(trace, "drain tick #{}", drain_time_completed_.count()); - ASSERT(drain_time_completed_ < server_.options().drainTime()); - drain_time_completed_ += std::chrono::seconds(1); + ENVOY_LOG(trace, "drain tick #{}", drain_time_completed_.load()); + ASSERT(drain_time_completed_.load() < server_.options().drainTime().count()); + ++drain_time_completed_; - if (drain_time_completed_ < server_.options().drainTime()) { + if (drain_time_completed_.load() < server_.options().drainTime().count()) { drain_tick_timer_->enableTimer(std::chrono::milliseconds(1000)); } else if (drain_sequence_completion_) { drain_sequence_completion_(); diff --git a/source/server/drain_manager_impl.h b/source/server/drain_manager_impl.h index 1b574e179e8d..da51606955a9 100644 --- a/source/server/drain_manager_impl.h +++ b/source/server/drain_manager_impl.h @@ -32,7 +32,7 @@ class DrainManagerImpl : Logger::Loggable, public DrainManager Instance& server_; Event::TimerPtr drain_tick_timer_; - std::chrono::seconds drain_time_completed_{}; + std::atomic drain_time_completed_{}; Event::TimerPtr parent_shutdown_timer_; std::function drain_sequence_completion_; }; diff --git a/test/common/config/BUILD b/test/common/config/BUILD index 41b3d1ba447c..264bb918a71f 100644 --- a/test/common/config/BUILD +++ b/test/common/config/BUILD @@ -9,18 +9,6 @@ load( envoy_package() -envoy_cc_test_library( - name = "ads_subscription_test_harness", - srcs = ["ads_subscription_test_harness.h"], - external_deps = ["envoy_eds"], - deps = [ - ":subscription_test_harness", - "//source/common/config:ads_subscription_lib", - "//test/mocks/config:config_mocks", - "//test/test_common:utility_lib", - ], -) - envoy_cc_test( name = "filesystem_subscription_impl_test", srcs = ["filesystem_subscription_impl_test.cc"], @@ -44,6 +32,24 @@ envoy_cc_test_library( ], ) +envoy_cc_test( + name = "grpc_mux_impl_test", + srcs = ["grpc_mux_impl_test.cc"], + external_deps = [ + "envoy_discovery", + "envoy_eds", + ], + deps = [ + "//source/common/config:grpc_mux_lib", + "//source/common/config:resources_lib", + "//source/common/protobuf", + "//test/mocks/config:config_mocks", + "//test/mocks/event:event_mocks", + "//test/mocks/grpc:grpc_mocks", + "//test/test_common:utility_lib", + ], +) + envoy_cc_test( name = "grpc_subscription_impl_test", srcs = ["grpc_subscription_impl_test.cc"], @@ -60,6 +66,7 @@ envoy_cc_test_library( ":subscription_test_harness", "//source/common/common:hash_lib", "//source/common/config:grpc_subscription_lib", + "//source/common/config:resources_lib", "//test/mocks/config:config_mocks", "//test/mocks/event:event_mocks", "//test/mocks/grpc:grpc_mocks", @@ -115,7 +122,6 @@ envoy_cc_test( name = "subscription_impl_test", srcs = ["subscription_impl_test.cc"], deps = [ - ":ads_subscription_test_harness", ":filesystem_subscription_test_harness", ":grpc_subscription_test_harness", ":http_subscription_test_harness", diff --git a/test/common/config/ads_subscription_test_harness.h b/test/common/config/ads_subscription_test_harness.h deleted file mode 100644 index a37b12a7720b..000000000000 --- a/test/common/config/ads_subscription_test_harness.h +++ /dev/null @@ -1,88 +0,0 @@ -#pragma once - -#include "common/config/ads_subscription_impl.h" - -#include "test/common/config/subscription_test_harness.h" -#include "test/mocks/config/mocks.h" -#include "test/test_common/utility.h" - -#include "api/eds.pb.h" -#include "gmock/gmock.h" -#include "gtest/gtest.h" - -using testing::ElementsAreArray; -using testing::Return; -using testing::_; - -namespace Envoy { -namespace Config { - -typedef AdsSubscriptionImpl AdsEdsSubscriptionImpl; - -class AdsSubscriptionTestHarness : public SubscriptionTestHarness { -public: - AdsSubscriptionTestHarness() { - subscription_.reset(new AdsEdsSubscriptionImpl(ads_api_, stats_)); - } - - ~AdsSubscriptionTestHarness() { EXPECT_CALL(*ads_watch_, cancel()); } - - AdsWatch* resetAdsWatch() { - ads_watch_ = new Config::MockAdsWatch(); - return ads_watch_; - } - - void expectSendMessage(const std::vector& cluster_names, - const std::string& version) override { - UNREFERENCED_PARAMETER(cluster_names); - UNREFERENCED_PARAMETER(version); - } - - void startSubscription(const std::vector& cluster_names) override { - EXPECT_CALL(ads_api_, subscribe_("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment", - ElementsAreArray(cluster_names), _)) - .WillOnce(Return(resetAdsWatch())); - subscription_->start(cluster_names, callbacks_); - } - - void deliverConfigUpdate(const std::vector cluster_names, const std::string& version, - bool accept) override { - Protobuf::RepeatedPtrField typed_resources; - Protobuf::RepeatedPtrField resources; - for (const auto& cluster : cluster_names) { - envoy::api::v2::ClusterLoadAssignment* load_assignment = typed_resources.Add(); - load_assignment->set_cluster_name(cluster); - resources.Add()->PackFrom(*load_assignment); - } - - EXPECT_CALL(callbacks_, onConfigUpdate(RepeatedProtoEq(typed_resources))) - .WillOnce(ThrowOnRejectedConfig(accept)); - if (accept) { - subscription_->onConfigUpdate(resources); - } else { - EXPECT_THROW_WITH_MESSAGE(subscription_->onConfigUpdate(resources), EnvoyException, - "bad config"); - EnvoyException bad_config("bad config"); - EXPECT_CALL(callbacks_, onConfigUpdateFailed(&bad_config)); - subscription_->onConfigUpdateFailed(&bad_config); - } - - UNREFERENCED_PARAMETER(version); - } - - void updateResources(const std::vector& cluster_names) override { - EXPECT_CALL(*ads_watch_, cancel()); - EXPECT_CALL(ads_api_, subscribe_("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment", - ElementsAreArray(cluster_names), _)) - .WillOnce(Return(resetAdsWatch())); - subscription_->updateResources(cluster_names); - } - - Config::MockAdsApi ads_api_; - Config::MockAdsWatch* ads_watch_; - std::unique_ptr subscription_; - Config::MockSubscriptionCallbacks callbacks_; -}; - -} // namespace Config -} // namespace Envoy diff --git a/test/common/config/grpc_mux_impl_test.cc b/test/common/config/grpc_mux_impl_test.cc new file mode 100644 index 000000000000..1bd315be6b5e --- /dev/null +++ b/test/common/config/grpc_mux_impl_test.cc @@ -0,0 +1,265 @@ +#include "common/config/grpc_mux_impl.h" +#include "common/config/resources.h" +#include "common/protobuf/protobuf.h" + +#include "test/mocks/config/mocks.h" +#include "test/mocks/event/mocks.h" +#include "test/mocks/grpc/mocks.h" +#include "test/test_common/utility.h" + +#include "api/discovery.pb.h" +#include "api/eds.pb.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::InSequence; +using testing::Invoke; +using testing::IsSubstring; +using testing::NiceMock; +using testing::Return; +using testing::_; + +namespace Envoy { +namespace Config { +namespace { + +typedef Grpc::MockAsyncClient + SubscriptionMockAsyncClient; + +// We test some mux specific stuff below, other unit test coverage for singleton use of GrpcMuxImpl +// is provided in [grpc_]subscription_impl_test.cc. +class GrpcMuxImplTest : public testing::Test { +public: + GrpcMuxImplTest() + : async_client_(new SubscriptionMockAsyncClient()), timer_(new Event::MockTimer()) { + EXPECT_CALL(dispatcher_, createTimer_(_)).WillOnce(Invoke([this](Event::TimerCb timer_cb) { + timer_cb_ = timer_cb; + return timer_; + })); + grpc_mux_.reset( + new GrpcMuxImpl(envoy::api::v2::Node(), + std::unique_ptr(async_client_), dispatcher_, + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.api.v2.AggregatedDiscoveryService.StreamAggregatedResources"))); + } + + void expectSendMessage(const std::string& type_url, + const std::vector& resource_names, + const std::string& version) { + envoy::api::v2::DiscoveryRequest expected_request; + expected_request.mutable_node()->CopyFrom(node_); + for (const auto& resource : resource_names) { + expected_request.add_resource_names(resource); + } + if (!version.empty()) { + expected_request.set_version_info(version); + } + expected_request.set_response_nonce(""); + expected_request.set_type_url(type_url); + EXPECT_CALL(async_stream_, sendMessage(ProtoEq(expected_request), false)); + } + + envoy::api::v2::Node node_; + NiceMock dispatcher_; + SubscriptionMockAsyncClient* async_client_; + Event::MockTimer* timer_; + Event::TimerCb timer_cb_; + Grpc::MockAsyncStream async_stream_; + std::unique_ptr grpc_mux_; + MockGrpcMuxCallbacks callbacks_; +}; + +// Validate behavior when multiple type URL watches are maintained, watches are created/destroyed +// (via RAII). +TEST_F(GrpcMuxImplTest, MultipleTypeUrlStreams) { + InSequence s; + auto foo_sub = grpc_mux_->subscribe("foo", {"x", "y"}, callbacks_); + auto bar_sub = grpc_mux_->subscribe("bar", {}, callbacks_); + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("foo", {"x", "y"}, ""); + expectSendMessage("bar", {}, ""); + grpc_mux_->start(); + expectSendMessage("bar", {"z"}, ""); + auto bar_z_sub = grpc_mux_->subscribe("bar", {"z"}, callbacks_); + expectSendMessage("bar", {"zz", "z"}, ""); + auto bar_zz_sub = grpc_mux_->subscribe("bar", {"zz"}, callbacks_); + expectSendMessage("bar", {"z"}, ""); + expectSendMessage("bar", {}, ""); + expectSendMessage("bar", {}, ""); + expectSendMessage("foo", {}, ""); +} + +// Validate behavior when multiple type URL watches are maintained and the stream is reset. +TEST_F(GrpcMuxImplTest, ResetStream) { + InSequence s; + auto foo_sub = grpc_mux_->subscribe("foo", {"x", "y"}, callbacks_); + auto bar_sub = grpc_mux_->subscribe("bar", {}, callbacks_); + auto baz_sub = grpc_mux_->subscribe("baz", {"z"}, callbacks_); + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("foo", {"x", "y"}, ""); + expectSendMessage("bar", {}, ""); + expectSendMessage("baz", {"z"}, ""); + grpc_mux_->start(); + + EXPECT_CALL(callbacks_, onConfigUpdateFailed(_)).Times(3); + EXPECT_CALL(*timer_, enableTimer(_)); + grpc_mux_->onRemoteClose(Grpc::Status::GrpcStatus::Canceled, ""); + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("foo", {"x", "y"}, ""); + expectSendMessage("bar", {}, ""); + expectSendMessage("baz", {"z"}, ""); + timer_cb_(); + + expectSendMessage("baz", {}, ""); + expectSendMessage("bar", {}, ""); + expectSendMessage("foo", {}, ""); +} + +// Validate behavior when type URL mismatches occur. +TEST_F(GrpcMuxImplTest, TypeUrlMismatch) { + InSequence s; + auto foo_sub = grpc_mux_->subscribe("foo", {"x", "y"}, callbacks_); + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("foo", {"x", "y"}, ""); + grpc_mux_->start(); + + { + std::unique_ptr response( + new envoy::api::v2::DiscoveryResponse()); + response->set_type_url("bar"); + grpc_mux_->onReceiveMessage(std::move(response)); + } + + { + std::unique_ptr response( + new envoy::api::v2::DiscoveryResponse()); + response->set_type_url("foo"); + response->mutable_resources()->Add()->set_type_url("bar"); + EXPECT_CALL(callbacks_, onConfigUpdateFailed(_)).WillOnce(Invoke([](const EnvoyException* e) { + EXPECT_TRUE( + IsSubstring("", "", "bar does not match foo type URL is DiscoveryResponse", e->what())); + })); + expectSendMessage("foo", {"x", "y"}, ""); + grpc_mux_->onReceiveMessage(std::move(response)); + } + + expectSendMessage("foo", {}, ""); +} + +// Validate behavior when watches has an unknown resource name. +TEST_F(GrpcMuxImplTest, WildcardWatch) { + InSequence s; + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + auto foo_sub = grpc_mux_->subscribe(type_url, {}, callbacks_); + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {}, ""); + grpc_mux_->start(); + + { + std::unique_ptr response( + new envoy::api::v2::DiscoveryResponse()); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::api::v2::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce( + Invoke([&load_assignment](const Protobuf::RepeatedPtrField& resources, + const std::string&) { + EXPECT_EQ(1, resources.size()); + envoy::api::v2::ClusterLoadAssignment expected_assignment; + resources[0].UnpackTo(&expected_assignment); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment)); + })); + expectSendMessage(type_url, {}, "1"); + grpc_mux_->onReceiveMessage(std::move(response)); + } + + expectSendMessage(type_url, {}, "1"); +} + +// Validate behavior when watches specify resources (potentially overlapping). +TEST_F(GrpcMuxImplTest, WatchDemux) { + InSequence s; + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + MockGrpcMuxCallbacks foo_callbacks; + auto foo_sub = grpc_mux_->subscribe(type_url, {"x", "y"}, foo_callbacks); + MockGrpcMuxCallbacks bar_callbacks; + auto bar_sub = grpc_mux_->subscribe(type_url, {"y", "z"}, bar_callbacks); + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + // Should dedupe the "x" resource. + expectSendMessage(type_url, {"y", "z", "x"}, ""); + grpc_mux_->start(); + + { + std::unique_ptr response( + new envoy::api::v2::DiscoveryResponse()); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::api::v2::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + EXPECT_CALL(bar_callbacks, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const Protobuf::RepeatedPtrField& resources, + const std::string&) { EXPECT_TRUE(resources.empty()); })); + EXPECT_CALL(foo_callbacks, onConfigUpdate(_, "1")) + .WillOnce( + Invoke([&load_assignment](const Protobuf::RepeatedPtrField& resources, + const std::string&) { + EXPECT_EQ(1, resources.size()); + envoy::api::v2::ClusterLoadAssignment expected_assignment; + resources[0].UnpackTo(&expected_assignment); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment)); + })); + expectSendMessage(type_url, {"y", "z", "x"}, "1"); + grpc_mux_->onReceiveMessage(std::move(response)); + } + + { + std::unique_ptr response( + new envoy::api::v2::DiscoveryResponse()); + response->set_type_url(type_url); + response->set_version_info("2"); + envoy::api::v2::ClusterLoadAssignment load_assignment_x; + load_assignment_x.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment_x); + envoy::api::v2::ClusterLoadAssignment load_assignment_y; + load_assignment_y.set_cluster_name("y"); + response->add_resources()->PackFrom(load_assignment_y); + envoy::api::v2::ClusterLoadAssignment load_assignment_z; + load_assignment_z.set_cluster_name("z"); + response->add_resources()->PackFrom(load_assignment_z); + EXPECT_CALL(bar_callbacks, onConfigUpdate(_, "2")) + .WillOnce(Invoke( + [&load_assignment_y, &load_assignment_z]( + const Protobuf::RepeatedPtrField& resources, const std::string&) { + EXPECT_EQ(2, resources.size()); + envoy::api::v2::ClusterLoadAssignment expected_assignment; + resources[0].UnpackTo(&expected_assignment); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment_y)); + resources[1].UnpackTo(&expected_assignment); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment_z)); + })); + EXPECT_CALL(foo_callbacks, onConfigUpdate(_, "2")) + .WillOnce(Invoke( + [&load_assignment_x, &load_assignment_y]( + const Protobuf::RepeatedPtrField& resources, const std::string&) { + EXPECT_EQ(2, resources.size()); + envoy::api::v2::ClusterLoadAssignment expected_assignment; + resources[0].UnpackTo(&expected_assignment); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment_x)); + resources[1].UnpackTo(&expected_assignment); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment_y)); + })); + expectSendMessage(type_url, {"y", "z", "x"}, "2"); + grpc_mux_->onReceiveMessage(std::move(response)); + } + + expectSendMessage(type_url, {"x", "y"}, "2"); + expectSendMessage(type_url, {}, "2"); +} + +} // namespace +} // namespace Config +} // namespace Envoy diff --git a/test/common/config/grpc_subscription_impl_test.cc b/test/common/config/grpc_subscription_impl_test.cc index 61f0c2d2e906..f61ebdd470eb 100644 --- a/test/common/config/grpc_subscription_impl_test.cc +++ b/test/common/config/grpc_subscription_impl_test.cc @@ -17,7 +17,7 @@ TEST_F(GrpcSubscriptionImplTest, StreamCreationFailure) { EXPECT_CALL(callbacks_, onConfigUpdateFailed(_)); EXPECT_CALL(*timer_, enableTimer(_)); subscription_->start({"cluster0", "cluster1"}, callbacks_); - verifyStats(1, 0, 0, 1); + verifyStats(2, 0, 0, 1); // Ensure this doesn't cause an issue by sending a request, since we don't // have a gRPC stream. subscription_->updateResources({"cluster2"}); @@ -33,11 +33,11 @@ TEST_F(GrpcSubscriptionImplTest, RemoteStreamClose) { startSubscription({"cluster0", "cluster1"}); verifyStats(1, 0, 0, 0); Http::HeaderMapPtr trailers{new Http::TestHeaderMapImpl{}}; - subscription_->onReceiveTrailingMetadata(std::move(trailers)); + subscription_->grpcMux().onReceiveTrailingMetadata(std::move(trailers)); EXPECT_CALL(*timer_, enableTimer(_)); EXPECT_CALL(callbacks_, onConfigUpdateFailed(_)); - subscription_->onRemoteClose(Grpc::Status::GrpcStatus::Canceled, ""); - verifyStats(1, 0, 0, 1); + subscription_->grpcMux().onRemoteClose(Grpc::Status::GrpcStatus::Canceled, ""); + verifyStats(2, 0, 0, 1); // Retry and succeed. EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); expectSendMessage({"cluster0", "cluster1"}, ""); @@ -48,24 +48,22 @@ TEST_F(GrpcSubscriptionImplTest, RemoteStreamClose) { // Validate that When the management server gets multiple requests for the same version, it can // ignore later ones. This allows the nonce to be used. TEST_F(GrpcSubscriptionImplTest, RepeatedNonce) { + InSequence s; startSubscription({"cluster0", "cluster1"}); verifyStats(1, 0, 0, 0); // First with the initial, empty version update to "0". - expectSendMessage({"cluster2"}, ""); updateResources({"cluster2"}); verifyStats(2, 0, 0, 0); - deliverConfigUpdate({"cluster0", "cluster1"}, "0", false); + deliverConfigUpdate({"cluster0", "cluster2"}, "0", false); verifyStats(3, 0, 1, 0); - deliverConfigUpdate({"cluster0", "cluster1"}, "0", true); + deliverConfigUpdate({"cluster0", "cluster2"}, "0", true); verifyStats(4, 1, 1, 0); // Now with version "0" update to "1". - expectSendMessage({"cluster3"}, "0"); updateResources({"cluster3"}); verifyStats(5, 1, 1, 0); - deliverConfigUpdate({"cluster2"}, "1", false); + deliverConfigUpdate({"cluster3"}, "1", false); verifyStats(6, 1, 2, 0); - Mock::VerifyAndClearExpectations(&async_stream_); - deliverConfigUpdate({"cluster2"}, "1", true); + deliverConfigUpdate({"cluster3"}, "1", true); verifyStats(7, 2, 2, 0); } diff --git a/test/common/config/grpc_subscription_test_harness.h b/test/common/config/grpc_subscription_test_harness.h index bc6f617c8196..c7703806cb15 100644 --- a/test/common/config/grpc_subscription_test_harness.h +++ b/test/common/config/grpc_subscription_test_harness.h @@ -2,6 +2,7 @@ #include "common/common/hash.h" #include "common/config/grpc_subscription_impl.h" +#include "common/config/resources.h" #include "test/common/config/subscription_test_harness.h" #include "test/mocks/config/mocks.h" @@ -43,6 +44,8 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { *method_descriptor_, stats_)); } + ~GrpcSubscriptionTestHarness() { EXPECT_CALL(async_stream_, sendMessage(_, false)); } + void expectSendMessage(const std::vector& cluster_names, const std::string& version) override { envoy::api::v2::DiscoveryRequest expected_request; @@ -54,6 +57,7 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { expected_request.set_version_info(version); } expected_request.set_response_nonce(last_response_nonce_); + expected_request.set_type_url(Config::TypeUrl::get().ClusterLoadAssignment); EXPECT_CALL(async_stream_, sendMessage(ProtoEq(expected_request), false)); } @@ -65,9 +69,9 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { // These are just there to add coverage to the null implementations of these // callbacks. Http::HeaderMapPtr response_headers{new Http::TestHeaderMapImpl{}}; - subscription_->onReceiveInitialMetadata(std::move(response_headers)); + subscription_->grpcMux().onReceiveInitialMetadata(std::move(response_headers)); Http::TestHeaderMapImpl request_headers; - subscription_->onCreateInitialMetadata(request_headers); + subscription_->grpcMux().onCreateInitialMetadata(request_headers); } void deliverConfigUpdate(const std::vector cluster_names, const std::string& version, @@ -77,11 +81,15 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { response->set_version_info(version); last_response_nonce_ = std::to_string(HashUtil::xxHash64(version)); response->set_nonce(last_response_nonce_); + response->set_type_url(Config::TypeUrl::get().ClusterLoadAssignment); Protobuf::RepeatedPtrField typed_resources; for (const auto& cluster : cluster_names) { - envoy::api::v2::ClusterLoadAssignment* load_assignment = typed_resources.Add(); - load_assignment->set_cluster_name(cluster); - response->add_resources()->PackFrom(*load_assignment); + if (std::find(last_cluster_names_.begin(), last_cluster_names_.end(), cluster) != + last_cluster_names_.end()) { + envoy::api::v2::ClusterLoadAssignment* load_assignment = typed_resources.Add(); + load_assignment->set_cluster_name(cluster); + response->add_resources()->PackFrom(*load_assignment); + } } EXPECT_CALL(callbacks_, onConfigUpdate(RepeatedProtoEq(typed_resources))) .WillOnce(ThrowOnRejectedConfig(accept)); @@ -89,15 +97,20 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { expectSendMessage(last_cluster_names_, version); version_ = version; } else { - expectSendMessage(last_cluster_names_, version_); EXPECT_CALL(callbacks_, onConfigUpdateFailed(_)); + expectSendMessage(last_cluster_names_, version_); } - subscription_->onReceiveMessage(std::move(response)); + subscription_->grpcMux().onReceiveMessage(std::move(response)); EXPECT_EQ(version_, subscription_->versionInfo()); Mock::VerifyAndClearExpectations(&async_stream_); } void updateResources(const std::vector& cluster_names) override { + std::vector cluster_superset = cluster_names; + cluster_superset.insert(cluster_superset.end(), last_cluster_names_.begin(), + last_cluster_names_.end()); + expectSendMessage(cluster_superset, version_); + expectSendMessage(cluster_names, version_); subscription_->updateResources(cluster_names); last_cluster_names_ = cluster_names; } diff --git a/test/common/config/http_subscription_test_harness.h b/test/common/config/http_subscription_test_harness.h index d6e785ef8ed0..ccad638d24a7 100644 --- a/test/common/config/http_subscription_test_harness.h +++ b/test/common/config/http_subscription_test_harness.h @@ -87,6 +87,7 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness { void updateResources(const std::vector& cluster_names) override { cluster_names_ = cluster_names; + expectSendMessage(cluster_names, version_); subscription_->updateResources(cluster_names); timer_cb_(); } diff --git a/test/common/config/subscription_impl_test.cc b/test/common/config/subscription_impl_test.cc index 37f37515a57a..9ebcf9af9130 100644 --- a/test/common/config/subscription_impl_test.cc +++ b/test/common/config/subscription_impl_test.cc @@ -1,4 +1,3 @@ -#include "test/common/config/ads_subscription_test_harness.h" #include "test/common/config/filesystem_subscription_test_harness.h" #include "test/common/config/grpc_subscription_test_harness.h" #include "test/common/config/http_subscription_test_harness.h" @@ -12,7 +11,6 @@ enum class SubscriptionType { Grpc, Http, Filesystem, - Ads, }; class SubscriptionImplTest : public testing::TestWithParam { @@ -28,9 +26,6 @@ class SubscriptionImplTest : public testing::TestWithParam { case SubscriptionType::Filesystem: test_harness_.reset(new FilesystemSubscriptionTestHarness()); break; - case SubscriptionType::Ads: - test_harness_.reset(new AdsSubscriptionTestHarness()); - break; } } @@ -61,7 +56,7 @@ class SubscriptionImplTest : public testing::TestWithParam { INSTANTIATE_TEST_CASE_P(SubscriptionImplTest, SubscriptionImplTest, testing::ValuesIn({SubscriptionType::Grpc, SubscriptionType::Http, - SubscriptionType::Filesystem, SubscriptionType::Ads})); + SubscriptionType::Filesystem})); // Validate basic request-response succeeds. TEST_P(SubscriptionImplTest, InitialRequestResponse) { @@ -115,7 +110,6 @@ TEST_P(SubscriptionImplTest, UpdateResources) { verifyStats(1, 0, 0, 0); deliverConfigUpdate({"cluster0", "cluster1"}, "0", true); verifyStats(2, 1, 0, 0); - expectSendMessage({"cluster2"}, "0"); updateResources({"cluster2"}); verifyStats(3, 1, 0, 0); } diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index e5cb77fbd9ef..3f54913d708e 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -109,7 +109,7 @@ class ClusterManagerImplTest : public testing::Test { void create(const envoy::api::v2::Bootstrap& bootstrap) { cluster_manager_.reset(new ClusterManagerImpl( bootstrap, factory_, factory_.stats_, factory_.tls_, factory_.runtime_, factory_.random_, - factory_.local_info_, log_manager_)); + factory_.local_info_, log_manager_, factory_.dispatcher_)); } NiceMock factory_; @@ -747,8 +747,6 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { // drain callbacks, etc. dns_timer_->callback_(); dns_callback(TestUtility::makeDnsResponse({"127.0.0.2", "127.0.0.3"})); - dns_timer_->callback_(); - dns_callback(TestUtility::makeDnsResponse({"127.0.0.2"})); factory_.tls_.shutdownThread(); } diff --git a/test/config/integration/BUILD b/test/config/integration/BUILD index 19bf78f0998b..cced7b128fae 100644 --- a/test/config/integration/BUILD +++ b/test/config/integration/BUILD @@ -10,6 +10,7 @@ envoy_package() exports_files([ "echo_server.json", "server.json", + "server_ads.yaml", "server_cors_filter.json", "server_grpc_json_transcoder.json", "server_http2.json", diff --git a/test/config/integration/server_ads.yaml b/test/config/integration/server_ads.yaml new file mode 100644 index 000000000000..d2f5e05b338a --- /dev/null +++ b/test/config/integration/server_ads.yaml @@ -0,0 +1,23 @@ +dynamic_resources: + lds_config: {ads: {}} + cds_config: {ads: {}} + ads_config: + api_type: GRPC + cluster_name: [ads_cluster] +static_resources: + clusters: + - name: ads_cluster + connect_timeout: { seconds: 5 } + type: STATIC + hosts: + - socket_address: + address: {{ ntop_ip_loopback_address }} + port_value: {{ ads_upstream }} + lb_policy: ROUND_ROBIN + http2_protocol_options: {} +admin: + access_log_path: /dev/null + address: + socket_address: + address: {{ ntop_ip_loopback_address }} + port_value: 0 diff --git a/test/integration/BUILD b/test/integration/BUILD index db62f96a95ef..0a15103a33be 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -11,6 +11,26 @@ load( envoy_package() +envoy_cc_test( + name = "ads_integration_test", + srcs = ["ads_integration_test.cc"], + data = ["//test/config/integration:server_ads.yaml"], + external_deps = [ + "envoy_cds", + "envoy_discovery", + "envoy_eds", + "envoy_lds", + "envoy_rds", + ], + deps = [ + ":http_integration_lib", + "//source/common/config:resources_lib", + "//source/common/protobuf:utility_lib", + "//test/test_common:network_utility_lib", + "//test/test_common:utility_lib", + ], +) + envoy_cc_test( name = "legacy_json_integration_test", srcs = ["legacy_json_integration_test.cc"], @@ -189,6 +209,7 @@ envoy_cc_test_library( "//include/envoy/api:api_interface", "//include/envoy/buffer:buffer_interface", "//include/envoy/event:dispatcher_interface", + "//include/envoy/grpc:status", "//include/envoy/http:codec_interface", "//include/envoy/http:header_map_interface", "//include/envoy/network:connection_interface", @@ -198,9 +219,12 @@ envoy_cc_test_library( "//include/envoy/server:options_interface", "//source/common/api:api_lib", "//source/common/buffer:buffer_lib", + "//source/common/buffer:zero_copy_input_stream_lib", "//source/common/common:assert_lib", "//source/common/common:logger_lib", "//source/common/common:thread_lib", + "//source/common/grpc:codec_lib", + "//source/common/grpc:common_lib", "//source/common/http:codec_client_lib", "//source/common/http:header_map_lib", "//source/common/http:headers_lib", diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc new file mode 100644 index 000000000000..cad94a7fef2a --- /dev/null +++ b/test/integration/ads_integration_test.cc @@ -0,0 +1,281 @@ +#include "common/config/resources.h" +#include "common/protobuf/utility.h" + +#include "test/integration/http_integration.h" +#include "test/integration/utility.h" +#include "test/test_common/network_utility.h" +#include "test/test_common/utility.h" + +#include "api/cds.pb.h" +#include "api/discovery.pb.h" +#include "api/eds.pb.h" +#include "api/lds.pb.h" +#include "api/rds.pb.h" +#include "gtest/gtest.h" + +using testing::AssertionFailure; +using testing::AssertionResult; +using testing::AssertionSuccess; + +namespace Envoy { +namespace { + +class AdsIntegrationTest : public HttpIntegrationTest, + public testing::TestWithParam { +public: + AdsIntegrationTest() : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, GetParam()) {} + + void SetUp() override { + fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_)); + registerPort("endpoint", fake_upstreams_.back()->localAddress()->ip()->port()); + fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_)); + registerPort("ads_upstream", fake_upstreams_.back()->localAddress()->ip()->port()); + createTestServer("test/config/integration/server_ads.yaml", {"http"}); + } + + void TearDown() override { + test_server_.reset(); + fake_upstreams_.clear(); + } + + AssertionResult compareDiscoveryRequest(const std::string& expected_type_url, + const std::string& expected_version, + const std::vector& expected_resource_names) { + envoy::api::v2::DiscoveryRequest discovery_request; + ads_stream_->waitForGrpcMessage(*dispatcher_, discovery_request); + if (expected_type_url != discovery_request.type_url()) { + return AssertionFailure() << fmt::format("type_url {} does not match expected {}", + discovery_request.type_url(), expected_type_url); + } + const std::vector resource_names(discovery_request.resource_names().cbegin(), + discovery_request.resource_names().cend()); + if (expected_resource_names != resource_names) { + return AssertionFailure() << fmt::format( + "resources {} do not match expected {} in {}", + fmt::join(resource_names.begin(), resource_names.end(), ","), + fmt::join(expected_resource_names.begin(), expected_resource_names.end(), ","), + discovery_request.DebugString()); + } + if (expected_version != discovery_request.version_info()) { + return AssertionFailure() << fmt::format("version {} does not match expected {} in {}", + discovery_request.version_info(), expected_version, + discovery_request.DebugString()); + } + return AssertionSuccess(); + } + + void sendDiscoveryResponse(const std::string& type_url, const Protobuf::Message& message, + const std::string& version) { + envoy::api::v2::DiscoveryResponse discovery_response; + discovery_response.set_version_info(version); + discovery_response.set_type_url(type_url); + discovery_response.add_resources()->PackFrom(message); + ads_stream_->sendGrpcMessage(discovery_response); + } + + envoy::api::v2::Cluster buildCluster(const std::string& name) { + return TestUtility::parseYaml(fmt::format(R"EOF( + name: {} + connect_timeout: 5s + type: EDS + eds_cluster_config: {{ eds_config: {{ ads: {{}} }} }} + lb_policy: ROUND_ROBIN + http2_protocol_options: {{}} + )EOF", + name)); + } + + envoy::api::v2::ClusterLoadAssignment buildClusterLoadAssignment(const std::string& name) { + return TestUtility::parseYaml( + fmt::format(R"EOF( + cluster_name: {} + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: {} + port_value: {} + )EOF", + name, Network::Test::getLoopbackAddressString(GetParam()), + fake_upstreams_[0]->localAddress()->ip()->port())); + } + + envoy::api::v2::Listener buildListener(const std::string& name, const std::string& route_config) { + return TestUtility::parseYaml( + fmt::format(R"EOF( + name: {} + address: + socket_address: + address: {} + port_value: 0 + filter_chains: + filters: + - name: envoy.http_connection_manager + config: + codec_type: HTTP2 + rds: + route_config_name: {} + config_source: {{ ads: {{}} }} + http_filters: [{{ name: envoy.router, config: {{ deprecated_v1: true }}}}] + )EOF", + name, Network::Test::getLoopbackAddressString(GetParam()), route_config)); + } + + envoy::api::v2::RouteConfiguration buildRouteConfig(const std::string& name, + const std::string& cluster) { + return TestUtility::parseYaml(fmt::format(R"EOF( + name: {} + virtual_hosts: + - name: integration + domains: ["*"] + routes: + - match: {{ prefix: "/" }} + route: {{ cluster: {} }} + )EOF", + name, cluster)); + } + + void makeSingleRequest() { + registerTestServerPorts({"http"}); + auto client_conn = makeClientConnection(lookupPort("http")); + testRouterHeaderOnlyRequestAndResponse(std::move(client_conn), true); + cleanupUpstreamAndDownstream(); + fake_upstream_connection_ = nullptr; + } + + void initialize() { + ads_connection_ = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_); + ads_stream_ = ads_connection_->waitForNewStream(); + ads_stream_->startGrpcStream(); + } + + FakeHttpConnectionPtr ads_connection_; + FakeStreamPtr ads_stream_; +}; + +INSTANTIATE_TEST_CASE_P(IpVersions, AdsIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest())); + +// Validate basic config delivery and upgrade. +TEST_P(AdsIntegrationTest, Basic) { + initialize(); + + // Send initial configuration, validate we can process a request. + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {})); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, buildCluster("cluster_0"), "1"); + + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "", {"cluster_0"})); + sendDiscoveryResponse(Config::TypeUrl::get().ClusterLoadAssignment, + buildClusterLoadAssignment("cluster_0"), "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "1", {})); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "", {})); + sendDiscoveryResponse(Config::TypeUrl::get().Listener, + buildListener("listener_0", "route_config_0"), "1"); + + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", {"cluster_0"})); + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "", {"route_config_0"})); + sendDiscoveryResponse(Config::TypeUrl::get().RouteConfiguration, + buildRouteConfig("route_config_0", "cluster_0"), "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "1", {})); + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "1", {"route_config_0"})); + + test_server_->waitForCounterGe("listener_manager.listener_create_success", 1); + makeSingleRequest(); + + // Upgrade RDS/CDS/EDS to a newer config, validate we can process a request. + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, buildCluster("cluster_1"), "2"); + sendDiscoveryResponse(Config::TypeUrl::get().ClusterLoadAssignment, + buildClusterLoadAssignment("cluster_1"), "2"); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", + {"cluster_1", "cluster_0"})); + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", {"cluster_1"})); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "2", {})); + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "2", {"cluster_1"})); + sendDiscoveryResponse(Config::TypeUrl::get().RouteConfiguration, + buildRouteConfig("route_config_0", "cluster_1"), "2"); + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "2", {"route_config_0"})); + + makeSingleRequest(); + + // Upgrade LDS/RDS, validate we can process a request. + sendDiscoveryResponse(Config::TypeUrl::get().Listener, + buildListener("listener_1", "route_config_1"), "2"); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "2", + {"route_config_1", "route_config_0"})); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "2", {})); + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "2", {"route_config_1"})); + sendDiscoveryResponse(Config::TypeUrl::get().RouteConfiguration, + buildRouteConfig("route_config_1", "cluster_1"), "3"); + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "3", {"route_config_1"})); + + test_server_->waitForCounterGe("listener_manager.listener_create_success", 2); + makeSingleRequest(); +} + +// Validate that we can recover from failures. +TEST_P(AdsIntegrationTest, Failure) { + initialize(); + + // Send initial configuration, failing each xDS once (via a type mismatch), validate we can + // process a request. + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {})); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, buildClusterLoadAssignment("cluster_0"), + "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "", {})); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {})); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, buildCluster("cluster_0"), "1"); + + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "", {"cluster_0"})); + sendDiscoveryResponse(Config::TypeUrl::get().ClusterLoadAssignment, buildCluster("cluster_0"), + "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "1", {})); + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "", {"cluster_0"})); + sendDiscoveryResponse(Config::TypeUrl::get().ClusterLoadAssignment, + buildClusterLoadAssignment("cluster_0"), "1"); + + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", {"cluster_0"})); + sendDiscoveryResponse(Config::TypeUrl::get().Listener, + buildRouteConfig("listener_0", "route_config_0"), "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "", {})); + sendDiscoveryResponse(Config::TypeUrl::get().Listener, + buildListener("listener_0", "route_config_0"), "1"); + + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "", {"route_config_0"})); + sendDiscoveryResponse(Config::TypeUrl::get().RouteConfiguration, + buildListener("route_config_0", "cluster_0"), "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "1", {})); + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "", {"route_config_0"})); + sendDiscoveryResponse(Config::TypeUrl::get().RouteConfiguration, + buildRouteConfig("route_config_0", "cluster_0"), "1"); + + EXPECT_TRUE( + compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "1", {"route_config_0"})); + + test_server_->waitForCounterGe("listener_manager.listener_create_success", 1); + makeSingleRequest(); +} + +} // namespace +} // namespace Envoy diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index ad778d71f270..67a64e99d558 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -98,10 +98,10 @@ void FakeStream::waitForHeadersComplete() { void FakeStream::waitForData(Event::Dispatcher& client_dispatcher, uint64_t body_length) { std::unique_lock lock(lock_); - while (bodyLength() != body_length) { + while (bodyLength() < body_length) { decoder_event_.wait_until(lock, std::chrono::system_clock::now() + std::chrono::milliseconds(5)); - if (bodyLength() != body_length) { + if (bodyLength() < body_length) { // Run the client dispatcher since we may need to process window updates, etc. client_dispatcher.run(Event::Dispatcher::RunType::NonBlock); } @@ -127,6 +127,15 @@ void FakeStream::waitForReset() { } } +void FakeStream::startGrpcStream() { + encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); +} + +void FakeStream::finishGrpcStream(Grpc::Status::GrpcStatus status) { + encodeTrailers( + Http::TestHeaderMapImpl{{"grpc-status", std::to_string(static_cast(status))}}); +} + FakeHttpConnection::FakeHttpConnection(QueuedConnectionWrapperPtr connection_wrapper, Stats::Store& store, Type type) : FakeConnectionBase(std::move(connection_wrapper)) { diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 6acd3d66c18a..c6ebfbfb3151 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -8,6 +8,7 @@ #include #include "envoy/api/api.h" +#include "envoy/grpc/status.h" #include "envoy/http/codec.h" #include "envoy/network/connection.h" #include "envoy/network/connection_handler.h" @@ -15,7 +16,10 @@ #include "envoy/server/configuration.h" #include "common/buffer/buffer_impl.h" +#include "common/buffer/zero_copy_input_stream_impl.h" #include "common/common/thread.h" +#include "common/grpc/codec.h" +#include "common/grpc/common.h" #include "common/network/filter_impl.h" #include "common/network/listen_socket_impl.h" #include "common/stats/stats_impl.h" @@ -29,7 +33,9 @@ class FakeHttpConnection; /** * Provides a fake HTTP stream for integration testing. */ -class FakeStream : public Http::StreamDecoder, public Http::StreamCallbacks { +class FakeStream : public Http::StreamDecoder, + public Http::StreamCallbacks, + Logger::Loggable { public: FakeStream(FakeHttpConnection& parent, Http::StreamEncoder& encoder); @@ -48,6 +54,41 @@ class FakeStream : public Http::StreamDecoder, public Http::StreamCallbacks { void waitForEndStream(Event::Dispatcher& client_dispatcher); void waitForReset(); + // gRPC convenience methods. + void startGrpcStream(); + void finishGrpcStream(Grpc::Status::GrpcStatus status); + template void sendGrpcMessage(const T& message) { + auto serialized_response = Grpc::Common::serializeBody(message); + encodeData(*serialized_response, false); + } + template void decodeGrpcFrame(T& message) { + EXPECT_GE(decoded_grpc_frames_.size(), 1); + Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_grpc_frames_[0].data_)); + EXPECT_TRUE(decoded_grpc_frames_[0].flags_ == Grpc::GRPC_FH_DEFAULT); + EXPECT_TRUE(message.ParseFromZeroCopyStream(&stream)); + ENVOY_LOG(debug, "Received gRPC message: {}", message.DebugString()); + decoded_grpc_frames_.erase(decoded_grpc_frames_.begin()); + } + template void waitForGrpcMessage(Event::Dispatcher& client_dispatcher, T& message) { + if (!decoded_grpc_frames_.empty()) { + decodeGrpcFrame(message); + return; + } + waitForData(client_dispatcher, 5); + { + std::unique_lock lock(lock_); + EXPECT_TRUE(grpc_decoder_.decode(body(), decoded_grpc_frames_)); + } + if (decoded_grpc_frames_.size() < 1) { + waitForData(client_dispatcher, grpc_decoder_.length()); + { + std::unique_lock lock(lock_); + EXPECT_TRUE(grpc_decoder_.decode(body(), decoded_grpc_frames_)); + } + } + decodeGrpcFrame(message); + } + // Http::StreamDecoder void decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override; void decodeData(Buffer::Instance& data, bool end_stream) override; @@ -68,6 +109,8 @@ class FakeStream : public Http::StreamDecoder, public Http::StreamCallbacks { bool end_stream_{}; Buffer::OwnedImpl body_; bool saw_reset_{}; + Grpc::Decoder grpc_decoder_; + std::vector decoded_grpc_frames_; }; typedef std::unique_ptr FakeStreamPtr; diff --git a/test/integration/ratelimit_integration_test.cc b/test/integration/ratelimit_integration_test.cc index b0c145d1c737..fb117487b6f7 100644 --- a/test/integration/ratelimit_integration_test.cc +++ b/test/integration/ratelimit_integration_test.cc @@ -40,6 +40,8 @@ class RatelimitIntegrationTest : public HttpIntegrationTest, void waitForRatelimitRequest() { fake_ratelimit_connection_ = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_); ratelimit_request_ = fake_ratelimit_connection_->waitForNewStream(); + pb::lyft::ratelimit::RateLimitRequest request_msg; + ratelimit_request_->waitForGrpcMessage(*dispatcher_, request_msg); ratelimit_request_->waitForEndStream(*dispatcher_); EXPECT_STREQ("POST", ratelimit_request_->headers().Method()->value().c_str()); EXPECT_STREQ("/pb.lyft.ratelimit.RateLimitService/ShouldRateLimit", @@ -51,16 +53,6 @@ class RatelimitIntegrationTest : public HttpIntegrationTest, auto* entry = expected_request_msg.add_descriptors()->add_entries(); entry->set_key("destination_cluster"); entry->set_value("traffic"); - - Grpc::Decoder decoder; - std::vector decoded_frames; - EXPECT_TRUE(decoder.decode(ratelimit_request_->body(), decoded_frames)); - EXPECT_EQ(1, decoded_frames.size()); - pb::lyft::ratelimit::RateLimitRequest request_msg; - Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_frames[0].data_)); - EXPECT_TRUE(decoded_frames[0].flags_ == Grpc::GRPC_FH_DEFAULT); - EXPECT_TRUE(request_msg.ParseFromZeroCopyStream(&stream)); - EXPECT_EQ(expected_request_msg.DebugString(), request_msg.DebugString()); } @@ -89,12 +81,11 @@ class RatelimitIntegrationTest : public HttpIntegrationTest, } void sendRateLimitResponse(pb::lyft::ratelimit::RateLimitResponse_Code code) { - ratelimit_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + ratelimit_request_->startGrpcStream(); pb::lyft::ratelimit::RateLimitResponse response_msg; response_msg.set_overall_code(code); - auto serialized_response = Grpc::Common::serializeBody(response_msg); - ratelimit_request_->encodeData(*serialized_response, false); - ratelimit_request_->encodeTrailers(Http::TestHeaderMapImpl{{"grpc-status", "0"}}); + ratelimit_request_->sendGrpcMessage(response_msg); + ratelimit_request_->finishGrpcStream(Grpc::Status::Ok); } void cleanup() { diff --git a/test/mocks/config/BUILD b/test/mocks/config/BUILD index da73894fce00..5f0ff1a7a16d 100644 --- a/test/mocks/config/BUILD +++ b/test/mocks/config/BUILD @@ -13,7 +13,7 @@ envoy_cc_mock( srcs = ["mocks.cc"], hdrs = ["mocks.h"], deps = [ - "//include/envoy/config:ads_interface", + "//include/envoy/config:grpc_mux_interface", "//include/envoy/config:subscription_interface", ], ) diff --git a/test/mocks/config/mocks.cc b/test/mocks/config/mocks.cc index c0c3a2f811f4..d50477a29eb7 100644 --- a/test/mocks/config/mocks.cc +++ b/test/mocks/config/mocks.cc @@ -3,17 +3,20 @@ namespace Envoy { namespace Config { -MockAdsWatch::MockAdsWatch() {} -MockAdsWatch::~MockAdsWatch() { cancel(); } +MockGrpcMuxWatch::MockGrpcMuxWatch() {} +MockGrpcMuxWatch::~MockGrpcMuxWatch() { cancel(); } -MockAdsApi::MockAdsApi() {} -MockAdsApi::~MockAdsApi() {} +MockGrpcMux::MockGrpcMux() {} +MockGrpcMux::~MockGrpcMux() {} -AdsWatchPtr MockAdsApi::subscribe(const std::string& type_url, - const std::vector& resources, - AdsCallbacks& callbacks) { - return AdsWatchPtr(subscribe_(type_url, resources, callbacks)); +GrpcMuxWatchPtr MockGrpcMux::subscribe(const std::string& type_url, + const std::vector& resources, + GrpcMuxCallbacks& callbacks) { + return GrpcMuxWatchPtr(subscribe_(type_url, resources, callbacks)); } +MockGrpcMuxCallbacks::MockGrpcMuxCallbacks() {} +MockGrpcMuxCallbacks::~MockGrpcMuxCallbacks() {} + } // namespace Config } // namespace Envoy diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index c1eb5f35cc3f..4af766289f42 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -1,6 +1,6 @@ #pragma once -#include "envoy/config/ads.h" +#include "envoy/config/grpc_mux.h" #include "envoy/config/subscription.h" #include "gmock/gmock.h" @@ -26,24 +26,35 @@ template class MockSubscription : public Subscription& resources, - AdsCallbacks& callbacks)); - AdsWatchPtr subscribe(const std::string& type_url, const std::vector& resources, - AdsCallbacks& callbacks); + GrpcMuxWatch*(const std::string& type_url, const std::vector& resources, + GrpcMuxCallbacks& callbacks)); + GrpcMuxWatchPtr subscribe(const std::string& type_url, const std::vector& resources, + GrpcMuxCallbacks& callbacks); +}; + +class MockGrpcMuxCallbacks : public GrpcMuxCallbacks { +public: + MockGrpcMuxCallbacks(); + virtual ~MockGrpcMuxCallbacks(); + + MOCK_METHOD2(onConfigUpdate, void(const Protobuf::RepeatedPtrField& resources, + const std::string& version_info)); + MOCK_METHOD1(onConfigUpdateFailed, void(const EnvoyException* e)); }; } // namespace Config diff --git a/test/mocks/upstream/mocks.h b/test/mocks/upstream/mocks.h index 309d0f4dedb8..cd811d755c37 100644 --- a/test/mocks/upstream/mocks.h +++ b/test/mocks/upstream/mocks.h @@ -114,7 +114,7 @@ class MockClusterManager : public ClusterManager { MOCK_METHOD1(removePrimaryCluster, bool(const std::string& cluster)); MOCK_METHOD0(shutdown, void()); MOCK_CONST_METHOD0(sourceAddress, const Network::Address::InstanceConstSharedPtr&()); - MOCK_METHOD0(adsProvider, Config::AdsApi&()); + MOCK_METHOD0(adsMux, Config::GrpcMux&()); NiceMock conn_pool_; NiceMock async_client_; diff --git a/test/test_common/BUILD b/test/test_common/BUILD index c8d4a418d3c3..93c1edee217f 100644 --- a/test/test_common/BUILD +++ b/test/test_common/BUILD @@ -71,5 +71,6 @@ envoy_cc_test_library( "//source/common/json:json_loader_lib", "//source/common/network:address_lib", "//source/common/network:utility_lib", + "//source/common/protobuf:utility_lib", ], ) diff --git a/test/test_common/utility.h b/test/test_common/utility.h index f3adbcdb0948..589e07797e9e 100644 --- a/test/test_common/utility.h +++ b/test/test_common/utility.h @@ -14,6 +14,7 @@ #include "envoy/stats/stats.h" #include "common/http/header_map_impl.h" +#include "common/protobuf/utility.h" #include "test/test_common/printers.h" @@ -126,6 +127,17 @@ class TestUtility { return "127.0.0.9"; #endif } + + /** + * Return typed proto message object for YAML. + * @param yaml YAML string. + * @return MessageType parsed from yaml. + */ + template static MessageType parseYaml(const std::string& yaml) { + MessageType message; + MessageUtil::loadFromYaml(yaml, message); + return message; + } }; /**