diff --git a/source/common/config/utility.cc b/source/common/config/utility.cc index 963def83d91b..50a058aa7961 100644 --- a/source/common/config/utility.cc +++ b/source/common/config/utility.cc @@ -12,6 +12,8 @@ #include "source/common/common/assert.h" #include "source/common/protobuf/utility.h" +#include "absl/status/status.h" + namespace Envoy { namespace Config { @@ -68,11 +70,13 @@ namespace { /** * Check the grpc_services and cluster_names for API config sanity. Throws on error. * @param api_config_source the config source to validate. + * @param max_grpc_services the maximal number of grpc services allowed. * @return an invalid status when an API config has the wrong number of gRPC * services or cluster names, depending on expectations set by its API type. */ absl::Status -checkApiConfigSourceNames(const envoy::config::core::v3::ApiConfigSource& api_config_source) { +checkApiConfigSourceNames(const envoy::config::core::v3::ApiConfigSource& api_config_source, + int max_grpc_services) { const bool is_grpc = (api_config_source.api_type() == envoy::config::core::v3::ApiConfigSource::GRPC || api_config_source.api_type() == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC); @@ -89,10 +93,10 @@ checkApiConfigSourceNames(const envoy::config::core::v3::ApiConfigSource& api_co fmt::format("{}::(DELTA_)GRPC must not have a cluster name specified: {}", api_config_source.GetTypeName(), api_config_source.DebugString())); } - if (api_config_source.grpc_services().size() > 1) { - return absl::InvalidArgumentError( - fmt::format("{}::(DELTA_)GRPC must have a single gRPC service specified: {}", - api_config_source.GetTypeName(), api_config_source.DebugString())); + if (api_config_source.grpc_services_size() > max_grpc_services) { + return absl::InvalidArgumentError(fmt::format( + "{}::(DELTA_)GRPC must have no more than {} gRPC services specified: {}", + api_config_source.GetTypeName(), max_grpc_services, api_config_source.DebugString())); } } else { if (!api_config_source.grpc_services().empty()) { @@ -133,7 +137,9 @@ absl::Status Utility::checkApiConfigSourceSubscriptionBackingCluster( envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC) { return absl::OkStatus(); } - RETURN_IF_NOT_OK(checkApiConfigSourceNames(api_config_source)); + RETURN_IF_NOT_OK(checkApiConfigSourceNames( + api_config_source, + Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support") ? 2 : 1)); const bool is_grpc = (api_config_source.api_type() == envoy::config::core::v3::ApiConfigSource::GRPC); @@ -153,6 +159,14 @@ absl::Status Utility::checkApiConfigSourceSubscriptionBackingCluster( primary_clusters, api_config_source.grpc_services()[0].envoy_grpc().cluster_name(), api_config_source.GetTypeName())); } + if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support") && + api_config_source.grpc_services_size() == 2 && + api_config_source.grpc_services()[1].has_envoy_grpc()) { + // If an Envoy failover gRPC exists, we validate its cluster name. + RETURN_IF_NOT_OK(Utility::validateClusterName( + primary_clusters, api_config_source.grpc_services()[1].envoy_grpc().cluster_name(), + api_config_source.GetTypeName())); + } } // Otherwise, there is no cluster name to validate. 
return absl::OkStatus(); @@ -161,15 +175,23 @@ absl::Status Utility::checkApiConfigSourceSubscriptionBackingCluster( absl::optional Utility::getGrpcControlPlane(const envoy::config::core::v3::ApiConfigSource& api_config_source) { if (api_config_source.grpc_services_size() > 0) { - // Only checking for the first entry in grpc_services, because Envoy's xDS implementation - // currently only considers the first gRPC endpoint and ignores any other xDS management servers - // specified in an ApiConfigSource. + std::string res = ""; + // In case more than one grpc service is defined, concatenate the names for + // a unique GrpcControlPlane identifier. if (api_config_source.grpc_services(0).has_envoy_grpc()) { - return api_config_source.grpc_services(0).envoy_grpc().cluster_name(); + res = api_config_source.grpc_services(0).envoy_grpc().cluster_name(); + } else if (api_config_source.grpc_services(0).has_google_grpc()) { + res = api_config_source.grpc_services(0).google_grpc().target_uri(); } - if (api_config_source.grpc_services(0).has_google_grpc()) { - return api_config_source.grpc_services(0).google_grpc().target_uri(); + // Concatenate the failover gRPC service. + if (api_config_source.grpc_services_size() == 2) { + if (api_config_source.grpc_services(1).has_envoy_grpc()) { + absl::StrAppend(&res, ",", api_config_source.grpc_services(1).envoy_grpc().cluster_name()); + } else if (api_config_source.grpc_services(1).has_google_grpc()) { + absl::StrAppend(&res, ",", api_config_source.grpc_services(1).google_grpc().target_uri()); + } } + return res; } return absl::nullopt; } @@ -204,8 +226,10 @@ Utility::parseRateLimitSettings(const envoy::config::core::v3::ApiConfigSource& absl::StatusOr Utility::factoryForGrpcApiConfigSource( Grpc::AsyncClientManager& async_client_manager, const envoy::config::core::v3::ApiConfigSource& api_config_source, Stats::Scope& scope, - bool skip_cluster_check) { - RETURN_IF_NOT_OK(checkApiConfigSourceNames(api_config_source)); + bool skip_cluster_check, int grpc_service_idx) { + RETURN_IF_NOT_OK(checkApiConfigSourceNames( + api_config_source, + Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support") ? 2 : 1)); if (api_config_source.api_type() != envoy::config::core::v3::ApiConfigSource::GRPC && api_config_source.api_type() != envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) { @@ -214,8 +238,13 @@ absl::StatusOr Utility::factoryForGrpcApiConfigSour api_config_source.DebugString())); } + if (grpc_service_idx >= api_config_source.grpc_services_size()) { + // No returned factory in case there's no entry. + return nullptr; + } + envoy::config::core::v3::GrpcService grpc_service; - grpc_service.MergeFrom(api_config_source.grpc_services(0)); + grpc_service.MergeFrom(api_config_source.grpc_services(grpc_service_idx)); return async_client_manager.factoryForGrpcService(grpc_service, scope, skip_cluster_check); } diff --git a/source/common/config/utility.h b/source/common/config/utility.h index 25b4eaaccfa9..1faadb15f3e0 100644 --- a/source/common/config/utility.h +++ b/source/common/config/utility.h @@ -391,12 +391,15 @@ class Utility { * @param async_client_manager gRPC async client manager. * @param api_config_source envoy::config::core::v3::ApiConfigSource. Must have config type GRPC. * @param skip_cluster_check whether to skip cluster validation. - * @return Grpc::AsyncClientFactoryPtr gRPC async client factory. + * @param grpc_service_idx index of the grpc service in the api_config_source. 
If there's no entry
+   * at the given index, a nullptr factory will be returned.
+   * @return Grpc::AsyncClientFactoryPtr gRPC async client factory, or nullptr if there's no
+   *         grpc_service at the given index.
    */
   static absl::StatusOr<Grpc::AsyncClientFactoryPtr>
   factoryForGrpcApiConfigSource(Grpc::AsyncClientManager& async_client_manager,
                                 const envoy::config::core::v3::ApiConfigSource& api_config_source,
-                                Stats::Scope& scope, bool skip_cluster_check);
+                                Stats::Scope& scope, bool skip_cluster_check, int grpc_service_idx);
 
   /**
    * Translate opaque config from google.protobuf.Any to defined proto message.
diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc
index 122d66c2010a..44fa2498ec9b 100644
--- a/source/common/upstream/cluster_manager_impl.cc
+++ b/source/common/upstream/cluster_manager_impl.cc
@@ -88,16 +88,25 @@ getOrigin(const Network::TransportSocketOptionsConstSharedPtr& options, HostCons
 bool isBlockingAdsCluster(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
                           absl::string_view cluster_name) {
+  bool blocking_ads_cluster = false;
   if (bootstrap.dynamic_resources().has_ads_config()) {
     const auto& ads_config_source = bootstrap.dynamic_resources().ads_config();
     // We only care about EnvoyGrpc, not GoogleGrpc, because we only need to delay ADS mux
     // initialization if it uses an Envoy cluster that needs to be initialized first. We don't
     // depend on the same cluster initialization when opening a gRPC stream for GoogleGrpc.
-    return (ads_config_source.grpc_services_size() > 0 &&
-            ads_config_source.grpc_services(0).has_envoy_grpc() &&
-            ads_config_source.grpc_services(0).envoy_grpc().cluster_name() == cluster_name);
+    blocking_ads_cluster =
+        (ads_config_source.grpc_services_size() > 0 &&
+         ads_config_source.grpc_services(0).has_envoy_grpc() &&
+         ads_config_source.grpc_services(0).envoy_grpc().cluster_name() == cluster_name);
+    if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
+      // Also check whether the failover gRPC service (if one is configured) targets this cluster.
+ blocking_ads_cluster |= + (ads_config_source.grpc_services_size() == 2 && + ads_config_source.grpc_services(1).has_envoy_grpc() && + ads_config_source.grpc_services(1).envoy_grpc().cluster_name() == cluster_name); + } } - return false; + return blocking_ads_cluster; } } // namespace @@ -447,14 +456,22 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo if (!factory) { return absl::InvalidArgumentError(fmt::format("{} not found", name)); } - auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource( - *async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false); - RETURN_IF_STATUS_NOT_OK(factory_or_error); - ads_mux_ = factory->create(factory_or_error.value()->createUncachedRawAsyncClient(), nullptr, - dispatcher_, random_, *stats_.rootScope(), - dyn_resources.ads_config(), local_info_, - std::move(custom_config_validators), std::move(backoff_strategy), - makeOptRefFromPtr(xds_config_tracker_.get()), {}, use_eds_cache); + auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource( + *async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 0); + RETURN_IF_STATUS_NOT_OK(factory_primary_or_error); + Grpc::AsyncClientFactoryPtr factory_failover = nullptr; + if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) { + auto factory_failover_or_error = Config::Utility::factoryForGrpcApiConfigSource( + *async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 1); + RETURN_IF_STATUS_NOT_OK(factory_failover_or_error); + factory_failover = std::move(factory_failover_or_error.value()); + } + ads_mux_ = factory->create( + factory_primary_or_error.value()->createUncachedRawAsyncClient(), + factory_failover ? factory_failover->createUncachedRawAsyncClient() : nullptr, + dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_, + std::move(custom_config_validators), std::move(backoff_strategy), + makeOptRefFromPtr(xds_config_tracker_.get()), {}, use_eds_cache); } else { absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config()); RETURN_IF_NOT_OK(status); @@ -470,12 +487,20 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo if (!factory) { return absl::InvalidArgumentError(fmt::format("{} not found", name)); } - auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource( - *async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false); - RETURN_IF_STATUS_NOT_OK(factory_or_error); + auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource( + *async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 0); + RETURN_IF_STATUS_NOT_OK(factory_primary_or_error); + Grpc::AsyncClientFactoryPtr factory_failover = nullptr; + if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) { + auto factory_failover_or_error = Config::Utility::factoryForGrpcApiConfigSource( + *async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 1); + RETURN_IF_STATUS_NOT_OK(factory_failover_or_error); + factory_failover = std::move(factory_failover_or_error.value()); + } ads_mux_ = factory->create( - factory_or_error.value()->createUncachedRawAsyncClient(), nullptr, dispatcher_, random_, - *stats_.rootScope(), dyn_resources.ads_config(), local_info_, + factory_primary_or_error.value()->createUncachedRawAsyncClient(), + factory_failover ? 
factory_failover->createUncachedRawAsyncClient() : nullptr, + dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_, std::move(custom_config_validators), std::move(backoff_strategy), makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref, use_eds_cache); } @@ -568,7 +593,7 @@ absl::Status ClusterManagerImpl::initializeSecondaryClusters( absl::Status status = Config::Utility::checkTransportVersion(load_stats_config); RETURN_IF_NOT_OK(status); auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource( - *async_client_manager_, load_stats_config, *stats_.rootScope(), false); + *async_client_manager_, load_stats_config, *stats_.rootScope(), false, 0); RETURN_IF_STATUS_NOT_OK(factory_or_error); load_stats_reporter_ = std::make_unique( local_info_, *this, *stats_.rootScope(), diff --git a/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc b/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc index 5b8433fd989c..9468f1d32c2e 100644 --- a/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc +++ b/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc @@ -23,15 +23,15 @@ SubscriptionPtr DeltaGrpcCollectionConfigSubscriptionFactory::create( THROW_IF_STATUS_NOT_OK(strategy_or_error, throw); JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value()); - auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource( - data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true); - THROW_IF_STATUS_NOT_OK(factory_or_error, throw); + auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource( + data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0); + THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw); absl::StatusOr rate_limit_settings_or_error = Utility::parseRateLimitSettings(api_config_source); THROW_IF_STATUS_NOT_OK(rate_limit_settings_or_error, throw); GrpcMuxContext grpc_mux_context{ - factory_or_error.value()->createUncachedRawAsyncClient(), - /*failover_async_client_*/ nullptr, + factory_primary_or_error.value()->createUncachedRawAsyncClient(), + /*failover_async_client_=*/nullptr, /*dispatcher_=*/data.dispatcher_, /*service_method_=*/deltaGrpcMethod(data.type_url_), /*local_info_=*/data.local_info_, diff --git a/source/extensions/config_subscription/grpc/grpc_mux_failover.h b/source/extensions/config_subscription/grpc/grpc_mux_failover.h index 60cba40cbd18..00876af5c555 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_failover.h +++ b/source/extensions/config_subscription/grpc/grpc_mux_failover.h @@ -50,6 +50,8 @@ template class GrpcMuxFailover : public GrpcStreamInterface, public Logger::Loggable { public: + static constexpr uint32_t DefaultFailoverBackoffMilliseconds = 500; + // A GrpcStream creator function that receives the stream callbacks and returns a // GrpcStream object. This is introduced to facilitate dependency injection for // testing and will be used to create the primary and failover streams. 
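Because the template arguments in the mux hunks below are long, here is a compressed sketch of the failover stream creator that each mux builds when `failover_async_client_` is set. This is illustrative only and not code from the PR: `RequestType`/`ResponseType` stand in for the concrete `(Delta)DiscoveryRequest`/`(Delta)DiscoveryResponse` protos, and the `FailoverStreamCreator` alias is a hypothetical stand-in for whatever creator type `grpc_mux_failover.h` actually declares.

```cpp
// Sketch only; mirrors the pattern the GrpcMuxImpl/NewGrpcMuxImpl/XdsMux hunks below use.
template <class RequestType, class ResponseType>
using FailoverStreamCreator = std::function<GrpcStreamInterfacePtr<RequestType, ResponseType>(
    GrpcStreamCallbacks<ResponseType>*)>;

template <class RequestType, class ResponseType>
absl::optional<FailoverStreamCreator<RequestType, ResponseType>>
makeFailoverStreamCreator(GrpcMuxContext& grpc_mux_context) {
  if (!grpc_mux_context.failover_async_client_) {
    // No failover client was plumbed through: the mux runs with the primary stream only.
    return absl::nullopt;
  }
  return FailoverStreamCreator<RequestType, ResponseType>(
      [&grpc_mux_context](GrpcStreamCallbacks<ResponseType>* callbacks)
          -> GrpcStreamInterfacePtr<RequestType, ResponseType> {
        return std::make_unique<GrpcStream<RequestType, ResponseType>>(
            callbacks, std::move(grpc_mux_context.failover_async_client_),
            grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
            grpc_mux_context.scope_,
            // Fixed 500ms backoff for now; the TODOs below note it should eventually
            // mirror the primary source's backoff strategy.
            std::make_unique<FixedBackOffStrategy>(
                GrpcMuxFailover<RequestType, ResponseType>::DefaultFailoverBackoffMilliseconds),
            grpc_mux_context.rate_limit_settings_);
      });
}
```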
diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc index 8091835caa6e..f402fafb8878 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc @@ -97,8 +97,28 @@ GrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) { grpc_mux_context.rate_limit_settings_); }, /*failover_stream_creator=*/ - // TODO(adisuissa): implement when failover is fully plumbed. - absl::nullopt, + grpc_mux_context.failover_async_client_ + ? absl::make_optional( + [&grpc_mux_context]( + GrpcStreamCallbacks* + callbacks) + -> GrpcStreamInterfacePtr { + return std::make_unique< + GrpcStream>( + callbacks, std::move(grpc_mux_context.failover_async_client_), + grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, + grpc_mux_context.scope_, + // TODO(adisuissa): the backoff strategy for the failover should + // be the same as the primary source. + std::make_unique( + GrpcMuxFailover:: + DefaultFailoverBackoffMilliseconds), + grpc_mux_context.rate_limit_settings_); + }) + : absl::nullopt, /*grpc_mux_callbacks=*/*this, /*dispatch=*/grpc_mux_context.dispatcher_); } diff --git a/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc b/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc index e3792b49b42f..da174b438ce7 100644 --- a/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc +++ b/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc @@ -26,15 +26,15 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat THROW_IF_STATUS_NOT_OK(strategy_or_error, throw); JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value()); - auto factory_or_error = Utility::factoryForGrpcApiConfigSource( - data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true); - THROW_IF_STATUS_NOT_OK(factory_or_error, throw); + auto factory_primary_or_error = Utility::factoryForGrpcApiConfigSource( + data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0); + THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw); absl::StatusOr rate_limit_settings_or_error = Utility::parseRateLimitSettings(api_config_source); THROW_IF_STATUS_NOT_OK(rate_limit_settings_or_error, throw); GrpcMuxContext grpc_mux_context{ - /*async_client_=*/factory_or_error.value()->createUncachedRawAsyncClient(), - /*failover_async_client_=*/nullptr, + /*async_client_=*/factory_primary_or_error.value()->createUncachedRawAsyncClient(), + /*failover_async_client_=*/nullptr, // Failover is only supported for ADS. 
/*dispatcher_=*/data.dispatcher_, /*service_method_=*/sotwGrpcMethod(data.type_url_), /*local_info_=*/data.local_info_, @@ -75,15 +75,15 @@ DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::Subscripti THROW_IF_STATUS_NOT_OK(strategy_or_error, throw); JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value()); - auto factory_or_error = Utility::factoryForGrpcApiConfigSource( - data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true); - THROW_IF_STATUS_NOT_OK(factory_or_error, throw); + auto factory_primary_or_error = Utility::factoryForGrpcApiConfigSource( + data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0); + THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw); absl::StatusOr rate_limit_settings_or_error = Utility::parseRateLimitSettings(api_config_source); THROW_IF_STATUS_NOT_OK(rate_limit_settings_or_error, throw); GrpcMuxContext grpc_mux_context{ - /*async_client_=*/factory_or_error.value()->createUncachedRawAsyncClient(), - /*failover_async_client_=*/nullptr, + /*async_client_=*/factory_primary_or_error.value()->createUncachedRawAsyncClient(), + /*failover_async_client_=*/nullptr, // Failover is only supported for ADS. /*dispatcher_=*/data.dispatcher_, /*service_method_=*/deltaGrpcMethod(data.type_url_), /*local_info_=*/data.local_info_, diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc index c06685f13af6..44e64c1ca43e 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc @@ -71,8 +71,29 @@ NewGrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) { grpc_mux_context.rate_limit_settings_); }, /*failover_stream_creator=*/ - // TODO(adisuissa): implement when failover is fully plumbed. - absl::nullopt, + grpc_mux_context.failover_async_client_ + ? absl::make_optional( + [&grpc_mux_context]( + GrpcStreamCallbacks* + callbacks) + -> GrpcStreamInterfacePtr< + envoy::service::discovery::v3::DeltaDiscoveryRequest, + envoy::service::discovery::v3::DeltaDiscoveryResponse> { + return std::make_unique< + GrpcStream>( + callbacks, std::move(grpc_mux_context.failover_async_client_), + grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, + grpc_mux_context.scope_, + // TODO(adisuissa): the backoff strategy for the failover should + // be the same as the primary source. + std::make_unique( + GrpcMuxFailover:: + DefaultFailoverBackoffMilliseconds), + grpc_mux_context.rate_limit_settings_); + }) + : absl::nullopt, /*grpc_mux_callbacks=*/*this, /*dispatch=*/grpc_mux_context.dispatcher_); } diff --git a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc index b564b0b929db..ef39566a8383 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc @@ -72,8 +72,20 @@ GrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_conte grpc_mux_context.rate_limit_settings_); }, /*failover_stream_creator=*/ - // TODO(adisuissa): implement when failover is fully plumbed. - absl::nullopt, + grpc_mux_context.failover_async_client_ + ? 
absl::make_optional([&grpc_mux_context](GrpcStreamCallbacks* callbacks) + -> GrpcStreamInterfacePtr { + return std::make_unique>( + callbacks, std::move(grpc_mux_context.failover_async_client_), + grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, + grpc_mux_context.scope_, + // TODO(adisuissa): the backoff strategy for the failover should + // be the same as the primary source. + std::make_unique( + GrpcMuxFailover::DefaultFailoverBackoffMilliseconds), + grpc_mux_context.rate_limit_settings_); + }) + : absl::nullopt, /*grpc_mux_callbacks=*/*this, /*dispatch=*/grpc_mux_context.dispatcher_); } diff --git a/source/server/server.cc b/source/server/server.cc index 54d2ac257a1b..f8bef3157697 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -818,8 +818,9 @@ void InstanceBase::onRuntimeReady() { bootstrap_.grpc_async_client_manager_config()); TRY_ASSERT_MAIN_THREAD { THROW_IF_NOT_OK(Config::Utility::checkTransportVersion(hds_config)); + // HDS does not support xDS-Failover. auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource( - *async_client_manager_, hds_config, *stats_store_.rootScope(), false); + *async_client_manager_, hds_config, *stats_store_.rootScope(), false, 0); THROW_IF_STATUS_NOT_OK(factory_or_error, throw); hds_delegate_ = std::make_unique( serverFactoryContext(), *stats_store_.rootScope(), diff --git a/test/common/config/BUILD b/test/common/config/BUILD index b11e527d8c7b..b72678b0089e 100644 --- a/test/common/config/BUILD +++ b/test/common/config/BUILD @@ -137,6 +137,7 @@ envoy_cc_test( "//test/mocks/upstream:thread_local_cluster_mocks", "//test/test_common:environment_lib", "//test/test_common:logging_lib", + "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", "@com_github_cncf_xds//udpa/type/v1:pkg_cc_proto", "@com_github_cncf_xds//xds/type/v3:pkg_cc_proto", diff --git a/test/common/config/utility_test.cc b/test/common/config/utility_test.cc index 72d7864f1731..b3945df1e62d 100644 --- a/test/common/config/utility_test.cc +++ b/test/common/config/utility_test.cc @@ -20,6 +20,7 @@ #include "test/mocks/upstream/thread_local_cluster.h" #include "test/test_common/environment.h" #include "test/test_common/logging.h" +#include "test/test_common/test_runtime.h" #include "test/test_common/utility.h" #include "absl/types/optional.h" @@ -137,7 +138,7 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { envoy::config::core::v3::ApiConfigSource api_config_source; api_config_source.set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC); EXPECT_THAT(Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, - scope, false) + scope, false, 0) .status() .message(), HasSubstr("API configs must have either a gRPC service or a cluster name defined")); @@ -149,12 +150,12 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { api_config_source.add_grpc_services(); api_config_source.add_grpc_services(); EXPECT_THAT(Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, - scope, false) + scope, false, 0) .status() .message(), - ContainsRegex(fmt::format("{}::.DELTA_.GRPC must have a single gRPC service " - "specified:", - api_config_source.GetTypeName()))); + ContainsRegex(fmt::format( + "{}::.DELTA_.GRPC must have no more than 1 gRPC services specified:", + api_config_source.GetTypeName()))); } { @@ -163,7 +164,7 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { api_config_source.add_cluster_names(); // this also logs a warning for setting REST cluster names for a gRPC API 
config. EXPECT_THAT(Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, - scope, false) + scope, false, 0) .status() .message(), ContainsRegex(fmt::format("{}::.DELTA_.GRPC must not have a cluster name " @@ -177,7 +178,7 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { api_config_source.add_cluster_names(); api_config_source.add_cluster_names(); EXPECT_THAT(Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, - scope, false) + scope, false, 0) .status() .message(), ContainsRegex(fmt::format("{}::.DELTA_.GRPC must not have a cluster name " @@ -191,7 +192,7 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { api_config_source.add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("foo"); // this also logs a warning for configuring gRPC clusters for a REST API config. EXPECT_THAT(Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, - scope, false) + scope, false, 0) .status() .message(), ContainsRegex(fmt::format("{}, if not a gRPC type, must not have a gRPC service " @@ -205,7 +206,7 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { api_config_source.add_cluster_names("foo"); EXPECT_THAT( Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, scope, - false) + false, 0) .status() .message(), ContainsRegex(fmt::format("{} type must be gRPC:", api_config_source.GetTypeName()))); @@ -220,7 +221,7 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { EXPECT_CALL(async_client_manager, factoryForGrpcService(ProtoEq(expected_grpc_service), Ref(scope), false)); EXPECT_TRUE(Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, - scope, false) + scope, false, 0) .ok()); } @@ -231,9 +232,76 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { EXPECT_CALL( async_client_manager, factoryForGrpcService(ProtoEq(api_config_source.grpc_services(0)), Ref(scope), true)); - EXPECT_TRUE( - Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, scope, true) - .ok()); + EXPECT_TRUE(Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, + scope, true, 0) + .ok()); + } +} + +// Validates that when failover is supported, the validation works as expected. +TEST(UtilityTest, FactoryForGrpcApiConfigSourceWithFailover) { + // When envoy.restart_features.xds_failover_support is deprecated, the + // following tests will need to be merged with the tests in + // FactoryForGrpcApiConfigSource. + TestScopedRuntime scoped_runtime; + scoped_runtime.mergeValues({{"envoy.restart_features.xds_failover_support", "true"}}); + + NiceMock async_client_manager; + Stats::MockStore store; + Stats::Scope& scope = *store.rootScope(); + + // No more than 2 config sources. + { + envoy::config::core::v3::ApiConfigSource api_config_source; + api_config_source.set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC); + api_config_source.add_grpc_services(); + api_config_source.add_grpc_services(); + api_config_source.add_grpc_services(); + EXPECT_THAT(Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, + scope, false, 0) + .status() + .message(), + ContainsRegex(fmt::format( + "{}::.DELTA_.GRPC must have no more than 2 gRPC services specified:", + api_config_source.GetTypeName()))); + } + + // A single gRPC service is valid. 
+ { + envoy::config::core::v3::ApiConfigSource api_config_source; + api_config_source.set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC); + api_config_source.add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("foo"); + envoy::config::core::v3::GrpcService expected_grpc_service; + expected_grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); + EXPECT_CALL(async_client_manager, + factoryForGrpcService(ProtoEq(expected_grpc_service), Ref(scope), false)); + EXPECT_TRUE(Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, + scope, false, 0) + .ok()); + } + + // 2 gRPC services is valid. + { + envoy::config::core::v3::ApiConfigSource api_config_source; + api_config_source.set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC); + api_config_source.add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("foo"); + api_config_source.add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("bar"); + + envoy::config::core::v3::GrpcService expected_grpc_service_foo; + expected_grpc_service_foo.mutable_envoy_grpc()->set_cluster_name("foo"); + EXPECT_CALL(async_client_manager, + factoryForGrpcService(ProtoEq(expected_grpc_service_foo), Ref(scope), false)); + EXPECT_TRUE(Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, + scope, false, 0) + .ok()); + + envoy::config::core::v3::GrpcService expected_grpc_service_bar; + expected_grpc_service_bar.mutable_envoy_grpc()->set_cluster_name("bar"); + EXPECT_CALL(async_client_manager, + factoryForGrpcService(ProtoEq(expected_grpc_service_bar), Ref(scope), false)); + EXPECT_TRUE(Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, + scope, false, 1) + .ok()); } } diff --git a/test/extensions/config_subscription/common/subscription_factory_impl_test.cc b/test/extensions/config_subscription/common/subscription_factory_impl_test.cc index ed4470656532..34ac0dfb02eb 100644 --- a/test/extensions/config_subscription/common/subscription_factory_impl_test.cc +++ b/test/extensions/config_subscription/common/subscription_factory_impl_test.cc @@ -213,11 +213,92 @@ TEST_P(SubscriptionFactoryTestUnifiedOrLegacyMux, GrpcClusterMultiton) { EXPECT_CALL(cm_, primaryClusters()).WillRepeatedly(ReturnRef(primary_clusters)); EXPECT_THROW_WITH_REGEX(subscriptionFromConfigSource(config), EnvoyException, - fmt::format("{}::.DELTA_.GRPC must have a " - "single gRPC service specified:", + fmt::format("{}::.DELTA_.GRPC must have no " + "more than 1 gRPC services specified:", config.mutable_api_config_source()->GetTypeName())); } +// Validate that failover is accepted. +TEST_P(SubscriptionFactoryTestUnifiedOrLegacyMux, GrpcClusterMultitonFailover) { + // Once "envoy.restart_features.xds_failover_support" is deprecated, the + // "GrpcClusterMultiton" test above will be removed and this one kept. + TestScopedRuntime scoped_runtime; + scoped_runtime.mergeValues({{"envoy.restart_features.xds_failover_support", "true"}}); + envoy::config::core::v3::ConfigSource config; + Upstream::ClusterManager::ClusterSet primary_clusters; + + config.mutable_api_config_source()->set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC); + config.mutable_api_config_source()->set_transport_api_version(envoy::config::core::v3::V3); + config.mutable_api_config_source()->mutable_refresh_delay()->set_seconds(1); + + // A single entry is accepted. 
+ config.mutable_api_config_source()->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name( + "static_cluster_foo"); + primary_clusters.insert("static_cluster_foo"); + + { + envoy::config::core::v3::GrpcService expected_grpc_service; + expected_grpc_service.mutable_envoy_grpc()->set_cluster_name("static_cluster_foo"); + + EXPECT_CALL(cm_, primaryClusters()).WillOnce(ReturnRef(primary_clusters)); + EXPECT_CALL(cm_, grpcAsyncClientManager()).WillOnce(ReturnRef(cm_.async_client_manager_)); + EXPECT_CALL(cm_.async_client_manager_, + factoryForGrpcService(ProtoEq(expected_grpc_service), _, _)) + .WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { + auto async_client_factory = std::make_unique(); + EXPECT_CALL(*async_client_factory, createUncachedRawAsyncClient()).WillOnce(Invoke([] { + return std::make_unique(); + })); + return async_client_factory; + })); + EXPECT_CALL(dispatcher_, createTimer_(_)); + + subscriptionFromConfigSource(config); + } + + // 2 entries (primary and failover) are accepted. + config.mutable_api_config_source()->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name( + "static_cluster_bar"); + primary_clusters.insert("static_cluster_bar"); + + { + envoy::config::core::v3::GrpcService expected_grpc_service; + expected_grpc_service.mutable_envoy_grpc()->set_cluster_name("static_cluster_foo"); + + EXPECT_CALL(cm_, primaryClusters()).WillOnce(ReturnRef(primary_clusters)); + EXPECT_CALL(cm_, grpcAsyncClientManager()).WillOnce(ReturnRef(cm_.async_client_manager_)); + // A factory will only be created for the primary source, because the + // failover source is only supported for ADS. + EXPECT_CALL(cm_.async_client_manager_, + factoryForGrpcService(ProtoEq(expected_grpc_service), _, _)) + .WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { + auto async_client_factory = std::make_unique(); + EXPECT_CALL(*async_client_factory, createUncachedRawAsyncClient()).WillOnce(Invoke([] { + return std::make_unique(); + })); + return async_client_factory; + })); + EXPECT_CALL(dispatcher_, createTimer_(_)); + + subscriptionFromConfigSource(config); + } + + // 3 entries are rejected. 
+ config.mutable_api_config_source()->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name( + "static_cluster_baz"); + primary_clusters.insert("static_cluster_baz"); + + { + EXPECT_CALL(cm_, primaryClusters()).Times(2).WillRepeatedly(ReturnRef(primary_clusters)); + EXPECT_CALL(cm_, grpcAsyncClientManager()).WillRepeatedly(ReturnRef(cm_.async_client_manager_)); + + EXPECT_THROW_WITH_REGEX(subscriptionFromConfigSource(config), EnvoyException, + fmt::format("{}::.DELTA_.GRPC must have no " + "more than 2 gRPC services specified:", + config.mutable_api_config_source()->GetTypeName())); + } +} + TEST_F(SubscriptionFactoryTest, FilesystemSubscription) { envoy::config::core::v3::ConfigSource config; std::string test_path = TestEnvironment::temporaryDirectory(); diff --git a/test/extensions/config_subscription/grpc/BUILD b/test/extensions/config_subscription/grpc/BUILD index a5fd0ef8df0a..b3f06ff1c743 100644 --- a/test/extensions/config_subscription/grpc/BUILD +++ b/test/extensions/config_subscription/grpc/BUILD @@ -261,3 +261,21 @@ envoy_cc_test( "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], ) + +envoy_cc_test( + name = "xds_failover_integration_test", + srcs = + ["xds_failover_integration_test.cc"], + deps = [ + "//source/common/config:protobuf_link_hacks", + "//source/common/protobuf:utility_lib", + "//test/common/grpc:grpc_client_integration_lib", + "//test/integration:ads_integration_lib", + "//test/integration:http_integration_lib", + "//test/test_common:network_utility_lib", + "//test/test_common:resources_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/transport_sockets/tls/v3:pkg_cc_proto", + ], +) diff --git a/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc b/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc new file mode 100644 index 000000000000..7b30041ddcb7 --- /dev/null +++ b/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc @@ -0,0 +1,575 @@ +#include "envoy/config/bootstrap/v3/bootstrap.pb.h" +#include "envoy/extensions/transport_sockets/tls/v3/cert.pb.h" + +#include "source/common/common/logger.h" +#include "source/common/tls/context_config_impl.h" +#include "source/common/tls/server_ssl_socket.h" +#include "source/common/tls/ssl_socket.h" + +#include "test/integration/ads_integration.h" +#include "test/integration/fake_upstream.h" +#include "test/integration/http_integration.h" +#include "test/test_common/environment.h" + +#include "gtest/gtest.h" + +namespace Envoy { + +namespace { +const auto CdsTypeUrl = Config::getTypeUrl(); +const auto EdsTypeUrl = Config::getTypeUrl(); +const auto LdsTypeUrl = Config::getTypeUrl(); +} // namespace + +// Tests the use of Envoy with a primary and failover sources. +class XdsFailoverAdsIntegrationTest : public AdsDeltaSotwIntegrationSubStateParamTest, + public HttpIntegrationTest { +public: + XdsFailoverAdsIntegrationTest() + : HttpIntegrationTest( + Http::CodecType::HTTP2, ipVersion(), + ConfigHelper::adsBootstrap((sotwOrDelta() == Grpc::SotwOrDelta::Sotw) || + (sotwOrDelta() == Grpc::SotwOrDelta::UnifiedSotw) + ? "GRPC" + : "DELTA_GRPC")) { + config_helper_.addRuntimeOverride("envoy.restart_features.xds_failover_support", "true"); + config_helper_.addRuntimeOverride("envoy.reloadable_features.unified_mux", + (sotwOrDelta() == Grpc::SotwOrDelta::UnifiedSotw || + sotwOrDelta() == Grpc::SotwOrDelta::UnifiedDelta) + ? 
"true" + : "false"); + use_lds_ = false; + create_xds_upstream_ = true; + tls_xds_upstream_ = true; + sotw_or_delta_ = sotwOrDelta(); + setUpstreamProtocol(Http::CodecType::HTTP2); + } + + void TearDown() override { + cleanUpConnection(failover_xds_connection_); + cleanUpXdsConnection(); + } + + void initialize() override { initialize(true); } + + void initialize(bool failover_defined) { + failover_defined_ = failover_defined; + + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + // Configure the primary ADS gRPC. + auto* ads_config = bootstrap.mutable_dynamic_resources()->mutable_ads_config(); + auto* grpc_service = ads_config->add_grpc_services(); + setGrpcService(*grpc_service, "ads_cluster", xds_upstream_->localAddress()); + auto* ads_cluster = bootstrap.mutable_static_resources()->add_clusters(); + ads_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + ads_cluster->set_name("ads_cluster"); + envoy::extensions::transport_sockets::tls::v3::UpstreamTlsContext context; + auto* validation_context = context.mutable_common_tls_context()->mutable_validation_context(); + validation_context->mutable_trusted_ca()->set_filename( + TestEnvironment::runfilesPath("test/config/integration/certs/upstreamcacert.pem")); + auto* san_matcher = validation_context->add_match_typed_subject_alt_names(); + san_matcher->mutable_matcher()->set_suffix("lyft.com"); + san_matcher->set_san_type( + envoy::extensions::transport_sockets::tls::v3::SubjectAltNameMatcher::DNS); + if (clientType() == Grpc::ClientType::GoogleGrpc) { + auto* google_grpc = grpc_service->mutable_google_grpc(); + auto* ssl_creds = google_grpc->mutable_channel_credentials()->mutable_ssl_credentials(); + ssl_creds->mutable_root_certs()->set_filename( + TestEnvironment::runfilesPath("test/config/integration/certs/upstreamcacert.pem")); + } + ads_cluster->mutable_transport_socket()->set_name("envoy.transport_sockets.tls"); + ads_cluster->mutable_transport_socket()->mutable_typed_config()->PackFrom(context); + if (failover_defined_) { + // Configure the API to use a failover gRPC service. 
+ auto* failover_grpc_service = ads_config->add_grpc_services(); + setGrpcService(*failover_grpc_service, "failover_ads_cluster", + failover_xds_upstream_->localAddress()); + auto* failover_ads_cluster = bootstrap.mutable_static_resources()->add_clusters(); + failover_ads_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + failover_ads_cluster->set_name("failover_ads_cluster"); + envoy::extensions::transport_sockets::tls::v3::UpstreamTlsContext failover_context; + auto* failover_validation_context = + failover_context.mutable_common_tls_context()->mutable_validation_context(); + failover_validation_context->mutable_trusted_ca()->set_filename( + TestEnvironment::runfilesPath("test/config/integration/certs/upstreamcacert.pem")); + auto* failover_san_matcher = + failover_validation_context->add_match_typed_subject_alt_names(); + failover_san_matcher->mutable_matcher()->set_suffix("lyft.com"); + failover_san_matcher->set_san_type( + envoy::extensions::transport_sockets::tls::v3::SubjectAltNameMatcher::DNS); + if (clientType() == Grpc::ClientType::GoogleGrpc) { + auto* failover_google_grpc = failover_grpc_service->mutable_google_grpc(); + auto* failover_ssl_creds = + failover_google_grpc->mutable_channel_credentials()->mutable_ssl_credentials(); + failover_ssl_creds->mutable_root_certs()->set_filename( + TestEnvironment::runfilesPath("test/config/integration/certs/upstreamcacert.pem")); + } + failover_ads_cluster->mutable_transport_socket()->set_name("envoy.transport_sockets.tls"); + failover_ads_cluster->mutable_transport_socket()->mutable_typed_config()->PackFrom( + failover_context); + } + }); + HttpIntegrationTest::initialize(); + // Do not respond to the initial primary stream request. + } + + void createFailoverXdsUpstream() { + if (tls_xds_upstream_ == false) { + addFakeUpstream(Http::CodecType::HTTP2); + } else { + envoy::extensions::transport_sockets::tls::v3::DownstreamTlsContext tls_context; + auto* common_tls_context = tls_context.mutable_common_tls_context(); + common_tls_context->add_alpn_protocols(Http::Utility::AlpnNames::get().Http2); + auto* tls_cert = common_tls_context->add_tls_certificates(); + tls_cert->mutable_certificate_chain()->set_filename( + TestEnvironment::runfilesPath("test/config/integration/certs/upstreamcert.pem")); + tls_cert->mutable_private_key()->set_filename( + TestEnvironment::runfilesPath("test/config/integration/certs/upstreamkey.pem")); + auto cfg = *Extensions::TransportSockets::Tls::ServerContextConfigImpl::create( + tls_context, factory_context_); + // upstream_stats_store_ should have been initialized be prior call to + // BaseIntegrationTest::createXdsUpstream(). + ASSERT(upstream_stats_store_ != nullptr); + auto context = *Extensions::TransportSockets::Tls::ServerSslSocketFactory::create( + std::move(cfg), context_manager_, *upstream_stats_store_->rootScope(), + std::vector{}); + addFakeUpstream(std::move(context), Http::CodecType::HTTP2, /*autonomous_upstream=*/false); + } + failover_xds_upstream_ = fake_upstreams_.back().get(); + } + + void initializeFailoverXdsStream() { + if (failover_xds_stream_ == nullptr) { + auto result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); + RELEASE_ASSERT(result, result.message()); + failover_xds_stream_->startGrpcStream(); + } + } + + void createXdsUpstream() override { + BaseIntegrationTest::createXdsUpstream(); + // Setup the failover xDS upstream. 
+ createFailoverXdsUpstream(); + } + + void cleanUpConnection(FakeHttpConnectionPtr& connection) { + if (connection != nullptr) { + AssertionResult result = connection->close(); + RELEASE_ASSERT(result, result.message()); + result = connection->waitForDisconnect(); + RELEASE_ASSERT(result, result.message()); + connection.reset(); + } + } + + // Waits for a primary source connected and immediately disconnects. + void primaryConnectionFailure() { + AssertionResult result = xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->close(); + RELEASE_ASSERT(result, result.message()); + } + + envoy::config::endpoint::v3::ClusterLoadAssignment + buildClusterLoadAssignment(const std::string& name) { + return ConfigHelper::buildClusterLoadAssignment( + name, Network::Test::getLoopbackAddressString(ipVersion()), + fake_upstreams_[0]->localAddress()->ip()->port()); + } + + void makeSingleRequest() { + registerTestServerPorts({"http"}); + testRouterHeaderOnlyRequestAndResponse(); + cleanupUpstreamAndDownstream(); + } + + envoy::config::listener::v3::Listener buildSimpleListener(const std::string& listener_name, + const std::string& cluster_name) { + std::string hcm = fmt::format( + R"EOF( + filters: + - name: http + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: {} + codec_type: HTTP2 + route_config: + name: route_config_1 + virtual_hosts: + - name: integration + domains: ["*"] + routes: + - match: {{ prefix: "/" }} + route: {{ cluster: "{}" }} + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + )EOF", + "ads_test", cluster_name); + return ConfigHelper::buildBaseListener( + listener_name, Network::Test::getLoopbackAddressString(ipVersion()), hcm); + } + + void validateAllXdsResponsesAndDataplaneRequest(FakeStream* xds_stream) { + EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "", {}, {}, {}, true, + Grpc::Status::WellKnownGrpcStatus::Ok, "", xds_stream)); + sendDiscoveryResponse( + CdsTypeUrl, {ConfigHelper::buildCluster("cluster_0")}, + {ConfigHelper::buildCluster("cluster_0")}, {}, "1", {}, xds_stream); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + test_server_->waitForGaugeEq("cluster.cluster_0.warming_state", 1); + EXPECT_TRUE(compareDiscoveryRequest(EdsTypeUrl, "", {"cluster_0"}, {"cluster_0"}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", xds_stream)); + sendDiscoveryResponse( + EdsTypeUrl, {buildClusterLoadAssignment("cluster_0")}, + {buildClusterLoadAssignment("cluster_0")}, {}, "1", {}, xds_stream); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); + test_server_->waitForGaugeEq("cluster.cluster_0.warming_state", 0); + + EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "1", {}, {}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", xds_stream)); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "", {}, {}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", xds_stream)); + + sendDiscoveryResponse( + LdsTypeUrl, {buildSimpleListener("listener_0", "cluster_0")}, + {buildSimpleListener("listener_0", "cluster_0")}, {}, "1", {}, xds_stream); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", + {"cluster_0"}, {}, {}, false, + 
Grpc::Status::WellKnownGrpcStatus::Ok, "", xds_stream)); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "1", {}, {}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", xds_stream)); + + test_server_->waitForCounterGe("listener_manager.listener_create_success", 1); + makeSingleRequest(); + } + + bool failover_defined_; + // Holds the failover xDS server data (if needed). + FakeUpstream* failover_xds_upstream_; + FakeHttpConnectionPtr failover_xds_connection_; + FakeStreamPtr failover_xds_stream_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeDeltaWildcard, XdsFailoverAdsIntegrationTest, + DELTA_SOTW_UNIFIED_GRPC_CLIENT_INTEGRATION_PARAMS); + +// Validate that when there's no failover defined (but runtime flag enabled), +// the primary is used. +TEST_P(XdsFailoverAdsIntegrationTest, NoFailoverBasic) { + initialize(false); + + createXdsConnection(); + AssertionResult result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + xds_stream_->startGrpcStream(); + + // Ensure basic flow using the primary (when failover is not defined) works. + validateAllXdsResponsesAndDataplaneRequest(xds_stream_.get()); +} + +// Validate that when there's failover defined and the primary is available, +// then a connection to the failover won't be attempted. +TEST_P(XdsFailoverAdsIntegrationTest, FailoverNotAttemptedWhenPrimaryAvailable) { + // This test does not work when GoogleGrpc is used, because GoogleGrpc + // attempts to create the connection when the client is created (not when it + // actually attempts to send a request). This is due to the call to GetChannel: + // https://github.com/envoyproxy/envoy/blob/c9848c7be992c489a16a3218ec4bf373c9616d70/source/common/grpc/google_async_client_impl.cc#L106 + SKIP_IF_GRPC_CLIENT(Grpc::ClientType::GoogleGrpc); + initialize(); + + createXdsConnection(); + AssertionResult result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + xds_stream_->startGrpcStream(); + + // Ensure basic flow using the primary (while failover is defined) works. + validateAllXdsResponsesAndDataplaneRequest(xds_stream_.get()); + + // A failover connection should not be attempted, so a failure is expected here. + // Setting smaller timeout to avoid long execution times. + EXPECT_FALSE(failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_, + std::chrono::seconds(5))); +} + +// Validates that when there's a failover defined, and the primary isn't responding, +// then Envoy will use the failover, and will receive a valid config. +TEST_P(XdsFailoverAdsIntegrationTest, StartupPrimaryNotResponding) { + initialize(); + + // Expect a connection to the primary. Reject the connection immediately. + primaryConnectionFailure(); + + // Expect another connection attempt to the primary. Reject the stream (gRPC failure) immediately. + // As this is a 2nd consecutive failure, it will trigger failover. + primaryConnectionFailure(); + + AssertionResult result = + failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); + RELEASE_ASSERT(result, result.message()); + // Failover is healthy, start the ADS gRPC stream. + result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); + RELEASE_ASSERT(result, result.message()); + failover_xds_stream_->startGrpcStream(); + + // Ensure basic flow using the failover source works. 
+ validateAllXdsResponsesAndDataplaneRequest(failover_xds_stream_.get()); +} + +// Validates that when there's a failover defined, and the primary returns a +// gRPC failure, then Envoy will use the failover, and will receive a valid config. +TEST_P(XdsFailoverAdsIntegrationTest, StartupPrimaryGrpcFailure) { +#ifdef ENVOY_ENABLE_UHV + // With UHV the finishGrpcStream() isn't detected as invalid frame because of + // no ":status" header, unless "envoy.reloadable_features.enable_universal_header_validator" + // is also enabled. + config_helper_.addRuntimeOverride("envoy.reloadable_features.enable_universal_header_validator", + "true"); +#endif + initialize(); + + // Expect a connection to the primary. Send a gRPC failure immediately. + AssertionResult result = xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->finishGrpcStream(Grpc::Status::Internal); + + // Second attempt to the primary. + // When using GoogleGrpc the same connection is reused, whereas for EnvoyGrpc + // a new connection will be established. + if (clientType() == Grpc::ClientType::EnvoyGrpc) { + result = xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + } + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->finishGrpcStream(Grpc::Status::Internal); + + ASSERT(failover_xds_connection_ == nullptr); + result = failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); + RELEASE_ASSERT(result, result.message()); + // Failover is healthy, start the ADS gRPC stream. + result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); + RELEASE_ASSERT(result, result.message()); + failover_xds_stream_->startGrpcStream(); + + // Ensure basic flow using the failover source works. + validateAllXdsResponsesAndDataplaneRequest(failover_xds_stream_.get()); +} + +// Validates that when there's a failover defined, and the primary returns a +// gRPC failure after sending headers, then Envoy will use the failover, and will receive a valid +// config. +TEST_P(XdsFailoverAdsIntegrationTest, StartupPrimaryGrpcFailureAfterHeaders) { +#ifdef ENVOY_ENABLE_UHV + // With UHV the finishGrpcStream() isn't detected as invalid frame because of + // no ":status" header, unless "envoy.reloadable_features.enable_universal_header_validator" + // is also enabled. + config_helper_.addRuntimeOverride("envoy.reloadable_features.enable_universal_header_validator", + "true"); +#endif + initialize(); + + // Expect a connection to the primary. Send a gRPC failure immediately. + AssertionResult result = xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->startGrpcStream(); + xds_stream_->finishGrpcStream(Grpc::Status::Internal); + + // Second attempt to the primary, reusing stream as headers were previously + // sent. 
+ result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->startGrpcStream(); + xds_stream_->finishGrpcStream(Grpc::Status::Internal); + + ASSERT(failover_xds_connection_ == nullptr); + result = failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); + RELEASE_ASSERT(result, result.message()); + // Failover is healthy, start the ADS gRPC stream. + result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); + RELEASE_ASSERT(result, result.message()); + failover_xds_stream_->startGrpcStream(); + + // Ensure basic flow using the failover source works. + validateAllXdsResponsesAndDataplaneRequest(failover_xds_stream_.get()); +} + +// Validate that once primary answers, failover will not be used, even after disconnecting. +TEST_P(XdsFailoverAdsIntegrationTest, NoFailoverUseAfterPrimaryResponse) { +#ifdef ENVOY_ENABLE_UHV + // With UHV the finishGrpcStream() isn't detected as invalid frame because of + // no ":status" header, unless "envoy.reloadable_features.enable_universal_header_validator" + // is also enabled. + config_helper_.addRuntimeOverride("envoy.reloadable_features.enable_universal_header_validator", + "true"); +#endif + initialize(); + + // Let the primary source respond. + createXdsConnection(); + AssertionResult result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + xds_stream_->startGrpcStream(); + + // Ensure basic flow with failover works. + EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "", {}, {}, {}, true, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + xds_stream_.get())); + sendDiscoveryResponse( + CdsTypeUrl, {ConfigHelper::buildCluster("cluster_0")}, + {ConfigHelper::buildCluster("cluster_0")}, {}, "1", {}, xds_stream_.get()); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + test_server_->waitForGaugeEq("cluster.cluster_0.warming_state", 1); + + // Envoy has received a CDS response, it means the primary is available. + // Now disconnect the primary. + xds_stream_->finishGrpcStream(Grpc::Status::Internal); + + // In this case (received a response), both EnvoyGrpc and GoogleGrpc keep the connection open. + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + // Immediately fail the connection. + xds_stream_->finishGrpcStream(Grpc::Status::Internal); + + // Ensure that Envoy still attempts to connect to the primary, + // and keep disconnecting a few times and validate that the failover + // connection isn't attempted. + for (int i = 3; i < 5; ++i) { + // EnvoyGrpc will disconnect if the gRPC stream is immediately closed (as + // done above). + if (clientType() == Grpc::ClientType::EnvoyGrpc) { + result = xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + } + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + // Immediately fail the connection. + xds_stream_->finishGrpcStream(Grpc::Status::Internal); + } + + // When GoogleGrpc is used, a connection to the failover_xds_upstream will be + // attempted, but no stream will be created. When EnvoyGrpc is used, no + // connection to the failover will be attempted. + if (clientType() == Grpc::ClientType::EnvoyGrpc) { + // A failover connection should not be attempted, so a failure is expected here. + // Setting smaller timeout to avoid long execution times. 
+    EXPECT_FALSE(failover_xds_upstream_->waitForHttpConnection(
+        *dispatcher_, failover_xds_connection_, std::chrono::seconds(1)));
+  } else {
+    result = failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_);
+    RELEASE_ASSERT(result, result.message());
+    EXPECT_FALSE(failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_,
+                                                            std::chrono::seconds(1)));
+  }
+
+  // Allow a connection to the primary.
+  // Expect a connection to the primary when using EnvoyGrpc.
+  // In case GoogleGrpc is used the current connection will be reused (new stream).
+  if (clientType() == Grpc::ClientType::EnvoyGrpc) {
+    result = xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_);
+    RELEASE_ASSERT(result, result.message());
+  }
+  result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_);
+  xds_stream_->startGrpcStream();
+
+  // The rest will be a normal primary source xDS back and forth.
+  validateAllXdsResponsesAndDataplaneRequest(xds_stream_.get());
+}
+
+// Validate that once failover answers, primary will not be used, even after disconnecting.
+TEST_P(XdsFailoverAdsIntegrationTest, NoPrimaryUseAfterFailoverResponse) {
+#ifdef ENVOY_ENABLE_UHV
+  // With UHV the finishGrpcStream() isn't detected as invalid frame because of
+  // no ":status" header, unless "envoy.reloadable_features.enable_universal_header_validator"
+  // is also enabled.
+  config_helper_.addRuntimeOverride("envoy.reloadable_features.enable_universal_header_validator",
+                                    "true");
+#endif
+  initialize();
+
+  // 2 consecutive primary failures.
+  // Expect a connection to the primary. Reject the connection immediately.
+  primaryConnectionFailure();
+  // Expect another connection attempt to the primary. Reject the connection immediately.
+  // As this is a 2nd consecutive failure, it will trigger failover.
+  primaryConnectionFailure();
+
+  AssertionResult result =
+      failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_);
+  RELEASE_ASSERT(result, result.message());
+  // Failover is healthy, start the ADS gRPC stream.
+  result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_);
+  RELEASE_ASSERT(result, result.message());
+  failover_xds_stream_->startGrpcStream();
+
+  // Ensure basic flow with failover works.
+  EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "", {}, {}, {}, true,
+                                      Grpc::Status::WellKnownGrpcStatus::Ok, "",
+                                      failover_xds_stream_.get()));
+  sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
+      CdsTypeUrl, {ConfigHelper::buildCluster("cluster_0")},
+      {ConfigHelper::buildCluster("cluster_0")}, {}, "1", {}, failover_xds_stream_.get());
+  test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1);
+  test_server_->waitForGaugeEq("cluster.cluster_0.warming_state", 1);
+
+  // Envoy has received a CDS response over the failover source, which means the failover
+  // is available. Now disconnect the failover.
+  failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal);
+
+  // In this case (received a response), both EnvoyGrpc and GoogleGrpc keep the connection open.
+  result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_);
+  RELEASE_ASSERT(result, result.message());
+  // Immediately fail the connection.
+  failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal);
+
+  // Ensure that Envoy still attempts to reconnect to the failover,
+  // and keep disconnecting a few times and validate that a connection to the
+  // primary isn't attempted.
+ for (int i = 3; i < 5; ++i) { + // EnvoyGrpc will disconnect if the gRPC stream is immediately closed (as + // done above). + if (clientType() == Grpc::ClientType::EnvoyGrpc) { + result = + failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); + RELEASE_ASSERT(result, result.message()); + } + result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); + RELEASE_ASSERT(result, result.message()); + // Immediately fail the connection. + failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal); + } + + // When GoogleGrpc is used, a connection to the (primary) xds_upstream will be + // attempted, but no stream will be created. When EnvoyGrpc is used, no + // connection to the primary will be attempted. + if (clientType() == Grpc::ClientType::EnvoyGrpc) { + // A primary connection should not be attempted, so a failure is expected here. + // Setting smaller timeout to avoid long execution times. + EXPECT_FALSE(xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_, + std::chrono::seconds(1))); + } else { + result = xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + EXPECT_FALSE( + xds_connection_->waitForNewStream(*dispatcher_, xds_stream_, std::chrono::seconds(1))); + } + + // Allow a connection to the failover. + // Expect a connection to the failover when using EnvoyGrpc. + // In case GoogleGrpc is used the current connection will be reused (new stream). + if (clientType() == Grpc::ClientType::EnvoyGrpc) { + result = failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); + RELEASE_ASSERT(result, result.message()); + } + result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); + failover_xds_stream_->startGrpcStream(); + + // The rest will be a normal failover source xDS back and forth. + validateAllXdsResponsesAndDataplaneRequest(failover_xds_stream_.get()); +} +} // namespace Envoy diff --git a/test/integration/base_integration_test.h b/test/integration/base_integration_test.h index b6cc8fb7336d..6278768d6d2d 100644 --- a/test/integration/base_integration_test.h +++ b/test/integration/base_integration_test.h @@ -167,7 +167,7 @@ class BaseIntegrationTest : protected Logger::Loggable { envoy::config::core::v3::Node last_node_; // Functions for testing reloadable config (xDS) - void createXdsUpstream(); + virtual void createXdsUpstream(); void createXdsConnection(); void cleanUpXdsConnection();
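
Taken together, these changes let an ADS config name a primary and a failover xDS server. The following caller-side sketch is hypothetical and not part of the PR: `async_client_manager` and `scope` are assumed to already be in scope, the cluster names are placeholders, and status/error handling is elided.

```cpp
// Sketch: with "envoy.restart_features.xds_failover_support" enabled, up to two
// grpc_services are accepted; index 0 is the primary source and index 1 the failover.
envoy::config::core::v3::ApiConfigSource ads_config;
ads_config.set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC);
ads_config.set_transport_api_version(envoy::config::core::v3::V3);
ads_config.add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("primary_xds_cluster");
ads_config.add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("failover_xds_cluster");

// Factories are now requested per index. Index 1 yields a nullptr factory when no failover
// entry exists, in which case the cluster manager passes a null failover client to the mux.
auto primary_factory = Config::Utility::factoryForGrpcApiConfigSource(
    async_client_manager, ads_config, scope, /*skip_cluster_check=*/false, /*grpc_service_idx=*/0);
auto failover_factory = Config::Utility::factoryForGrpcApiConfigSource(
    async_client_manager, ads_config, scope, /*skip_cluster_check=*/false, /*grpc_service_idx=*/1);
```

With the runtime flag disabled, checkApiConfigSourceNames still caps grpc_services at one, so the second entry would be rejected; and as the subscription factory hunks note, only ADS wires the second client into the mux, while other subscriptions keep passing a null failover client.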