diff --git a/envoy/config/grpc_mux.h b/envoy/config/grpc_mux.h index 0139cb3d9524..f3c5d7c00808 100644 --- a/envoy/config/grpc_mux.h +++ b/envoy/config/grpc_mux.h @@ -105,6 +105,9 @@ class GrpcMux { virtual void requestOnDemandUpdate(const std::string& type_url, const absl::flat_hash_set& for_update) PURE; + + // TODO (dmitri-d) remove this when legacy muxes have been removed + virtual bool isUnified() const { return false; } }; using GrpcMuxPtr = std::unique_ptr; diff --git a/source/common/config/xds_mux/BUILD b/source/common/config/xds_mux/BUILD index f934e46e19dc..de337592a86f 100644 --- a/source/common/config/xds_mux/BUILD +++ b/source/common/config/xds_mux/BUILD @@ -51,3 +51,23 @@ envoy_cc_library( "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], ) + +envoy_cc_library( + name = "grpc_mux_lib", + srcs = ["grpc_mux_impl.cc"], + hdrs = ["grpc_mux_impl.h"], + deps = [ + ":delta_subscription_state_lib", + ":sotw_subscription_state_lib", + "//envoy/event:dispatcher_interface", + "//envoy/grpc:async_client_interface", + "//source/common/config:grpc_stream_lib", + "//source/common/config:pausable_ack_queue_lib", + "//source/common/config:version_converter_lib", + "//source/common/config:watch_map_lib", + "//source/common/config:xds_context_params_lib", + "//source/common/config:xds_resource_lib", + "//source/common/memory:utils_lib", + "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", + ], +) diff --git a/source/common/config/xds_mux/delta_subscription_state.h b/source/common/config/xds_mux/delta_subscription_state.h index 801bd5edc0c1..ced0c9fd52f0 100644 --- a/source/common/config/xds_mux/delta_subscription_state.h +++ b/source/common/config/xds_mux/delta_subscription_state.h @@ -93,6 +93,20 @@ class DeltaSubscriptionState std::set names_removed_; }; +class DeltaSubscriptionStateFactory : public SubscriptionStateFactory { +public: + DeltaSubscriptionStateFactory(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} + ~DeltaSubscriptionStateFactory() override = default; + std::unique_ptr + makeSubscriptionState(const std::string& type_url, UntypedConfigUpdateCallbacks& callbacks, + OpaqueResourceDecoder&, const bool wildcard) override { + return std::make_unique(type_url, callbacks, dispatcher_, wildcard); + } + +private: + Event::Dispatcher& dispatcher_; +}; + } // namespace XdsMux } // namespace Config } // namespace Envoy diff --git a/source/common/config/xds_mux/grpc_mux_impl.cc b/source/common/config/xds_mux/grpc_mux_impl.cc new file mode 100644 index 000000000000..0e62f1022d94 --- /dev/null +++ b/source/common/config/xds_mux/grpc_mux_impl.cc @@ -0,0 +1,403 @@ +#include "source/common/config/xds_mux/grpc_mux_impl.h" + +#include "envoy/service/discovery/v3/discovery.pb.h" + +#include "source/common/common/assert.h" +#include "source/common/common/backoff_strategy.h" +#include "source/common/config/decoded_resource_impl.h" +#include "source/common/config/utility.h" +#include "source/common/config/version_converter.h" +#include "source/common/config/xds_context_params.h" +#include "source/common/config/xds_resource.h" +#include "source/common/memory/utils.h" +#include "source/common/protobuf/protobuf.h" +#include "source/common/protobuf/utility.h" + +namespace Envoy { +namespace Config { +namespace XdsMux { + +namespace { +class AllMuxesState { +public: + void insert(ShutdownableMux* mux) { muxes_.insert(mux); } + + void erase(ShutdownableMux* mux) { muxes_.erase(mux); } + + void shutdownAll() { + for (auto& mux : muxes_) { + mux->shutdown(); + } + } + +private: + absl::flat_hash_set muxes_; +}; +using AllMuxes = ThreadSafeSingleton; +} // namespace + +template +GrpcMuxImpl::GrpcMuxImpl(std::unique_ptr subscription_state_factory, + bool skip_subsequent_node, + const LocalInfo::LocalInfo& local_info, + envoy::config::core::v3::ApiVersion transport_api_version, + Grpc::RawAsyncClientPtr&& async_client, + Event::Dispatcher& dispatcher, + const Protobuf::MethodDescriptor& service_method, + Random::RandomGenerator& random, Stats::Scope& scope, + const RateLimitSettings& rate_limit_settings) + : grpc_stream_(this, std::move(async_client), service_method, random, dispatcher, scope, + rate_limit_settings), + subscription_state_factory_(std::move(subscription_state_factory)), + skip_subsequent_node_(skip_subsequent_node), local_info_(local_info), + dynamic_update_callback_handle_(local_info.contextProvider().addDynamicContextUpdateCallback( + [this](absl::string_view resource_type_url) { + onDynamicContextUpdate(resource_type_url); + })), + transport_api_version_(transport_api_version) { + Config::Utility::checkLocalInfo("ads", local_info); + AllMuxes::get().insert(this); +} + +template GrpcMuxImpl::~GrpcMuxImpl() { + AllMuxes::get().erase(this); +} + +template void GrpcMuxImpl::shutdownAll() { + AllMuxes::get().shutdownAll(); +} + +template +void GrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) { + ENVOY_LOG(debug, "GrpcMuxImpl::onDynamicContextUpdate for {}", resource_type_url); + auto sub = subscriptions_.find(resource_type_url); + if (sub == subscriptions_.end()) { + return; + } + sub->second->setDynamicContextChanged(); + trySendDiscoveryRequests(); +} + +template +Config::GrpcMuxWatchPtr GrpcMuxImpl::addWatch( + const std::string& type_url, const absl::flat_hash_set& resources, + SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder, + const SubscriptionOptions& options) { + auto watch_map = watch_maps_.find(type_url); + if (watch_map == watch_maps_.end()) { + // We don't yet have a subscription for type_url! Make one! + watch_map = + watch_maps_.emplace(type_url, std::make_unique(options.use_namespace_matching_)) + .first; + subscriptions_.emplace( + type_url, subscription_state_factory_->makeSubscriptionState( + type_url, *watch_maps_[type_url], resource_decoder, resources.empty())); + subscription_ordering_.emplace_back(type_url); + } + + Watch* watch = watch_map->second->addWatch(callbacks, resource_decoder); + // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed. + updateWatch(type_url, watch, resources, options); + return std::make_unique(type_url, watch, *this, options); +} + +// Updates the list of resource names watched by the given watch. If an added name is new across +// the whole subscription, or if a removed name has no other watch interested in it, then the +// subscription will enqueue and attempt to send an appropriate discovery request. +template +void GrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch, + const absl::flat_hash_set& resources, + const SubscriptionOptions& options) { + ENVOY_LOG(debug, "GrpcMuxImpl::updateWatch for {}", type_url); + ASSERT(watch != nullptr); + auto& sub = subscriptionStateFor(type_url); + WatchMap& watch_map = watchMapFor(type_url); + + // We need to prepare xdstp:// resources for the transport, by normalizing and adding any extra + // context parameters. + absl::flat_hash_set effective_resources; + for (const auto& resource : resources) { + if (XdsResourceIdentifier::hasXdsTpScheme(resource)) { + auto xdstp_resource = XdsResourceIdentifier::decodeUrn(resource); + if (options.add_xdstp_node_context_params_) { + const auto context = XdsContextParams::encodeResource( + local_info_.contextProvider().nodeContext(), xdstp_resource.context(), {}, {}); + xdstp_resource.mutable_context()->CopyFrom(context); + } + XdsResourceIdentifier::EncodeOptions encode_options; + encode_options.sort_context_params_ = true; + effective_resources.insert(XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options)); + } else { + effective_resources.insert(resource); + } + } + + auto added_removed = watch_map.updateWatchInterest(watch, effective_resources); + if (options.use_namespace_matching_) { + // This is to prevent sending out of requests that contain prefixes instead of resource names + sub.updateSubscriptionInterest({}, {}); + } else { + sub.updateSubscriptionInterest(added_removed.added_, added_removed.removed_); + } + + // Tell the server about our change in interest, if any. + if (sub.subscriptionUpdatePending()) { + trySendDiscoveryRequests(); + } +} + +template +void GrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) { + updateWatch(type_url, watch, {}, {}); + watchMapFor(type_url).removeWatch(watch); +} + +template +ScopedResume GrpcMuxImpl::pause(const std::string& type_url) { + return pause(std::vector{type_url}); +} + +template +ScopedResume GrpcMuxImpl::pause(const std::vector type_urls) { + for (const auto& type_url : type_urls) { + pausable_ack_queue_.pause(type_url); + } + + return std::make_unique([this, type_urls]() { + for (const auto& type_url : type_urls) { + pausable_ack_queue_.resume(type_url); + trySendDiscoveryRequests(); + } + }); +} + +template +void GrpcMuxImpl::sendGrpcMessage(RQ& msg_proto, S& sub_state) { + if (sub_state.dynamicContextChanged() || !anyRequestSentYetInCurrentStream() || + !skipSubsequentNode()) { + msg_proto.mutable_node()->CopyFrom(localInfo().node()); + } + VersionConverter::prepareMessageForGrpcWire(msg_proto, transportApiVersion()); + sendMessage(msg_proto); + setAnyRequestSentYetInCurrentStream(true); + sub_state.clearDynamicContextChanged(); +} + +template +void GrpcMuxImpl::genericHandleResponse(const std::string& type_url, + const RS& response_proto, + ControlPlaneStats& control_plane_stats) { + auto sub = subscriptions_.find(type_url); + if (sub == subscriptions_.end()) { + ENVOY_LOG(warn, + "The server sent an xDS response proto with type_url {}, which we have " + "not subscribed to. Ignoring.", + type_url); + return; + } + + if (response_proto.has_control_plane()) { + control_plane_stats.identifier_.set(response_proto.control_plane().identifier()); + } + + if (response_proto.control_plane().identifier() != sub->second->controlPlaneIdentifier()) { + sub->second->setControlPlaneIdentifier(response_proto.control_plane().identifier()); + ENVOY_LOG(debug, "Receiving gRPC updates for {} from {}", response_proto.type_url(), + sub->second->controlPlaneIdentifier()); + } + + pausable_ack_queue_.push(sub->second->handleResponse(response_proto)); + trySendDiscoveryRequests(); + Memory::Utils::tryShrinkHeap(); +} + +template void GrpcMuxImpl::start() { + ENVOY_LOG(debug, "GrpcMuxImpl now trying to establish a stream"); + grpc_stream_.establishNewStream(); +} + +template +void GrpcMuxImpl::handleEstablishedStream() { + ENVOY_LOG(debug, "GrpcMuxImpl stream successfully established"); + for (auto& [type_url, subscription_state] : subscriptions_) { + subscription_state->markStreamFresh(); + } + setAnyRequestSentYetInCurrentStream(false); + maybeUpdateQueueSizeStat(0); + pausable_ack_queue_.clear(); + trySendDiscoveryRequests(); +} + +template +void GrpcMuxImpl::handleStreamEstablishmentFailure() { + ENVOY_LOG(debug, "GrpcMuxImpl stream failed to establish"); + // If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately + // call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a + // crash, the iteration needs to dance around a little: collect pointers to all + // SubscriptionStates, call on all those pointers we haven't yet called on, repeat if there are + // now more SubscriptionStates. + absl::flat_hash_map all_subscribed; + absl::flat_hash_map already_called; + do { + for (auto& [type_url, subscription_state] : subscriptions_) { + all_subscribed[type_url] = subscription_state.get(); + } + for (auto& sub : all_subscribed) { + if (already_called.insert(sub).second) { // insert succeeded ==> not already called + sub.second->handleEstablishmentFailure(); + } + } + } while (all_subscribed.size() != subscriptions_.size()); +} + +template +S& GrpcMuxImpl::subscriptionStateFor(const std::string& type_url) { + auto sub = subscriptions_.find(type_url); + RELEASE_ASSERT(sub != subscriptions_.end(), + fmt::format("Tried to look up SubscriptionState for non-existent subscription {}.", + type_url)); + return *sub->second; +} + +template +WatchMap& GrpcMuxImpl::watchMapFor(const std::string& type_url) { + auto watch_map = watch_maps_.find(type_url); + RELEASE_ASSERT( + watch_map != watch_maps_.end(), + fmt::format("Tried to look up WatchMap for non-existent subscription {}.", type_url)); + return *watch_map->second; +} + +template +void GrpcMuxImpl::trySendDiscoveryRequests() { + if (shutdown_) { + return; + } + + while (true) { + // Do any of our subscriptions even want to send a request? + absl::optional request_type_if_any = whoWantsToSendDiscoveryRequest(); + if (!request_type_if_any.has_value()) { + break; + } + // If so, which one (by type_url)? + std::string next_request_type_url = request_type_if_any.value(); + auto& sub = subscriptionStateFor(next_request_type_url); + ENVOY_LOG(debug, "GrpcMuxImpl wants to send discovery request for {}", next_request_type_url); + // Try again later if paused/rate limited/stream down. + if (!canSendDiscoveryRequest(next_request_type_url)) { + break; + } + std::unique_ptr request; + // Get our subscription state to generate the appropriate discovery request, and send. + if (!pausable_ack_queue_.empty()) { + // Because ACKs take precedence over plain requests, if there is anything in the queue, it's + // safe to assume it's of the type_url that we're wanting to send. + // + // getNextRequestWithAck() returns a raw unowned pointer, which sendGrpcMessage deletes. + request = sub.getNextRequestWithAck(pausable_ack_queue_.popFront()); + ENVOY_LOG(debug, "GrpcMuxImpl sent ACK discovery request for {}", next_request_type_url); + } else { + // Returns a raw unowned pointer, which sendGrpcMessage deletes. + request = sub.getNextRequestAckless(); + ENVOY_LOG(debug, "GrpcMuxImpl sent non-ACK discovery request for {}", next_request_type_url); + } + ENVOY_LOG(debug, "GrpcMuxImpl skip_subsequent_node: {}", skipSubsequentNode()); + sendGrpcMessage(*request, sub); + } + maybeUpdateQueueSizeStat(pausable_ack_queue_.size()); +} + +// Checks whether external conditions allow sending a discovery request. (Does not check +// whether we *want* to send a discovery request). +template +bool GrpcMuxImpl::canSendDiscoveryRequest(const std::string& type_url) { + RELEASE_ASSERT( + !pausable_ack_queue_.paused(type_url), + fmt::format("canSendDiscoveryRequest() called on paused type_url {}. Pausedness is " + "supposed to be filtered out by whoWantsToSendDiscoveryRequest(). ", + type_url)); + + if (!grpcStreamAvailable()) { + ENVOY_LOG(trace, "No stream available to send a discovery request for {}.", type_url); + return false; + } else if (!rateLimitAllowsDrain()) { + ENVOY_LOG(trace, "{} discovery request hit rate limit; will try later.", type_url); + return false; + } + return true; +} + +// Checks whether we have something to say in a discovery request, which can be an ACK and/or +// a subscription update. (Does not check whether we *can* send that discovery request). +// Returns the type_url we should send the discovery request for (if any). +// First, prioritizes ACKs over non-ACK subscription interest updates. +// Then, prioritizes non-ACK updates in the order the various types +// of subscriptions were activated. +template +absl::optional GrpcMuxImpl::whoWantsToSendDiscoveryRequest() { + // All ACKs are sent before plain updates. trySendDiscoveryRequests() relies on this. So, choose + // type_url from pausable_ack_queue_ if possible, before looking at pending updates. + if (!pausable_ack_queue_.empty()) { + return pausable_ack_queue_.front().type_url_; + } + // If we're looking to send multiple non-ACK requests, send them in the order that their + // subscriptions were initiated. + for (const auto& sub_type : subscription_ordering_) { + auto& sub = subscriptionStateFor(sub_type); + if (sub.subscriptionUpdatePending() && !pausable_ack_queue_.paused(sub_type)) { + return sub_type; + } + } + return absl::nullopt; +} + +template class GrpcMuxImpl; +template class GrpcMuxImpl; + +// Delta- and SotW-specific concrete subclasses: +GrpcMuxDelta::GrpcMuxDelta(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, + const Protobuf::MethodDescriptor& service_method, + envoy::config::core::v3::ApiVersion transport_api_version, + Random::RandomGenerator& random, Stats::Scope& scope, + const RateLimitSettings& rate_limit_settings, + const LocalInfo::LocalInfo& local_info, bool skip_subsequent_node) + : GrpcMuxImpl(std::make_unique(dispatcher), skip_subsequent_node, + local_info, transport_api_version, std::move(async_client), dispatcher, + service_method, random, scope, rate_limit_settings) {} + +// GrpcStreamCallbacks for GrpcMuxDelta +void GrpcMuxDelta::requestOnDemandUpdate(const std::string& type_url, + const absl::flat_hash_set& for_update) { + auto& sub = subscriptionStateFor(type_url); + sub.updateSubscriptionInterest(for_update, {}); + // Tell the server about our change in interest, if any. + if (sub.subscriptionUpdatePending()) { + trySendDiscoveryRequests(); + } +} + +GrpcMuxSotw::GrpcMuxSotw(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, + const Protobuf::MethodDescriptor& service_method, + envoy::config::core::v3::ApiVersion transport_api_version, + Random::RandomGenerator& random, Stats::Scope& scope, + const RateLimitSettings& rate_limit_settings, + const LocalInfo::LocalInfo& local_info, bool skip_subsequent_node) + : GrpcMuxImpl(std::make_unique(dispatcher), skip_subsequent_node, + local_info, transport_api_version, std::move(async_client), dispatcher, + service_method, random, scope, rate_limit_settings) {} + +Config::GrpcMuxWatchPtr NullGrpcMuxImpl::addWatch(const std::string&, + const absl::flat_hash_set&, + SubscriptionCallbacks&, OpaqueResourceDecoder&, + const SubscriptionOptions&) { + throw EnvoyException("ADS must be configured to support an ADS config source"); +} + +} // namespace XdsMux +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/xds_mux/grpc_mux_impl.h b/source/common/config/xds_mux/grpc_mux_impl.h new file mode 100644 index 000000000000..a1ec7f2332dd --- /dev/null +++ b/source/common/config/xds_mux/grpc_mux_impl.h @@ -0,0 +1,270 @@ +#pragma once + +#include +#include +#include + +#include "envoy/common/random_generator.h" +#include "envoy/common/time.h" +#include "envoy/common/token_bucket.h" +#include "envoy/config/grpc_mux.h" +#include "envoy/config/subscription.h" +#include "envoy/event/dispatcher.h" +#include "envoy/grpc/status.h" +#include "envoy/service/discovery/v3/discovery.pb.h" +#include "envoy/upstream/cluster_manager.h" + +#include "source/common/common/logger.h" +#include "source/common/common/utility.h" +#include "source/common/config/api_version.h" +#include "source/common/config/grpc_stream.h" +#include "source/common/config/pausable_ack_queue.h" +#include "source/common/config/watch_map.h" +#include "source/common/config/xds_mux/delta_subscription_state.h" +#include "source/common/config/xds_mux/sotw_subscription_state.h" +#include "source/common/grpc/common.h" + +#include "absl/container/node_hash_map.h" + +namespace Envoy { +namespace Config { +namespace XdsMux { + +class ShutdownableMux { +public: + virtual ~ShutdownableMux() = default; + virtual void shutdown() PURE; +}; + +// Manages subscriptions to one or more type of resource. The logical protocol +// state of those subscription(s) is handled by SubscriptionState. +// This class owns the GrpcStream used to talk to the server, maintains queuing +// logic to properly order the subscription(s)' various messages, and allows +// starting/stopping/pausing of the subscriptions. +// +// @tparam S SubscriptionState state type, either SotwSubscriptionState or DeltaSubscriptionState +// @tparam F SubscriptionStateFactory type, either SotwSubscriptionStateFactory or +// DeltaSubscriptionStateFactory +// @tparam RQ Xds request type, either envoy::service::discovery::v3::DiscoveryRequest or +// envoy::service::discovery::v3::DeltaDiscoveryRequest +// @tparam RS Xds response type, either envoy::service::discovery::v3::DiscoveryResponse or +// envoy::service::discovery::v3::DeltaDiscoveryResponse +// +template +class GrpcMuxImpl : public GrpcStreamCallbacks, + public GrpcMux, + public ShutdownableMux, + Logger::Loggable { +public: + GrpcMuxImpl(std::unique_ptr subscription_state_factory, bool skip_subsequent_node, + const LocalInfo::LocalInfo& local_info, + envoy::config::core::v3::ApiVersion transport_api_version, + Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, + const Protobuf::MethodDescriptor& service_method, Random::RandomGenerator& random, + Stats::Scope& scope, const RateLimitSettings& rate_limit_settings); + + ~GrpcMuxImpl() override; + + // Causes all GrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash + // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably + // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which + // would then cause all `GrpcMuxImpl` to be destructed. + // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072. + static void shutdownAll(); + + void shutdown() override { shutdown_ = true; } + + // TODO (dmitri-d) return a naked pointer instead of the wrapper once the legacy mux has been + // removed and the mux interface can be changed + Config::GrpcMuxWatchPtr addWatch(const std::string& type_url, + const absl::flat_hash_set& resources, + SubscriptionCallbacks& callbacks, + OpaqueResourceDecoder& resource_decoder, + const SubscriptionOptions& options) override; + void updateWatch(const std::string& type_url, Watch* watch, + const absl::flat_hash_set& resources, + const SubscriptionOptions& options); + void removeWatch(const std::string& type_url, Watch* watch); + + ScopedResume pause(const std::string& type_url) override; + ScopedResume pause(const std::vector type_urls) override; + void start() override; + const absl::flat_hash_map>& subscriptions() const { + return subscriptions_; + } + bool isUnified() const override { return true; } + + // GrpcStreamCallbacks + void onStreamEstablished() override { handleEstablishedStream(); } + void onEstablishmentFailure() override { handleStreamEstablishmentFailure(); } + void onWriteable() override { trySendDiscoveryRequests(); } + void onDiscoveryResponse(std::unique_ptr&& message, + ControlPlaneStats& control_plane_stats) override { + genericHandleResponse(message->type_url(), *message, control_plane_stats); + } + + GrpcStream& grpcStreamForTest() { return grpc_stream_; } + +protected: + class WatchImpl : public Envoy::Config::GrpcMuxWatch { + public: + WatchImpl(const std::string& type_url, Watch* watch, GrpcMuxImpl& parent, + const SubscriptionOptions& options) + : type_url_(type_url), watch_(watch), parent_(parent), options_(options) {} + + ~WatchImpl() override { remove(); } + + void remove() { + if (watch_) { + parent_.removeWatch(type_url_, watch_); + watch_ = nullptr; + } + } + + void update(const absl::flat_hash_set& resources) override { + parent_.updateWatch(type_url_, watch_, resources, options_); + } + + private: + const std::string type_url_; + Watch* watch_; + GrpcMuxImpl& parent_; + const SubscriptionOptions options_; + }; + + void sendGrpcMessage(RQ& msg_proto, S& sub_state); + void maybeUpdateQueueSizeStat(uint64_t size) { grpc_stream_.maybeUpdateQueueSizeStat(size); } + bool grpcStreamAvailable() { return grpc_stream_.grpcStreamAvailable(); } + bool rateLimitAllowsDrain() { return grpc_stream_.checkRateLimitAllowsDrain(); } + void sendMessage(RQ& msg_proto) { grpc_stream_.sendMessage(msg_proto); } + + S& subscriptionStateFor(const std::string& type_url); + WatchMap& watchMapFor(const std::string& type_url); + void handleEstablishedStream(); + void handleStreamEstablishmentFailure(); + void genericHandleResponse(const std::string& type_url, const RS& response_proto, + ControlPlaneStats& control_plane_stats); + void trySendDiscoveryRequests(); + bool skipSubsequentNode() const { return skip_subsequent_node_; } + bool anyRequestSentYetInCurrentStream() const { return any_request_sent_yet_in_current_stream_; } + void setAnyRequestSentYetInCurrentStream(bool value) { + any_request_sent_yet_in_current_stream_ = value; + } + const LocalInfo::LocalInfo& localInfo() const { return local_info_; } + const envoy::config::core::v3::ApiVersion& transportApiVersion() const { + return transport_api_version_; + } + +private: + // Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check + // whether we *want* to send a (Delta)DiscoveryRequest). + bool canSendDiscoveryRequest(const std::string& type_url); + + // Checks whether we have something to say in a (Delta)DiscoveryRequest, which can be an ACK + // and/or a subscription update. (Does not check whether we *can* send that + // (Delta)DiscoveryRequest). Returns the type_url we should send the DeltaDiscoveryRequest for (if + // any). First, prioritizes ACKs over non-ACK subscription interest updates. Then, prioritizes + // non-ACK updates in the order the various types of subscriptions were activated (as tracked by + // subscription_ordering_). + absl::optional whoWantsToSendDiscoveryRequest(); + + // Invoked when dynamic context parameters change for a resource type. + void onDynamicContextUpdate(absl::string_view resource_type_url); + + GrpcStream grpc_stream_; + + // Resource (N)ACKs we're waiting to send, stored in the order that they should be sent in. All + // of our different resource types' ACKs are mixed together in this queue. See class for + // description of how it interacts with pause() and resume(). + PausableAckQueue pausable_ack_queue_; + + // Makes SubscriptionStates, to be held in the subscriptions_ map. Whether this GrpcMux is doing + // delta or state of the world xDS is determined by which concrete subclass this variable gets. + std::unique_ptr subscription_state_factory_; + + // Map key is type_url. + // Only addWatch() should insert into these maps. + absl::flat_hash_map> subscriptions_; + absl::flat_hash_map> watch_maps_; + + // Determines the order of initial discovery requests. (Assumes that subscriptions are added + // to this GrpcMux in the order of Envoy's dependency ordering). + std::list subscription_ordering_; + + // Whether to enable the optimization of only including the node field in the very first + // discovery request in an xDS gRPC stream (really just one: *not* per-type_url). + const bool skip_subsequent_node_; + + // State to help with skip_subsequent_node's logic. + bool any_request_sent_yet_in_current_stream_{}; + + // Used to populate the (Delta)DiscoveryRequest's node field. That field is the same across + // all type_urls, and moreover, the 'skip_subsequent_node' logic needs to operate across all + // the type_urls. So, while the SubscriptionStates populate every other field of these messages, + // this one is up to GrpcMux. + const LocalInfo::LocalInfo& local_info_; + Common::CallbackHandlePtr dynamic_update_callback_handle_; + const envoy::config::core::v3::ApiVersion transport_api_version_; + + // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is + // true because it may contain dangling pointers. + std::atomic shutdown_{false}; +}; + +class GrpcMuxDelta : public GrpcMuxImpl { +public: + GrpcMuxDelta(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, + const Protobuf::MethodDescriptor& service_method, + envoy::config::core::v3::ApiVersion transport_api_version, + Random::RandomGenerator& random, Stats::Scope& scope, + const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, + bool skip_subsequent_node); + + // GrpcStreamCallbacks + void requestOnDemandUpdate(const std::string& type_url, + const absl::flat_hash_set& for_update) override; +}; + +class GrpcMuxSotw : public GrpcMuxImpl { +public: + GrpcMuxSotw(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, + const Protobuf::MethodDescriptor& service_method, + envoy::config::core::v3::ApiVersion transport_api_version, + Random::RandomGenerator& random, Stats::Scope& scope, + const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, + bool skip_subsequent_node); + + // GrpcStreamCallbacks + void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set&) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } +}; + +class NullGrpcMuxImpl : public GrpcMux { +public: + void start() override {} + + ScopedResume pause(const std::string&) override { + return std::make_unique([]() {}); + } + ScopedResume pause(const std::vector) override { + return std::make_unique([]() {}); + } + + Config::GrpcMuxWatchPtr addWatch(const std::string&, const absl::flat_hash_set&, + SubscriptionCallbacks&, OpaqueResourceDecoder&, + const SubscriptionOptions&) override; + + // legacy mux interface not implemented by unified mux. + void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set&) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } +}; + +} // namespace XdsMux +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/xds_mux/sotw_subscription_state.cc b/source/common/config/xds_mux/sotw_subscription_state.cc index bb7a9f4c3a9c..90be3a318a2a 100644 --- a/source/common/config/xds_mux/sotw_subscription_state.cc +++ b/source/common/config/xds_mux/sotw_subscription_state.cc @@ -70,7 +70,7 @@ void SotwSubscriptionState::handleGoodResponse( } // TODO (dmitri-d) to eliminate decoding of resources twice consider expanding the interface to - // support passing of decoded resources + // support passing of decoded resources. This would also avoid a resource copy above. callbacks().onConfigUpdate(non_heartbeat_resources, message.version_info()); // Now that we're passed onConfigUpdate() without an exception thrown, we know we're good. last_good_version_info_ = message.version_info(); diff --git a/source/common/config/xds_mux/sotw_subscription_state.h b/source/common/config/xds_mux/sotw_subscription_state.h index 4d191fb93a3c..86063198f5a7 100644 --- a/source/common/config/xds_mux/sotw_subscription_state.h +++ b/source/common/config/xds_mux/sotw_subscription_state.h @@ -62,6 +62,21 @@ class SotwSubscriptionState absl::flat_hash_set names_tracked_; }; +class SotwSubscriptionStateFactory : public SubscriptionStateFactory { +public: + SotwSubscriptionStateFactory(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} + ~SotwSubscriptionStateFactory() override = default; + std::unique_ptr + makeSubscriptionState(const std::string& type_url, UntypedConfigUpdateCallbacks& callbacks, + OpaqueResourceDecoder& resource_decoder, const bool) override { + return std::make_unique(type_url, callbacks, dispatcher_, + resource_decoder); + } + +private: + Event::Dispatcher& dispatcher_; +}; + } // namespace XdsMux } // namespace Config } // namespace Envoy diff --git a/source/common/config/xds_mux/subscription_state.h b/source/common/config/xds_mux/subscription_state.h index 660798974576..9f9b48cd7723 100644 --- a/source/common/config/xds_mux/subscription_state.h +++ b/source/common/config/xds_mux/subscription_state.h @@ -47,6 +47,9 @@ class BaseSubscriptionState : public SubscriptionState, void clearDynamicContextChanged() { dynamic_context_changed_ = false; } bool dynamicContextChanged() const { return dynamic_context_changed_; } + void setControlPlaneIdentifier(const std::string& id) { control_plane_identifier_ = id; } + std::string& controlPlaneIdentifier() { return control_plane_identifier_; } + // Whether there was a change in our subscription interest we have yet to inform the server of. virtual bool subscriptionUpdatePending() const PURE; @@ -111,6 +114,17 @@ class BaseSubscriptionState : public SubscriptionState, UntypedConfigUpdateCallbacks& callbacks_; Event::Dispatcher& dispatcher_; bool dynamic_context_changed_{}; + std::string control_plane_identifier_{}; +}; + +template class SubscriptionStateFactory { +public: + virtual ~SubscriptionStateFactory() = default; + // Note that, outside of tests, we expect callbacks to always be a WatchMap. + virtual std::unique_ptr makeSubscriptionState(const std::string& type_url, + UntypedConfigUpdateCallbacks& callbacks, + OpaqueResourceDecoder& resource_decoder, + const bool wildcard) PURE; }; } // namespace XdsMux diff --git a/test/common/config/BUILD b/test/common/config/BUILD index 7f2f20924956..f662aaa300cc 100644 --- a/test/common/config/BUILD +++ b/test/common/config/BUILD @@ -176,8 +176,37 @@ envoy_cc_test( "//source/common/config:new_grpc_mux_lib", "//source/common/config:protobuf_link_hacks", "//source/common/config:version_converter_lib", + "//source/common/config/xds_mux:grpc_mux_lib", "//source/common/protobuf", "//test/common/stats:stat_test_utility_lib", + "//test/config:v2_link_hacks", + "//test/mocks:common_lib", + "//test/mocks/config:config_mocks", + "//test/mocks/event:event_mocks", + "//test/mocks/grpc:grpc_mocks", + "//test/mocks/local_info:local_info_mocks", + "//test/mocks/runtime:runtime_mocks", + "//test/test_common:logging_lib", + "//test/test_common:resources_lib", + "//test/test_common:simulated_time_system_lib", + "//test/test_common:test_runtime_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", + "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", + ], +) + +envoy_cc_test( + name = "xds_grpc_mux_impl_test", + srcs = ["xds_grpc_mux_impl_test.cc"], + deps = [ + "//source/common/config:protobuf_link_hacks", + "//source/common/config:resource_name_lib", + "//source/common/config:version_converter_lib", + "//source/common/config/xds_mux:grpc_mux_lib", + "//source/common/protobuf", + "//test/common/stats:stat_test_utility_lib", + "//test/config:v2_link_hacks", "//test/mocks:common_lib", "//test/mocks/config:config_mocks", "//test/mocks/event:event_mocks", @@ -189,6 +218,7 @@ envoy_cc_test( "//test/test_common:simulated_time_system_lib", "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", + "@envoy_api//envoy/api/v2:pkg_cc_proto", "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], diff --git a/test/common/config/new_grpc_mux_impl_test.cc b/test/common/config/new_grpc_mux_impl_test.cc index 8312e31afbf6..b417e99377ef 100644 --- a/test/common/config/new_grpc_mux_impl_test.cc +++ b/test/common/config/new_grpc_mux_impl_test.cc @@ -10,9 +10,11 @@ #include "source/common/config/protobuf_link_hacks.h" #include "source/common/config/utility.h" #include "source/common/config/version_converter.h" +#include "source/common/config/xds_mux/grpc_mux_impl.h" #include "source/common/protobuf/protobuf.h" #include "test/common/stats/stat_test_utility.h" +#include "test/config/v2_link_hacks.h" #include "test/mocks/common.h" #include "test/mocks/config/mocks.h" #include "test/mocks/event/mocks.h" @@ -41,17 +43,29 @@ namespace Envoy { namespace Config { namespace { +enum class LegacyOrUnified { Legacy, Unified }; + // We test some mux specific stuff below, other unit test coverage for singleton use of // NewGrpcMuxImpl is provided in [grpc_]subscription_impl_test.cc. -class NewGrpcMuxImplTestBase : public testing::Test { +class NewGrpcMuxImplTestBase : public testing::TestWithParam { public: - NewGrpcMuxImplTestBase() + NewGrpcMuxImplTestBase(LegacyOrUnified legacy_or_unified) : async_client_(new Grpc::MockAsyncClient()), control_plane_stats_(Utility::generateControlPlaneStats(stats_)), control_plane_connected_state_( - stats_.gauge("control_plane.connected_state", Stats::Gauge::ImportMode::NeverImport)) {} + stats_.gauge("control_plane.connected_state", Stats::Gauge::ImportMode::NeverImport)), + should_use_unified_(legacy_or_unified == LegacyOrUnified::Unified) {} void setup() { + if (isUnifiedMuxTest()) { + grpc_mux_ = std::make_unique( + std::unique_ptr(async_client_), dispatcher_, + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), + envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, rate_limit_settings_, + local_info_, false); + return; + } grpc_mux_ = std::make_unique( std::unique_ptr(async_client_), dispatcher_, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( @@ -88,12 +102,60 @@ class NewGrpcMuxImplTestBase : public testing::Test { EXPECT_CALL(async_stream_, sendMessageRaw_(Grpc::ProtoBufferEq(expected_request), false)); } + void remoteClose() { + if (isUnifiedMuxTest()) { + dynamic_cast(grpc_mux_.get()) + ->grpcStreamForTest() + .onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Canceled, ""); + return; + } + dynamic_cast(grpc_mux_.get()) + ->grpcStreamForTest() + .onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Canceled, ""); + } + + void onDiscoveryResponse( + std::unique_ptr&& response) { + if (isUnifiedMuxTest()) { + dynamic_cast(grpc_mux_.get()) + ->onDiscoveryResponse(std::move(response), control_plane_stats_); + return; + } + dynamic_cast(grpc_mux_.get()) + ->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + void shutdownMux() { + if (isUnifiedMuxTest()) { + dynamic_cast(grpc_mux_.get())->shutdown(); + return; + } + dynamic_cast(grpc_mux_.get())->shutdown(); + } + + // the code is duplicated here, but all calls other than the check in return statement, return + // different types. + bool subscriptionExists(const std::string& type_url) const { + if (isUnifiedMuxTest()) { + auto* mux = dynamic_cast(grpc_mux_.get()); + auto& subscriptions = mux->subscriptions(); + auto sub = subscriptions.find(type_url); + return sub != subscriptions.end(); + } + auto* mux = dynamic_cast(grpc_mux_.get()); + auto& subscriptions = mux->subscriptions(); + auto sub = subscriptions.find(type_url); + return sub != subscriptions.end(); + } + + bool isUnifiedMuxTest() const { return should_use_unified_; } + NiceMock dispatcher_; NiceMock random_; Grpc::MockAsyncClient* async_client_; NiceMock async_stream_; NiceMock local_info_; - NewGrpcMuxImplPtr grpc_mux_; + std::unique_ptr grpc_mux_; NiceMock callbacks_; TestUtility::TestOpaqueResourceDecoderImpl resource_decoder_{"cluster_name"}; @@ -101,15 +163,20 @@ class NewGrpcMuxImplTestBase : public testing::Test { Envoy::Config::RateLimitSettings rate_limit_settings_; ControlPlaneStats control_plane_stats_; Stats::Gauge& control_plane_connected_state_; + bool should_use_unified_; }; class NewGrpcMuxImplTest : public NewGrpcMuxImplTestBase { public: + NewGrpcMuxImplTest() : NewGrpcMuxImplTestBase(GetParam()) {} Event::SimulatedTimeSystem time_system_; }; +INSTANTIATE_TEST_SUITE_P(NewGrpcMuxImplTest, NewGrpcMuxImplTest, + testing::ValuesIn({LegacyOrUnified::Legacy, LegacyOrUnified::Unified})); + // Validate behavior when dynamic context parameters are updated. -TEST_F(NewGrpcMuxImplTest, DynamicContextParameters) { +TEST_P(NewGrpcMuxImplTest, DynamicContextParameters) { setup(); InSequence s; auto foo_sub = grpc_mux_->addWatch("foo", {"x", "y"}, callbacks_, resource_decoder_, {}); @@ -126,11 +193,14 @@ TEST_F(NewGrpcMuxImplTest, DynamicContextParameters) { // Update to bar type should resend Node. expectSendMessage("bar", {}, {}); local_info_.context_provider_.update_cb_handler_.runCallbacks("bar"); + expectSendMessage("foo", {}, {"x", "y"}); } // Validate cached nonces are cleared on reconnection. -TEST_F(NewGrpcMuxImplTest, ReconnectionResetsNonceAndAcks) { +// TODO (dmitri-d) remove this test when legacy implementations have been removed +// common mux functionality is tested in xds_grpc_mux_impl_test.cc +TEST_P(NewGrpcMuxImplTest, ReconnectionResetsNonceAndAcks) { Event::MockTimer* grpc_stream_retry_timer{new Event::MockTimer()}; Event::MockTimer* ttl_mgr_timer{new NiceMock()}; Event::TimerCb grpc_stream_retry_timer_cb; @@ -164,7 +234,7 @@ TEST_F(NewGrpcMuxImplTest, ReconnectionResetsNonceAndAcks) { add_response_resource("y", "3000", *response); // Pause EDS to allow the ACK to be cached. auto resume_eds = grpc_mux_->pause(type_url); - grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + onDiscoveryResponse(std::move(response)); // Now disconnect. // Grpc stream retry timer will kick in and reconnection will happen. EXPECT_CALL(*grpc_stream_retry_timer, enableTimer(_, _)) @@ -173,14 +243,14 @@ TEST_F(NewGrpcMuxImplTest, ReconnectionResetsNonceAndAcks) { // initial_resource_versions should contain client side all resource:version info. expectSendMessage(type_url, {"x", "y"}, {}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", {{"x", "2000"}, {"y", "3000"}}); - grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Canceled, ""); - // Destruction of the EDS subscription will issue an "unsubscribe" request. + remoteClose(); + expectSendMessage(type_url, {}, {"x", "y"}); } // Validate resources are not sent on wildcard watch reconnection. // Regression test of https://github.com/envoyproxy/envoy/issues/16063. -TEST_F(NewGrpcMuxImplTest, ReconnectionResetsWildcardSubscription) { +TEST_P(NewGrpcMuxImplTest, ReconnectionResetsWildcardSubscription) { Event::MockTimer* grpc_stream_retry_timer{new Event::MockTimer()}; Event::MockTimer* ttl_mgr_timer{new NiceMock()}; Event::TimerCb grpc_stream_retry_timer_cb; @@ -231,7 +301,7 @@ TEST_F(NewGrpcMuxImplTest, ReconnectionResetsWildcardSubscription) { })); // Expect an ack with the nonce. expectSendMessage(type_url, {}, {}, "111"); - grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + onDiscoveryResponse(std::move(response)); } // Send another response with a different resource, but where EDS is paused. auto resume_eds = grpc_mux_->pause(type_url); @@ -246,7 +316,7 @@ TEST_F(NewGrpcMuxImplTest, ReconnectionResetsWildcardSubscription) { TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); })); // No ack reply is expected in this case, as EDS is suspended. - grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + onDiscoveryResponse(std::move(response)); } // Now disconnect. @@ -258,12 +328,12 @@ TEST_F(NewGrpcMuxImplTest, ReconnectionResetsWildcardSubscription) { // added resources because this is a wildcard request. expectSendMessage(type_url, {}, {}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", {{"x", "1000"}, {"y", "2000"}}); - grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Canceled, ""); + remoteClose(); // Destruction of wildcard will not issue unsubscribe requests for the resources. } // Test that we simply ignore a message for an unknown type_url, with no ill effects. -TEST_F(NewGrpcMuxImplTest, DiscoveryResponseNonexistentSub) { +TEST_P(NewGrpcMuxImplTest, DiscoveryResponseNonexistentSub) { setup(); const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; @@ -279,7 +349,7 @@ TEST_F(NewGrpcMuxImplTest, DiscoveryResponseNonexistentSub) { unexpected_response->set_system_version_info("0"); // empty response should call onConfigUpdate on wildcard watch EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "0")); - grpc_mux_->onDiscoveryResponse(std::move(unexpected_response), control_plane_stats_); + onDiscoveryResponse(std::move(unexpected_response)); } { auto response = std::make_unique(); @@ -296,13 +366,13 @@ TEST_F(NewGrpcMuxImplTest, DiscoveryResponseNonexistentSub) { EXPECT_TRUE( TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); })); - grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + onDiscoveryResponse(std::move(response)); } } // DeltaDiscoveryResponse that comes in response to an on-demand request updates the watch with // resource's name. The watch is initially created with an alias used in the on-demand request. -TEST_F(NewGrpcMuxImplTest, ConfigUpdateWithAliases) { +TEST_P(NewGrpcMuxImplTest, ConfigUpdateWithAliases) { setup(); const std::string& type_url = Config::TypeUrl::get().VirtualHost; @@ -329,20 +399,17 @@ TEST_F(NewGrpcMuxImplTest, ConfigUpdateWithAliases) { response->mutable_resources()->at(0).add_aliases("prefix/domain2.test"); EXPECT_LOG_CONTAINS("debug", "for " + type_url + " from HAL 9000", - grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_)); - - const auto& subscriptions = grpc_mux_->subscriptions(); - auto sub = subscriptions.find(type_url); - - EXPECT_TRUE(sub != subscriptions.end()); + onDiscoveryResponse(std::move(response))); + EXPECT_TRUE(subscriptionExists(type_url)); watch->update({}); + EXPECT_EQ("HAL 9000", stats_.textReadout("control_plane.identifier").value()); } // DeltaDiscoveryResponse that comes in response to an on-demand request that couldn't be resolved // will contain an empty Resource. The Resource's aliases field will be populated with the alias // originally used in the request. -TEST_F(NewGrpcMuxImplTest, ConfigUpdateWithNotFoundResponse) { +TEST_P(NewGrpcMuxImplTest, ConfigUpdateWithNotFoundResponse) { setup(); const std::string& type_url = Config::TypeUrl::get().VirtualHost; @@ -363,7 +430,7 @@ TEST_F(NewGrpcMuxImplTest, ConfigUpdateWithNotFoundResponse) { } // Validate basic gRPC mux subscriptions to xdstp:// glob collections. -TEST_F(NewGrpcMuxImplTest, XdsTpGlobCollection) { +TEST_P(NewGrpcMuxImplTest, XdsTpGlobCollection) { setup(); const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; @@ -398,11 +465,11 @@ TEST_F(NewGrpcMuxImplTest, XdsTpGlobCollection) { EXPECT_EQ(1, added_resources.size()); EXPECT_TRUE(TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); })); - grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + onDiscoveryResponse(std::move(response)); } // Validate basic gRPC mux subscriptions to xdstp:// singletons. -TEST_F(NewGrpcMuxImplTest, XdsTpSingleton) { +TEST_P(NewGrpcMuxImplTest, XdsTpSingleton) { setup(); const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; @@ -455,7 +522,35 @@ TEST_F(NewGrpcMuxImplTest, XdsTpSingleton) { EXPECT_TRUE(TestUtility::protoEqual(added_resources[1].get().resource(), load_assignment)); EXPECT_TRUE(TestUtility::protoEqual(added_resources[2].get().resource(), load_assignment)); })); - grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + onDiscoveryResponse(std::move(response)); +} + +TEST_P(NewGrpcMuxImplTest, RequestOnDemandUpdate) { + setup(); + + auto foo_sub = grpc_mux_->addWatch("foo", {"x", "y"}, callbacks_, resource_decoder_, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("foo", {"x", "y"}, {}); + grpc_mux_->start(); + + expectSendMessage("foo", {"z"}, {}); + grpc_mux_->requestOnDemandUpdate("foo", {"z"}); + + expectSendMessage("foo", {}, {"x", "y"}); +} + +TEST_P(NewGrpcMuxImplTest, Shutdown) { + setup(); + InSequence s; + auto foo_sub = grpc_mux_->addWatch("foo", {"x", "y"}, callbacks_, resource_decoder_, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("foo", {"x", "y"}, {}); + grpc_mux_->start(); + + shutdownMux(); + auto bar_sub = grpc_mux_->addWatch("bar", {"z"}, callbacks_, resource_decoder_, {}); + // We do not expect any messages to be sent here as the mux has been shutdown + // There won't be any unsubscribe messages for the legacy mux either for the same reason } } // namespace diff --git a/test/common/config/xds_grpc_mux_impl_test.cc b/test/common/config/xds_grpc_mux_impl_test.cc new file mode 100644 index 000000000000..0e2f40a2256f --- /dev/null +++ b/test/common/config/xds_grpc_mux_impl_test.cc @@ -0,0 +1,943 @@ +#include + +#include "envoy/api/v2/discovery.pb.h" +#include "envoy/config/endpoint/v3/endpoint.pb.h" +#include "envoy/config/endpoint/v3/endpoint.pb.validate.h" +#include "envoy/event/timer.h" +#include "envoy/service/discovery/v3/discovery.pb.h" + +#include "source/common/common/empty_string.h" +#include "source/common/config/protobuf_link_hacks.h" +#include "source/common/config/resource_name.h" +#include "source/common/config/utility.h" +#include "source/common/config/version_converter.h" +#include "source/common/config/xds_mux/grpc_mux_impl.h" +#include "source/common/protobuf/protobuf.h" + +#include "test/common/stats/stat_test_utility.h" +#include "test/config/v2_link_hacks.h" +#include "test/mocks/common.h" +#include "test/mocks/config/mocks.h" +#include "test/mocks/event/mocks.h" +#include "test/mocks/grpc/mocks.h" +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/runtime/mocks.h" +#include "test/test_common/logging.h" +#include "test/test_common/resources.h" +#include "test/test_common/simulated_time_system.h" +#include "test/test_common/test_time.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::AtLeast; +using testing::InSequence; +using testing::Invoke; +using testing::IsSubstring; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Config { +namespace XdsMux { +namespace { + +// We test some mux specific stuff below, other unit test coverage for singleton use of GrpcMuxImpl +// is provided in [grpc_]subscription_impl_test.cc. +class GrpcMuxImplTestBase : public testing::Test { +public: + GrpcMuxImplTestBase() + : async_client_(new Grpc::MockAsyncClient()), + control_plane_stats_(Utility::generateControlPlaneStats(stats_)), + control_plane_connected_state_( + stats_.gauge("control_plane.connected_state", Stats::Gauge::ImportMode::NeverImport)), + control_plane_pending_requests_(stats_.gauge("control_plane.pending_requests", + Stats::Gauge::ImportMode::NeverImport)) {} + + void setup() { + grpc_mux_ = std::make_unique( + std::unique_ptr(async_client_), dispatcher_, + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), + envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, rate_limit_settings_, + local_info_, true); + } + + void setup(const RateLimitSettings& custom_rate_limit_settings) { + grpc_mux_ = std::make_unique( + std::unique_ptr(async_client_), dispatcher_, + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), + envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, custom_rate_limit_settings, + local_info_, true); + } + + void expectSendMessage(const std::string& type_url, + const std::vector& resource_names, const std::string& version, + bool first = false, const std::string& nonce = "", + const Protobuf::int32 error_code = Grpc::Status::WellKnownGrpcStatus::Ok, + const std::string& error_message = "") { + API_NO_BOOST(envoy::api::v2::DiscoveryRequest) expected_request; + if (first) { + expected_request.mutable_node()->CopyFrom(API_DOWNGRADE(local_info_.node())); + } + for (const auto& resource : resource_names) { + expected_request.add_resource_names(resource); + } + if (!version.empty()) { + expected_request.set_version_info(version); + } + expected_request.set_response_nonce(nonce); + expected_request.set_type_url(type_url); + if (error_code != Grpc::Status::WellKnownGrpcStatus::Ok) { + ::google::rpc::Status* error_detail = expected_request.mutable_error_detail(); + error_detail->set_code(error_code); + error_detail->set_message(error_message); + } + EXPECT_CALL( + async_stream_, + sendMessageRaw_(Grpc::ProtoBufferEqIgnoreRepeatedFieldOrdering(expected_request), false)); + } + + Config::GrpcMuxWatchPtr makeWatch(const std::string& type_url, + const absl::flat_hash_set& resources) { + return grpc_mux_->addWatch(type_url, resources, callbacks_, resource_decoder_, {}); + } + + Config::GrpcMuxWatchPtr makeWatch(const std::string& type_url, + const absl::flat_hash_set& resources, + NiceMock& callbacks, + Config::OpaqueResourceDecoder& resource_decoder) { + return grpc_mux_->addWatch(type_url, resources, callbacks, resource_decoder, {}); + } + + NiceMock dispatcher_; + NiceMock random_; + Grpc::MockAsyncClient* async_client_; + Grpc::MockAsyncStream async_stream_; + NiceMock local_info_; + std::unique_ptr grpc_mux_; + NiceMock callbacks_; + TestUtility::TestOpaqueResourceDecoderImpl + resource_decoder_{"cluster_name"}; + Stats::TestUtil::TestStore stats_; + ControlPlaneStats control_plane_stats_; + Envoy::Config::RateLimitSettings rate_limit_settings_; + Stats::Gauge& control_plane_connected_state_; + Stats::Gauge& control_plane_pending_requests_; +}; + +class GrpcMuxImplTest : public GrpcMuxImplTestBase { +public: + Event::SimulatedTimeSystem time_system_; +}; + +// Validate behavior when multiple type URL watches are maintained, watches are created/destroyed. +TEST_F(GrpcMuxImplTest, MultipleTypeUrlStreams) { + setup(); + InSequence s; + + auto foo_sub = makeWatch("type_url_foo", {"x", "y"}); + auto bar_sub = makeWatch("type_url_bar", {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("type_url_foo", {"x", "y"}, "", true); + expectSendMessage("type_url_bar", {}, ""); + grpc_mux_->start(); + EXPECT_EQ(1, control_plane_connected_state_.value()); + expectSendMessage("type_url_bar", {"z"}, ""); + auto bar_z_sub = makeWatch("type_url_bar", {"z"}); + expectSendMessage("type_url_bar", {"zz", "z"}, ""); + auto bar_zz_sub = makeWatch("type_url_bar", {"zz"}); + expectSendMessage("type_url_bar", {"z"}, ""); + expectSendMessage("type_url_bar", {}, ""); + expectSendMessage("type_url_foo", {}, ""); +} + +// Validate behavior when multiple type URL watches are maintained and the stream is reset. +TEST_F(GrpcMuxImplTest, ResetStream) { + InSequence s; + + auto* timer = new Event::MockTimer(&dispatcher_); + // TTL timers. + new Event::MockTimer(&dispatcher_); + new Event::MockTimer(&dispatcher_); + new Event::MockTimer(&dispatcher_); + + setup(); + auto foo_sub = makeWatch("type_url_foo", {"x", "y"}); + auto bar_sub = makeWatch("type_url_bar", {}); + auto baz_sub = makeWatch("type_url_baz", {"z"}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("type_url_foo", {"x", "y"}, "", true); + expectSendMessage("type_url_bar", {}, ""); + expectSendMessage("type_url_baz", {"z"}, ""); + grpc_mux_->start(); + + // Send another message for foo so that the node is cleared in the cached request. + // This is to test that the the node is set again in the first message below. + expectSendMessage("type_url_foo", {"z", "x", "y"}, ""); + auto foo_z_sub = makeWatch("type_url_foo", {"z"}); + + EXPECT_CALL(callbacks_, + onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _)) + .Times(4); + EXPECT_CALL(random_, random()); + EXPECT_CALL(*timer, enableTimer(_, _)); + grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Canceled, ""); + EXPECT_EQ(0, control_plane_connected_state_.value()); + EXPECT_EQ(0, control_plane_pending_requests_.value()); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("type_url_foo", {"z", "x", "y"}, "", true); + expectSendMessage("type_url_bar", {}, ""); + expectSendMessage("type_url_baz", {"z"}, ""); + expectSendMessage("type_url_foo", {"x", "y"}, ""); + timer->invokeCallback(); + + expectSendMessage("type_url_baz", {}, ""); + expectSendMessage("type_url_foo", {}, ""); +} + +// Validate pause-resume behavior. +TEST_F(GrpcMuxImplTest, PauseResume) { + setup(); + InSequence s; + GrpcMuxWatchPtr foo1; + GrpcMuxWatchPtr foo2; + GrpcMuxWatchPtr foo3; + auto foo = grpc_mux_->addWatch("type_url_foo", {"x", "y"}, callbacks_, resource_decoder_, {}); + { + ScopedResume a = grpc_mux_->pause("type_url_foo"); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + grpc_mux_->start(); + expectSendMessage("type_url_foo", {"x", "y"}, "", true); + } + { + ScopedResume a = grpc_mux_->pause("type_url_bar"); + expectSendMessage("type_url_foo", {"z", "x", "y"}, ""); + foo1 = grpc_mux_->addWatch("type_url_foo", {"z"}, callbacks_, resource_decoder_, {}); + } + { + ScopedResume a = grpc_mux_->pause("type_url_foo"); + foo2 = grpc_mux_->addWatch("type_url_foo", {"zz"}, callbacks_, resource_decoder_, {}); + expectSendMessage("type_url_foo", {"zz", "z", "x", "y"}, ""); + } + // When nesting, we only have a single resumption. + { + ScopedResume a = grpc_mux_->pause("type_url_foo"); + ScopedResume b = grpc_mux_->pause("type_url_foo"); + foo3 = grpc_mux_->addWatch("type_url_foo", {"zzz"}, callbacks_, resource_decoder_, {}); + expectSendMessage("type_url_foo", {"zzz", "zz", "z", "x", "y"}, ""); + } + + grpc_mux_->pause("type_url_foo")->cancel(); +} + +// Validate behavior when type URL mismatches occur. +TEST_F(GrpcMuxImplTest, TypeUrlMismatch) { + setup(); + + auto invalid_response = std::make_unique(); + InSequence s; + auto foo_sub = makeWatch("type_url_foo", {"x", "y"}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("type_url_foo", {"x", "y"}, "", true); + grpc_mux_->start(); + + { + auto response = std::make_unique(); + response->set_type_url("type_url_bar"); + response->set_version_info("bar-version"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + { + invalid_response->set_type_url("type_url_foo"); + invalid_response->set_version_info("foo-version"); + invalid_response->mutable_resources()->Add()->set_type_url("type_url_bar"); + EXPECT_CALL(callbacks_, onConfigUpdateFailed(_, _)) + .WillOnce(Invoke([](Envoy::Config::ConfigUpdateFailureReason, const EnvoyException* e) { + EXPECT_TRUE( + IsSubstring("", "", + "type URL type_url_bar embedded in an individual Any does not match the " + "message-wide type URL type_url_foo in DiscoveryResponse", + e->what())); + })); + + expectSendMessage( + "type_url_foo", {"x", "y"}, "", false, "", Grpc::Status::WellKnownGrpcStatus::Internal, + fmt::format("type URL type_url_bar embedded in an individual Any does not match the " + "message-wide type URL type_url_foo in DiscoveryResponse {}", + invalid_response->DebugString())); + grpc_mux_->onDiscoveryResponse(std::move(invalid_response), control_plane_stats_); + } + expectSendMessage("type_url_foo", {}, ""); +} + +TEST_F(GrpcMuxImplTest, RpcErrorMessageTruncated) { + setup(); + auto invalid_response = std::make_unique(); + InSequence s; + auto foo_sub = makeWatch("type_url_foo", {"x", "y"}); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("type_url_foo", {"x", "y"}, "", true); + grpc_mux_->start(); + + { // Large error message sent back to management server is truncated. + const std::string very_large_type_url(1 << 20, 'A'); + invalid_response->set_type_url("type_url_foo"); + invalid_response->set_version_info("invalid"); + invalid_response->mutable_resources()->Add()->set_type_url(very_large_type_url); + EXPECT_CALL(callbacks_, onConfigUpdateFailed(_, _)) + .WillOnce(Invoke([&very_large_type_url](Envoy::Config::ConfigUpdateFailureReason, + const EnvoyException* e) { + EXPECT_TRUE( + IsSubstring("", "", + fmt::format("type URL {} embedded in an individual Any does not match " + "the message-wide type URL type_url_foo in DiscoveryResponse", + very_large_type_url), // Local error message is not truncated. + e->what())); + })); + expectSendMessage("type_url_foo", {"x", "y"}, "", false, "", + Grpc::Status::WellKnownGrpcStatus::Internal, + fmt::format("type URL {}...(truncated)", std::string(4087, 'A'))); + grpc_mux_->onDiscoveryResponse(std::move(invalid_response), control_plane_stats_); + } + expectSendMessage("type_url_foo", {}, ""); +} + +envoy::service::discovery::v3::Resource heartbeatResource(std::chrono::milliseconds ttl, + const std::string& name) { + envoy::service::discovery::v3::Resource resource; + + resource.mutable_ttl()->CopyFrom(Protobuf::util::TimeUtil::MillisecondsToDuration(ttl.count())); + resource.set_name(name); + + return resource; +} + +envoy::service::discovery::v3::Resource +resourceWithTtl(std::chrono::milliseconds ttl, + envoy::config::endpoint::v3::ClusterLoadAssignment& cla) { + envoy::service::discovery::v3::Resource resource; + resource.mutable_resource()->PackFrom(cla); + resource.mutable_ttl()->CopyFrom(Protobuf::util::TimeUtil::MillisecondsToDuration(ttl.count())); + + resource.set_name(cla.cluster_name()); + + return resource; +} +envoy::service::discovery::v3::Resource +resourceWithEmptyTtl(envoy::config::endpoint::v3::ClusterLoadAssignment& cla) { + envoy::service::discovery::v3::Resource resource; + resource.mutable_resource()->PackFrom(cla); + resource.set_name(cla.cluster_name()); + return resource; +} +// Validates the behavior when the TTL timer expires. +TEST_F(GrpcMuxImplTest, ResourceTTL) { + setup(); + + time_system_.setSystemTime(std::chrono::seconds(0)); + + TestUtility::TestOpaqueResourceDecoderImpl + resource_decoder("cluster_name"); + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + InSequence s; + auto* ttl_timer = new Event::MockTimer(&dispatcher_); + auto eds_sub = makeWatch(type_url, {"x"}, callbacks_, resource_decoder); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x"}, "", true); + grpc_mux_->start(); + + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + + auto wrapped_resource = resourceWithTtl(std::chrono::milliseconds(1000), load_assignment); + response->add_resources()->PackFrom(wrapped_resource); + + EXPECT_CALL(*ttl_timer, enabled()); + EXPECT_CALL(*ttl_timer, enableTimer(std::chrono::milliseconds(1000), _)); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_EQ(1, resources.size()); + })); + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Increase the TTL. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + auto wrapped_resource = resourceWithTtl(std::chrono::milliseconds(10000), load_assignment); + response->add_resources()->PackFrom(wrapped_resource); + + EXPECT_CALL(*ttl_timer, enabled()); + EXPECT_CALL(*ttl_timer, enableTimer(std::chrono::milliseconds(10000), _)); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_EQ(1, resources.size()); + })); + // No update, just a change in TTL. + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Refresh the TTL with a heartbeat response. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + auto wrapped_resource = heartbeatResource(std::chrono::milliseconds(10000), "x"); + response->add_resources()->PackFrom(wrapped_resource); + + EXPECT_CALL(*ttl_timer, enabled()); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_TRUE(resources.empty()); + })); + + // No update, just a change in TTL. + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Remove the TTL. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(resourceWithEmptyTtl(load_assignment)); + + EXPECT_CALL(*ttl_timer, disableTimer()); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_EQ(1, resources.size()); + })); + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Put the TTL back. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + auto wrapped_resource = resourceWithTtl(std::chrono::milliseconds(10000), load_assignment); + response->add_resources()->PackFrom(wrapped_resource); + + EXPECT_CALL(*ttl_timer, enabled()); + EXPECT_CALL(*ttl_timer, enableTimer(std::chrono::milliseconds(10000), _)); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_EQ(1, resources.size()); + })); + // No update, just a change in TTL. + expectSendMessage(type_url, {"x"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + time_system_.setSystemTime(std::chrono::seconds(11)); + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "")) + .WillOnce(Invoke([](auto, const auto& removed, auto) { + EXPECT_EQ(1, removed.size()); + EXPECT_EQ("x", removed.Get(0)); + })); + // Fire the TTL timer. + EXPECT_CALL(*ttl_timer, disableTimer()); + ttl_timer->invokeCallback(); + + expectSendMessage(type_url, {}, "1"); +} + +// Checks that the control plane identifier is logged +TEST_F(GrpcMuxImplTest, LogsControlPlaneIndentifier) { + setup(); + + std::string type_url = "foo"; + auto foo_sub = makeWatch(type_url, {}, callbacks_, resource_decoder_); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {}, "", true); + grpc_mux_->start(); + + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + response->mutable_control_plane()->set_identifier("control_plane_ID"); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, _)); + expectSendMessage(type_url, {}, "1"); + EXPECT_LOG_CONTAINS("debug", "for foo from control_plane_ID", + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response))); + } + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("2"); + response->mutable_control_plane()->set_identifier("different_ID"); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, _)); + expectSendMessage(type_url, {}, "2"); + EXPECT_LOG_CONTAINS("debug", "for foo from different_ID", + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response))); + } +} + +// Validate behavior when watches has an unknown resource name. +TEST_F(GrpcMuxImplTest, WildcardWatch) { + setup(); + + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + auto foo_sub = makeWatch(type_url, {}, callbacks_, resource_decoder_); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {}, "", true); + grpc_mux_->start(); + + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([&load_assignment](const std::vector& resources, + const std::string&) { + EXPECT_EQ(1, resources.size()); + const auto& expected_assignment = + dynamic_cast( + resources[0].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment)); + })); + expectSendMessage(type_url, {}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } +} + +// Validate behavior when watches specify resources (potentially overlapping). +TEST_F(GrpcMuxImplTest, WatchDemux) { + setup(); + // We will not require InSequence here: an update that causes multiple onConfigUpdates + // causes them in an indeterminate order, based on the whims of the hash map. + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + + NiceMock foo_callbacks; + auto foo_sub = makeWatch(type_url, {"x", "y"}, foo_callbacks, resource_decoder_); + NiceMock bar_callbacks; + auto bar_sub = makeWatch(type_url, {"y", "z"}, bar_callbacks, resource_decoder_); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + // Should dedupe the "x" resource. + expectSendMessage(type_url, {"y", "z", "x"}, "", true); + grpc_mux_->start(); + + // Send just x; only foo_callbacks should receive an onConfigUpdate(). + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + EXPECT_CALL(bar_callbacks, onConfigUpdate(_, "1")).Times(0); + EXPECT_CALL(foo_callbacks, onConfigUpdate(_, "1")) + .WillOnce(Invoke([&load_assignment](const std::vector& resources, + const std::string&) { + EXPECT_EQ(1, resources.size()); + const auto& expected_assignment = + dynamic_cast( + resources[0].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment)); + })); + expectSendMessage(type_url, {"y", "z", "x"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Send x y and z; foo_ and bar_callbacks should both receive onConfigUpdate()s, carrying {x,y} + // and {y,z} respectively. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment_x; + load_assignment_x.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment_x); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment_y; + load_assignment_y.set_cluster_name("y"); + response->add_resources()->PackFrom(load_assignment_y); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment_z; + load_assignment_z.set_cluster_name("z"); + response->add_resources()->PackFrom(load_assignment_z); + EXPECT_CALL(bar_callbacks, onConfigUpdate(_, "2")) + .WillOnce(Invoke([&load_assignment_y, &load_assignment_z]( + const std::vector& resources, const std::string&) { + EXPECT_EQ(2, resources.size()); + const auto& expected_assignment = + dynamic_cast( + resources[0].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment_y)); + const auto& expected_assignment_1 = + dynamic_cast( + resources[1].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment_1, load_assignment_z)); + })); + EXPECT_CALL(foo_callbacks, onConfigUpdate(_, "2")) + .WillOnce(Invoke([&load_assignment_x, &load_assignment_y]( + const std::vector& resources, const std::string&) { + EXPECT_EQ(2, resources.size()); + const auto& expected_assignment = + dynamic_cast( + resources[0].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment_x)); + const auto& expected_assignment_1 = + dynamic_cast( + resources[1].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment_1, load_assignment_y)); + })); + expectSendMessage(type_url, {"y", "z", "x"}, "2"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + expectSendMessage(type_url, {"x", "y"}, "2"); + expectSendMessage(type_url, {}, "2"); +} + +// Validate behavior when we have multiple watchers that send empty updates. +TEST_F(GrpcMuxImplTest, MultipleWatcherWithEmptyUpdates) { + setup(); + InSequence s; + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + NiceMock foo_callbacks; + auto foo_sub = makeWatch(type_url, {"x", "y"}, foo_callbacks, resource_decoder_); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x", "y"}, "", true); + grpc_mux_->start(); + + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + + EXPECT_CALL(foo_callbacks, onConfigUpdate(_, "1")).Times(0); + expectSendMessage(type_url, {"x", "y"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + + expectSendMessage(type_url, {}, "1"); +} + +// Validate behavior when we have Single Watcher that sends Empty updates. +TEST_F(GrpcMuxImplTest, SingleWatcherWithEmptyUpdates) { + setup(); + const std::string& type_url = Config::TypeUrl::get().Cluster; + NiceMock foo_callbacks; + auto foo_sub = makeWatch(type_url, {}, foo_callbacks, resource_decoder_); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {}, "", true); + grpc_mux_->start(); + + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + // Validate that onConfigUpdate is called with empty resources. + EXPECT_CALL(foo_callbacks, onConfigUpdate(_, "1")) + .WillOnce(Invoke([](const std::vector& resources, const std::string&) { + EXPECT_TRUE(resources.empty()); + })); + expectSendMessage(type_url, {}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); +} + +// Exactly one test requires a mock time system to provoke behavior that cannot +// easily be achieved with a SimulatedTimeSystem. +class GrpcMuxImplTestWithMockTimeSystem : public GrpcMuxImplTestBase { +public: + Event::DelegatingTestTimeSystem mock_time_system_; +}; + +// Verifies that rate limiting is not enforced with defaults. +TEST_F(GrpcMuxImplTestWithMockTimeSystem, TooManyRequestsWithDefaultSettings) { + + auto ttl_timer = new Event::MockTimer(&dispatcher_); + // Retry timer, + new Event::MockTimer(&dispatcher_); + + // Validate that rate limiter is not created. + EXPECT_CALL(*mock_time_system_, monotonicTime()).Times(0); + + setup(); + + EXPECT_CALL(async_stream_, sendMessageRaw_(_, false)).Times(AtLeast(99)); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + + const auto onReceiveMessage = [&](uint64_t burst) { + for (uint64_t i = 0; i < burst; i++) { + auto response = std::make_unique(); + response->set_version_info("type_url_baz"); + response->set_nonce("type_url_bar"); + response->set_type_url("type_url_foo"); + EXPECT_CALL(*ttl_timer, disableTimer()); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + }; + + auto foo_sub = makeWatch("type_url_foo", {"x"}); + expectSendMessage("type_url_foo", {"x"}, "", true); + grpc_mux_->start(); + + // Exhausts the limit. + onReceiveMessage(99); + + // API calls go over the limit but we do not see the stat incremented. + onReceiveMessage(1); + EXPECT_EQ(0, stats_.counter("control_plane.rate_limit_enforced").value()); +} + +// Verifies that default rate limiting is enforced with empty RateLimitSettings. +TEST_F(GrpcMuxImplTest, TooManyRequestsWithEmptyRateLimitSettings) { + // Validate that request drain timer is created. + + auto ttl_timer = new Event::MockTimer(&dispatcher_); + Event::MockTimer* drain_request_timer = new Event::MockTimer(&dispatcher_); + Event::MockTimer* retry_timer = new Event::MockTimer(&dispatcher_); + + RateLimitSettings custom_rate_limit_settings; + custom_rate_limit_settings.enabled_ = true; + setup(custom_rate_limit_settings); + + // Attempt to send 99 messages. One of them is rate limited (and we never drain). + EXPECT_CALL(async_stream_, sendMessageRaw_(_, false)).Times(99); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + + const auto onReceiveMessage = [&](uint64_t burst) { + for (uint64_t i = 0; i < burst; i++) { + auto response = std::make_unique(); + response->set_version_info("type_url_baz"); + response->set_nonce("type_url_bar"); + response->set_type_url("type_url_foo"); + EXPECT_CALL(*ttl_timer, disableTimer()); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + }; + + auto foo_sub = makeWatch("type_url_foo", {"x"}); + expectSendMessage("type_url_foo", {"x"}, "", true); + grpc_mux_->start(); + + // Validate that drain_request_timer is enabled when there are no tokens. + EXPECT_CALL(*drain_request_timer, enableTimer(std::chrono::milliseconds(100), _)); + // The drain timer enable is checked twice, once when we limit, again when the watch is destroyed. + EXPECT_CALL(*drain_request_timer, enabled()).Times(11); + onReceiveMessage(110); + EXPECT_EQ(11, stats_.counter("control_plane.rate_limit_enforced").value()); + EXPECT_EQ(11, control_plane_pending_requests_.value()); + + // Validate that when we reset a stream with pending requests, it reverts back to the initial + // query (i.e. the queue is discarded). + EXPECT_CALL(callbacks_, + onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _)); + EXPECT_CALL(random_, random()); + EXPECT_CALL(*retry_timer, enableTimer(_, _)); + grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Canceled, ""); + EXPECT_EQ(11, control_plane_pending_requests_.value()); + EXPECT_EQ(0, control_plane_connected_state_.value()); + EXPECT_CALL(async_stream_, sendMessageRaw_(_, false)); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + time_system_.setMonotonicTime(std::chrono::seconds(30)); + retry_timer->invokeCallback(); + EXPECT_EQ(0, control_plane_pending_requests_.value()); + // One more message on the way out when the watch is destroyed. + EXPECT_CALL(async_stream_, sendMessageRaw_(_, false)); +} + +// Verifies that rate limiting is enforced with custom RateLimitSettings. +TEST_F(GrpcMuxImplTest, TooManyRequestsWithCustomRateLimitSettings) { + // Validate that request drain timer is created. + + // TTL timer. + auto ttl_timer = new Event::MockTimer(&dispatcher_); + Event::MockTimer* drain_request_timer = new Event::MockTimer(&dispatcher_); + // Retry timer. + new Event::MockTimer(&dispatcher_); + + RateLimitSettings custom_rate_limit_settings; + custom_rate_limit_settings.enabled_ = true; + custom_rate_limit_settings.max_tokens_ = 250; + custom_rate_limit_settings.fill_rate_ = 2; + setup(custom_rate_limit_settings); + + EXPECT_CALL(async_stream_, sendMessageRaw_(_, false)).Times(AtLeast(260)); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + + const auto onReceiveMessage = [&](uint64_t burst) { + for (uint64_t i = 0; i < burst; i++) { + auto response = std::make_unique(); + response->set_version_info("type_url_baz"); + response->set_nonce("type_url_bar"); + response->set_type_url("type_url_foo"); + EXPECT_CALL(*ttl_timer, disableTimer()); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + }; + + auto foo_sub = makeWatch("type_url_foo", {"x"}); + expectSendMessage("type_url_foo", {"x"}, "", true); + grpc_mux_->start(); + + // Validate that rate limit is not enforced for 100 requests. + onReceiveMessage(100); + EXPECT_EQ(0, stats_.counter("control_plane.rate_limit_enforced").value()); + + // Validate that drain_request_timer is enabled when there are no tokens. + EXPECT_CALL(*drain_request_timer, enableTimer(std::chrono::milliseconds(500), _)); + EXPECT_CALL(*drain_request_timer, enabled()).Times(11); + onReceiveMessage(160); + EXPECT_EQ(11, stats_.counter("control_plane.rate_limit_enforced").value()); + EXPECT_EQ(11, control_plane_pending_requests_.value()); + + // Validate that drain requests call when there are multiple requests in queue. + time_system_.setMonotonicTime(std::chrono::seconds(10)); + drain_request_timer->invokeCallback(); + + // Check that the pending_requests stat is updated with the queue drain. + EXPECT_EQ(0, control_plane_pending_requests_.value()); +} + +// Verifies that a message with no resources is accepted. +TEST_F(GrpcMuxImplTest, UnwatchedTypeAcceptsEmptyResources) { + setup(); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + + grpc_mux_->start(); + { + // subscribe and unsubscribe to simulate a cluster added and removed + expectSendMessage(type_url, {"y"}, "", true); + auto temp_sub = makeWatch(type_url, {"y"}); + expectSendMessage(type_url, {}, ""); + } + + // simulate the server sending empty CLA message to notify envoy that the CLA was removed. + auto response = std::make_unique(); + response->set_nonce("bar"); + response->set_version_info("1"); + response->set_type_url(type_url); + + // Although the update will change nothing for us, we will "accept" it, and so according + // to the spec we should ACK it. + expectSendMessage(type_url, {}, "1", false, "bar"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + + // When we become interested in "x", we should send a request indicating that interest. + expectSendMessage(type_url, {"x"}, "1", false, "bar"); + auto sub = makeWatch(type_url, {"x"}); + + // Watch destroyed -> interest gone -> unsubscribe request. + expectSendMessage(type_url, {}, "1", false, "bar"); +} + +// Verifies that a message with some resources is accepted even when there are no watches. +// Rationale: SotW gRPC xDS has always been willing to accept updates that include +// uninteresting resources. It should not matter whether those uninteresting resources +// are accompanied by interesting ones. +// Note: this was previously "rejects", not "accepts". See +// https://github.com/envoyproxy/envoy/pull/8350#discussion_r328218220 for discussion. +TEST_F(GrpcMuxImplTest, UnwatchedTypeAcceptsResources) { + setup(); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + const std::string& type_url = + Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + grpc_mux_->start(); + + // subscribe and unsubscribe so that the type is known to envoy + { + expectSendMessage(type_url, {"y"}, "", true); + expectSendMessage(type_url, {}, ""); + auto delete_immediately = makeWatch(type_url, {"y"}); + } + auto response = std::make_unique(); + response->set_type_url(type_url); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + response->set_version_info("1"); + + expectSendMessage(type_url, {}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); +} + +TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyClusterName) { + EXPECT_CALL(local_info_, clusterName()).WillOnce(ReturnRef(EMPTY_STRING)); + EXPECT_THROW_WITH_MESSAGE( + XdsMux::GrpcMuxSotw( + std::unique_ptr(async_client_), dispatcher_, + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), + envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, rate_limit_settings_, + local_info_, true), + EnvoyException, + "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " + "--service-node and --service-cluster options."); +} + +TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) { + EXPECT_CALL(local_info_, nodeName()).WillOnce(ReturnRef(EMPTY_STRING)); + EXPECT_THROW_WITH_MESSAGE( + XdsMux::GrpcMuxSotw( + std::unique_ptr(async_client_), dispatcher_, + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), + envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, rate_limit_settings_, + local_info_, true), + EnvoyException, + "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " + "--service-node and --service-cluster options."); +} + +// Validate behavior when dynamic context parameters are updated. +TEST_F(GrpcMuxImplTest, DynamicContextParameters) { + setup(); + InSequence s; + auto foo = grpc_mux_->addWatch("foo", {"x", "y"}, callbacks_, resource_decoder_, {}); + auto bar = grpc_mux_->addWatch("bar", {}, callbacks_, resource_decoder_, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("foo", {"x", "y"}, "", true); + expectSendMessage("bar", {}, ""); + grpc_mux_->start(); + // Unknown type, shouldn't do anything. + local_info_.context_provider_.update_cb_handler_.runCallbacks("baz"); + // Update to foo type should resend Node. + expectSendMessage("foo", {"x", "y"}, "", true); + local_info_.context_provider_.update_cb_handler_.runCallbacks("foo"); + // Update to bar type should resend Node. + expectSendMessage("bar", {}, "", true); + local_info_.context_provider_.update_cb_handler_.runCallbacks("bar"); + // only destruction of foo watch is going to result in an unsubscribe message. + // bar watch is empty and its destruction doesn't change it resource list. + expectSendMessage("foo", {}, "", false); +} + +} // namespace +} // namespace XdsMux +} // namespace Config +} // namespace Envoy diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index ae12fa665c6d..ffcfdc796831 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -122,6 +122,8 @@ class MockGrpcMux : public GrpcMux { MOCK_METHOD(void, requestOnDemandUpdate, (const std::string& type_url, const absl::flat_hash_set& add_these_names)); + + MOCK_METHOD(bool, paused, (const std::string& type_url), (const)); }; class MockGrpcStreamCallbacks diff --git a/test/per_file_coverage.sh b/test/per_file_coverage.sh index 867bea789d89..f516f6a1abc2 100755 --- a/test/per_file_coverage.sh +++ b/test/per_file_coverage.sh @@ -21,6 +21,7 @@ declare -a KNOWN_LOW_COVERAGE=( "source/common/quic:91.2" "source/common/tracing:96.1" "source/common/watchdog:42.9" # Death tests don't report LCOV +"source/common/config/xds_mux:94.5" "source/exe:94.3" "source/extensions/common/crypto:91.5" "source/extensions/common/tap:95.9"