Skip to content

Commit

Permalink
xds-failover: plumb the API and the mux to the failover
Browse files Browse the repository at this point in the history
Signed-off-by: Adi Suissa-Peleg <[email protected]>
  • Loading branch information
adisuissa committed May 29, 2024
1 parent 72d653e commit 9517c44
Show file tree
Hide file tree
Showing 39 changed files with 920 additions and 212 deletions.
5 changes: 3 additions & 2 deletions envoy/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ class MuxFactory : public Config::UntypedFactory {
std::string category() const override { return "envoy.config_mux"; }
virtual void shutdownAll() PURE;
virtual std::shared_ptr<GrpcMux>
create(std::unique_ptr<Grpc::RawAsyncClient>&& async_client, Event::Dispatcher& dispatcher,
Random::RandomGenerator& random, Stats::Scope& scope,
create(std::unique_ptr<Grpc::RawAsyncClient>&& async_client,
std::unique_ptr<Grpc::RawAsyncClient>&& async_failover_client,
Event::Dispatcher& dispatcher, Random::RandomGenerator& random, Stats::Scope& scope,
const envoy::config::core::v3::ApiConfigSource& ads_config,
const LocalInfo::LocalInfo& local_info,
std::unique_ptr<CustomConfigValidators>&& config_validators,
Expand Down
59 changes: 44 additions & 15 deletions source/common/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "source/common/common/assert.h"
#include "source/common/protobuf/utility.h"

#include "absl/status/status.h"

namespace Envoy {
namespace Config {

Expand Down Expand Up @@ -68,11 +70,13 @@ namespace {
/**
* Check the grpc_services and cluster_names for API config sanity. Throws on error.
* @param api_config_source the config source to validate.
* @param max_grpc_services the maximal number of grpc services allowed.
* @return an invalid status when an API config has the wrong number of gRPC
* services or cluster names, depending on expectations set by its API type.
*/
absl::Status
checkApiConfigSourceNames(const envoy::config::core::v3::ApiConfigSource& api_config_source) {
checkApiConfigSourceNames(const envoy::config::core::v3::ApiConfigSource& api_config_source,
int max_grpc_services) {
const bool is_grpc =
(api_config_source.api_type() == envoy::config::core::v3::ApiConfigSource::GRPC ||
api_config_source.api_type() == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC);
Expand All @@ -89,10 +93,10 @@ checkApiConfigSourceNames(const envoy::config::core::v3::ApiConfigSource& api_co
fmt::format("{}::(DELTA_)GRPC must not have a cluster name specified: {}",
api_config_source.GetTypeName(), api_config_source.DebugString()));
}
if (api_config_source.grpc_services().size() > 1) {
return absl::InvalidArgumentError(
fmt::format("{}::(DELTA_)GRPC must have a single gRPC service specified: {}",
api_config_source.GetTypeName(), api_config_source.DebugString()));
if (api_config_source.grpc_services_size() > max_grpc_services) {
return absl::InvalidArgumentError(fmt::format(
"{}::(DELTA_)GRPC must have no more than {} gRPC services specified: {}",
api_config_source.GetTypeName(), max_grpc_services, api_config_source.DebugString()));
}
} else {
if (!api_config_source.grpc_services().empty()) {
Expand Down Expand Up @@ -133,7 +137,9 @@ absl::Status Utility::checkApiConfigSourceSubscriptionBackingCluster(
envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC) {
return absl::OkStatus();
}
RETURN_IF_NOT_OK(checkApiConfigSourceNames(api_config_source));
RETURN_IF_NOT_OK(checkApiConfigSourceNames(
api_config_source,
Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support") ? 2 : 1));

const bool is_grpc =
(api_config_source.api_type() == envoy::config::core::v3::ApiConfigSource::GRPC);
Expand All @@ -153,6 +159,14 @@ absl::Status Utility::checkApiConfigSourceSubscriptionBackingCluster(
primary_clusters, api_config_source.grpc_services()[0].envoy_grpc().cluster_name(),
api_config_source.GetTypeName()));
}
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support") &&
api_config_source.grpc_services_size() == 2 &&
api_config_source.grpc_services()[1].has_envoy_grpc()) {
// If an Envoy failover gRPC exists, we validate its cluster name.
RETURN_IF_NOT_OK(Utility::validateClusterName(
primary_clusters, api_config_source.grpc_services()[1].envoy_grpc().cluster_name(),
api_config_source.GetTypeName()));
}
}
// Otherwise, there is no cluster name to validate.
return absl::OkStatus();
Expand All @@ -161,15 +175,23 @@ absl::Status Utility::checkApiConfigSourceSubscriptionBackingCluster(
absl::optional<std::string>
Utility::getGrpcControlPlane(const envoy::config::core::v3::ApiConfigSource& api_config_source) {
if (api_config_source.grpc_services_size() > 0) {
// Only checking for the first entry in grpc_services, because Envoy's xDS implementation
// currently only considers the first gRPC endpoint and ignores any other xDS management servers
// specified in an ApiConfigSource.
std::string res = "";
// In case more than one grpc service is defined, concatenate the names for
// a unique GrpcControlPlane identifier.
if (api_config_source.grpc_services(0).has_envoy_grpc()) {
return api_config_source.grpc_services(0).envoy_grpc().cluster_name();
res = api_config_source.grpc_services(0).envoy_grpc().cluster_name();
} else if (api_config_source.grpc_services(0).has_google_grpc()) {
res = api_config_source.grpc_services(0).google_grpc().target_uri();
}
if (api_config_source.grpc_services(0).has_google_grpc()) {
return api_config_source.grpc_services(0).google_grpc().target_uri();
// Concatenate the failover gRPC service.
if (api_config_source.grpc_services_size() == 2) {
if (api_config_source.grpc_services(1).has_envoy_grpc()) {
absl::StrAppend(&res, ",", api_config_source.grpc_services(1).envoy_grpc().cluster_name());
} else if (api_config_source.grpc_services(1).has_google_grpc()) {
absl::StrAppend(&res, ",", api_config_source.grpc_services(1).google_grpc().target_uri());
}
}
return res;
}
return absl::nullopt;
}
Expand Down Expand Up @@ -204,8 +226,10 @@ Utility::parseRateLimitSettings(const envoy::config::core::v3::ApiConfigSource&
absl::StatusOr<Grpc::AsyncClientFactoryPtr> Utility::factoryForGrpcApiConfigSource(
Grpc::AsyncClientManager& async_client_manager,
const envoy::config::core::v3::ApiConfigSource& api_config_source, Stats::Scope& scope,
bool skip_cluster_check) {
RETURN_IF_NOT_OK(checkApiConfigSourceNames(api_config_source));
bool skip_cluster_check, int grpc_service_idx) {
RETURN_IF_NOT_OK(checkApiConfigSourceNames(
api_config_source,
Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support") ? 2 : 1));

if (api_config_source.api_type() != envoy::config::core::v3::ApiConfigSource::GRPC &&
api_config_source.api_type() != envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) {
Expand All @@ -214,8 +238,13 @@ absl::StatusOr<Grpc::AsyncClientFactoryPtr> Utility::factoryForGrpcApiConfigSour
api_config_source.DebugString()));
}

if (grpc_service_idx >= api_config_source.grpc_services_size()) {
// No returned factory in case there's no entry.
return nullptr;
}

envoy::config::core::v3::GrpcService grpc_service;
grpc_service.MergeFrom(api_config_source.grpc_services(0));
grpc_service.MergeFrom(api_config_source.grpc_services(grpc_service_idx));

return async_client_manager.factoryForGrpcService(grpc_service, scope, skip_cluster_check);
}
Expand Down
7 changes: 5 additions & 2 deletions source/common/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,15 @@ class Utility {
* @param async_client_manager gRPC async client manager.
* @param api_config_source envoy::config::core::v3::ApiConfigSource. Must have config type GRPC.
* @param skip_cluster_check whether to skip cluster validation.
* @return Grpc::AsyncClientFactoryPtr gRPC async client factory.
* @param grpc_service_idx index of the grpc service in the api_config_source. If there's no entry
* in the given index, a nullptr factory will be returned.
* @return Grpc::AsyncClientFactoryPtr gRPC async client factory, or nullptr if there's no
* grpc_service in the given index.
*/
static absl::StatusOr<Grpc::AsyncClientFactoryPtr>
factoryForGrpcApiConfigSource(Grpc::AsyncClientManager& async_client_manager,
const envoy::config::core::v3::ApiConfigSource& api_config_source,
Stats::Scope& scope, bool skip_cluster_check);
Stats::Scope& scope, bool skip_cluster_check, int grpc_service_idx);

/**
* Translate opaque config from google.protobuf.Any to defined proto message.
Expand Down
2 changes: 2 additions & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ FALSE_RUNTIME_GUARD(envoy_reloadable_features_prefer_quic_client_udp_gro);
FALSE_RUNTIME_GUARD(envoy_reloadable_features_reresolve_null_addresses);
// TODO(alyssar) evaluate and either make this a config knob or remove.
FALSE_RUNTIME_GUARD(envoy_reloadable_features_reresolve_if_no_connections);
// TODO(adisuissa): flip to true after this is out of alpha mode.
FALSE_RUNTIME_GUARD(envoy_restart_features_xds_failover_support);

// A flag to set the maximum TLS version for google_grpc client to TLS1.2, when needed for
// compliance restrictions.
Expand Down
62 changes: 44 additions & 18 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,25 @@ getOrigin(const Network::TransportSocketOptionsConstSharedPtr& options, HostCons

bool isBlockingAdsCluster(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
absl::string_view cluster_name) {
bool blocking_ads_cluster = false;
if (bootstrap.dynamic_resources().has_ads_config()) {
const auto& ads_config_source = bootstrap.dynamic_resources().ads_config();
// We only care about EnvoyGrpc, not GoogleGrpc, because we only need to delay ADS mux
// initialization if it uses an Envoy cluster that needs to be initialized first. We don't
// depend on the same cluster initialization when opening a gRPC stream for GoogleGrpc.
return (ads_config_source.grpc_services_size() > 0 &&
ads_config_source.grpc_services(0).has_envoy_grpc() &&
ads_config_source.grpc_services(0).envoy_grpc().cluster_name() == cluster_name);
blocking_ads_cluster =
(ads_config_source.grpc_services_size() > 0 &&
ads_config_source.grpc_services(0).has_envoy_grpc() &&
ads_config_source.grpc_services(0).envoy_grpc().cluster_name() == cluster_name);
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
// Validate the failover server if there is one.
blocking_ads_cluster |=
(ads_config_source.grpc_services_size() == 2 &&
ads_config_source.grpc_services(1).has_envoy_grpc() &&
ads_config_source.grpc_services(1).envoy_grpc().cluster_name() == cluster_name);
}
}
return false;
return blocking_ads_cluster;
}

} // namespace
Expand Down Expand Up @@ -447,14 +456,22 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo
if (!factory) {
return absl::InvalidArgumentError(fmt::format("{} not found", name));
}
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
ads_mux_ =
factory->create(factory_or_error.value()->createUncachedRawAsyncClient(), dispatcher_,
random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), {}, use_eds_cache);
auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 0);
THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw);
Grpc::AsyncClientFactoryPtr factory_failover = nullptr;
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
auto factory_failover_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 1);
THROW_IF_STATUS_NOT_OK(factory_failover_or_error, throw);
factory_failover = std::move(factory_failover_or_error.value());
}
ads_mux_ = factory->create(
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
factory_failover ? factory_failover->createUncachedRawAsyncClient() : nullptr,
dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), {}, use_eds_cache);
} else {
absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config());
RETURN_IF_NOT_OK(status);
Expand All @@ -470,12 +487,20 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo
if (!factory) {
return absl::InvalidArgumentError(fmt::format("{} not found", name));
}
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 0);
THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw);
Grpc::AsyncClientFactoryPtr factory_failover = nullptr;
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
auto factory_failover_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 1);
THROW_IF_STATUS_NOT_OK(factory_failover_or_error, throw);
factory_failover = std::move(factory_failover_or_error.value());
}
ads_mux_ = factory->create(
factory_or_error.value()->createUncachedRawAsyncClient(), dispatcher_, random_,
*stats_.rootScope(), dyn_resources.ads_config(), local_info_,
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
factory_failover ? factory_failover->createUncachedRawAsyncClient() : nullptr,
dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref, use_eds_cache);
}
Expand Down Expand Up @@ -566,8 +591,9 @@ absl::Status ClusterManagerImpl::initializeSecondaryClusters(

absl::Status status = Config::Utility::checkTransportVersion(load_stats_config);
RETURN_IF_NOT_OK(status);
// TODO(adisuissa): support xDS-Failover in LoadStats reporting.
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, load_stats_config, *stats_.rootScope(), false);
*async_client_manager_, load_stats_config, *stats_.rootScope(), false, 0);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
load_stats_reporter_ = std::make_unique<LoadStatsReporter>(
local_info_, *this, *stats_.rootScope(),
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/config_subscription/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ envoy_cc_extension(
deps = [
":eds_resources_cache_lib",
":grpc_mux_context_lib",
":grpc_mux_failover_lib",
":grpc_stream_lib",
":xds_source_id_lib",
"//envoy/config:custom_config_validators_interface",
Expand Down Expand Up @@ -62,6 +63,7 @@ envoy_cc_library(
":delta_subscription_state_lib",
":eds_resources_cache_lib",
":grpc_mux_context_lib",
":grpc_mux_failover_lib",
":grpc_stream_lib",
":pausable_ack_queue_lib",
":watch_map_lib",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ SubscriptionPtr DeltaGrpcCollectionConfigSubscriptionFactory::create(
THROW_IF_STATUS_NOT_OK(strategy_or_error, throw);
JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());

auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
// xDS-Failover is only supported in ADS.
auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0);
THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw);
absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
Utility::parseRateLimitSettings(api_config_source);
THROW_IF_STATUS_NOT_OK(rate_limit_settings_or_error, throw);
GrpcMuxContext grpc_mux_context{
factory_or_error.value()->createUncachedRawAsyncClient(),
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_*/ nullptr,
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/deltaGrpcMethod(data.type_url_),
/*local_info_=*/data.local_info_,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Config {
// These are parameters needed for the creation of all GrpcMux objects.
struct GrpcMuxContext {
Grpc::RawAsyncClientPtr async_client_;
Grpc::RawAsyncClientPtr failover_async_client_;
Event::Dispatcher& dispatcher_;
const Protobuf::MethodDescriptor& service_method_;
const LocalInfo::LocalInfo& local_info_;
Expand Down
Loading

0 comments on commit 9517c44

Please sign in to comment.