diff --git a/DEPRECATED.md b/DEPRECATED.md index 33eff1bc489a..e5308f46e43e 100644 --- a/DEPRECATED.md +++ b/DEPRECATED.md @@ -10,7 +10,7 @@ A logged warning is expected for each deprecated item that is in deprecation win * Use of the v1 API is deprecated. See envoy-announce [email](https://groups.google.com/forum/#!topic/envoy-announce/oPnYMZw8H4U). -* Use of the legacy +* Use of the legacy [ratelimit.proto](https://github.com/envoyproxy/envoy/blob/b0a518d064c8255e0e20557a8f909b6ff457558f/source/common/ratelimit/ratelimit.proto) is deprecated, in favor of the proto defined in [date-plane-api](https://github.com/envoyproxy/envoy/blob/master/api/envoy/service/ratelimit/v2/rls.proto) @@ -23,6 +23,7 @@ A logged warning is expected for each deprecated item that is in deprecation win is deprecated. Please use the new `upgrade_configs` in the [HttpConnectionManager](https://github.com/envoyproxy/envoy/blob/master/api/envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto) instead. +* Setting hosts via `hosts` field in `Cluster` is deprecated. Use `load_assignment` instead. ## Version 1.7.0 diff --git a/api/envoy/api/v2/cds.proto b/api/envoy/api/v2/cds.proto index f649dafe0fb6..ea19212772e9 100644 --- a/api/envoy/api/v2/cds.proto +++ b/api/envoy/api/v2/cds.proto @@ -160,7 +160,13 @@ message Cluster { // :ref:`STRICT_DNS` // or :ref:`LOGICAL_DNS`, // then hosts is required. - repeated core.Address hosts = 7; + // + // .. attention:: + // + // **This field is deprecated**. Set the + // :ref:`load_assignment` field instead. + // + repeated core.Address hosts = 7 [deprecated = true]; // Setting this is required for specifying members of // :ref:`STATIC`, @@ -176,7 +182,6 @@ message Cluster { // :ref:`endpoint assignments`. // Setting this overrides :ref:`hosts` values. // - // [#not-implemented-hide:] ClusterLoadAssignment load_assignment = 33; // Optional :ref:`active health checking ` diff --git a/api/envoy/api/v2/endpoint/endpoint.proto b/api/envoy/api/v2/endpoint/endpoint.proto index 6f4cad1ce66e..bfdc6bcffe1a 100644 --- a/api/envoy/api/v2/endpoint/endpoint.proto +++ b/api/envoy/api/v2/endpoint/endpoint.proto @@ -29,7 +29,7 @@ message Endpoint { // and will be resolved via DNS. core.Address address = 1; - // [#not-implemented-hide:] The optional health check configuration. + // The optional health check configuration. message HealthCheckConfig { // Optional alternative health check port value. // @@ -40,8 +40,8 @@ message Endpoint { uint32 port_value = 1; } - // [#not-implemented-hide:] The optional health check configuration is used as - // configuration for the health checker to contact the health checked host. + // The optional health check configuration is used as configuration for the + // health checker to contact the health checked host. // // .. attention:: // diff --git a/docs/root/configuration/overview/v2_overview.rst b/docs/root/configuration/overview/v2_overview.rst index feb1032a4d4a..3c8e0aff06c9 100644 --- a/docs/root/configuration/overview/v2_overview.rst +++ b/docs/root/configuration/overview/v2_overview.rst @@ -95,7 +95,14 @@ A minimal fully static bootstrap config is provided below: connect_timeout: 0.25s type: STATIC lb_policy: ROUND_ROBIN - hosts: [{ socket_address: { address: 127.0.0.2, port_value: 1234 }}] + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 1234 Mostly static with dynamic EDS ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -150,7 +157,14 @@ on 127.0.0.3:5678 is provided below: type: STATIC lb_policy: ROUND_ROBIN http2_protocol_options: {} - hosts: [{ socket_address: { address: 127.0.0.3, port_value: 5678 }}] + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 5678 Notice above that *xds_cluster* is defined to point Envoy at the management server. Even in an otherwise completely dynamic configurations, some static resources need to @@ -214,7 +228,14 @@ below: type: STATIC lb_policy: ROUND_ROBIN http2_protocol_options: {} - hosts: [{ socket_address: { address: 127.0.0.3, port_value: 5678 }}] + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 5678 The management server could respond to LDS requests with: @@ -543,11 +564,11 @@ the shared ADS channel. Management Server Unreachability -------------------------------- -When Envoy instance looses connectivity with the management server, Envoy will latch on to -the previous configuration while actively retrying in the background to reestablish the -connection with the management server. +When an Envoy instance loses connectivity with the management server, Envoy will latch on to +the previous configuration while actively retrying in the background to reestablish the +connection with the management server. -Envoy debug logs the fact that it is not able to establish a connection with the management server +Envoy debug logs the fact that it is not able to establish a connection with the management server every time it attempts a connection. :ref:`upstream_cx_connect_fail ` a cluster level statistic diff --git a/docs/root/intro/arch_overview/health_checking.rst b/docs/root/intro/arch_overview/health_checking.rst index 6928ac94d669..08812c0b62e5 100644 --- a/docs/root/intro/arch_overview/health_checking.rst +++ b/docs/root/intro/arch_overview/health_checking.rst @@ -24,6 +24,35 @@ unhealthy, successes required before marking a host healthy, etc.): maintenance by setting the specified key to any value and waiting for traffic to drain. See :ref:`redis_key `. +.. _arch_overview_per_cluster_health_check_config: + +Per cluster member health check config +-------------------------------------- + +If active health checking is configured for an upstream cluster, a specific additional configuration +for each registered member can be specified by setting the +:ref:`HealthCheckConfig` +in the :ref:`Endpoint` of an :ref:`LbEndpoint` +of each defined :ref:`LocalityLbEndpoints` in a +:ref:`ClusterLoadAssignment`. + +An example of setting up :ref:`health check config` +to set a :ref:`cluster member`'s alternative health check +:ref:`port` is: + +.. code-block:: yaml + + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + health_check_config: + port_value: 8080 + address: + socket_address: + address: localhost + port_value: 80 + .. _arch_overview_health_check_logging: Health check event logging diff --git a/source/common/config/utility.cc b/source/common/config/utility.cc index 007204761b8a..37fac934f2ad 100644 --- a/source/common/config/utility.cc +++ b/source/common/config/utility.cc @@ -225,5 +225,20 @@ Grpc::AsyncClientFactoryPtr Utility::factoryForGrpcApiConfigSource( return async_client_manager.factoryForGrpcService(grpc_service, scope, false); } +envoy::api::v2::ClusterLoadAssignment Utility::translateClusterHosts( + const Protobuf::RepeatedPtrField& hosts) { + envoy::api::v2::ClusterLoadAssignment load_assignment; + envoy::api::v2::endpoint::LocalityLbEndpoints* locality_lb_endpoints = + load_assignment.add_endpoints(); + // Since this LocalityLbEndpoints is built from hosts list, set the default weight to 1. + locality_lb_endpoints->mutable_load_balancing_weight()->set_value(1); + for (const envoy::api::v2::core::Address& host : hosts) { + envoy::api::v2::endpoint::LbEndpoint* lb_endpoint = locality_lb_endpoints->add_lb_endpoints(); + lb_endpoint->mutable_endpoint()->mutable_address()->MergeFrom(host); + lb_endpoint->mutable_load_balancing_weight()->set_value(1); + } + return load_assignment; +} + } // namespace Config } // namespace Envoy diff --git a/source/common/config/utility.h b/source/common/config/utility.h index 32797c4f627b..79086504ad94 100644 --- a/source/common/config/utility.h +++ b/source/common/config/utility.h @@ -288,6 +288,14 @@ class Utility { factoryForGrpcApiConfigSource(Grpc::AsyncClientManager& async_client_manager, const envoy::api::v2::core::ApiConfigSource& api_config_source, Stats::Scope& scope); + + /** + * Translate a set of cluster's hosts into a load assignment configuration. + * @param hosts cluster's list of hosts. + * @return envoy::api::v2::ClusterLoadAssignment a load assignment configuration. + */ + static envoy::api::v2::ClusterLoadAssignment + translateClusterHosts(const Protobuf::RepeatedPtrField& hosts); }; } // namespace Config diff --git a/source/common/upstream/eds.cc b/source/common/upstream/eds.cc index 4fb00c0d2e40..46435a6de4d3 100644 --- a/source/common/upstream/eds.cc +++ b/source/common/upstream/eds.cc @@ -140,7 +140,7 @@ bool EdsClusterImpl::updateHostsPerLocality(const uint32_t priority, const HostV info_->name(), host_set.hosts().size(), host_set.priority()); priority_state_manager.updateClusterPrioritySet(priority, std::move(current_hosts_copy), - hosts_added, hosts_removed); + hosts_added, hosts_removed, absl::nullopt); return true; } return false; diff --git a/source/common/upstream/logical_dns_cluster.cc b/source/common/upstream/logical_dns_cluster.cc index 75284b9c31ad..5b3709041b02 100644 --- a/source/common/upstream/logical_dns_cluster.cc +++ b/source/common/upstream/logical_dns_cluster.cc @@ -18,6 +18,7 @@ namespace Upstream { LogicalDnsCluster::LogicalDnsCluster(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, + const LocalInfo::LocalInfo& local_info, Network::DnsResolverSharedPtr dns_resolver, ThreadLocal::SlotAllocator& tls, ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api) @@ -27,10 +28,19 @@ LogicalDnsCluster::LogicalDnsCluster(const envoy::api::v2::Cluster& cluster, dns_refresh_rate_ms_( std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(cluster, dns_refresh_rate, 5000))), tls_(tls.allocateSlot()), - resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })) { - const auto& hosts = cluster.hosts(); - if (hosts.size() != 1) { - throw EnvoyException("logical_dns clusters must have a single host"); + resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })), + local_info_(local_info), + load_assignment_(cluster.has_load_assignment() + ? cluster.load_assignment() + : Config::Utility::translateClusterHosts(cluster.hosts())) { + const auto& locality_lb_endpoints = load_assignment_.endpoints(); + if (locality_lb_endpoints.size() != 1 || locality_lb_endpoints[0].lb_endpoints().size() != 1) { + if (cluster.has_load_assignment()) { + throw EnvoyException( + "LOGICAL_DNS clusters must have a single locality_lb_endpoint and a single lb_endpoint"); + } else { + throw EnvoyException("LOGICAL_DNS clusters must have a single host"); + } } switch (cluster.dns_lookup_family()) { @@ -47,7 +57,8 @@ LogicalDnsCluster::LogicalDnsCluster(const envoy::api::v2::Cluster& cluster, NOT_REACHED_GCOVR_EXCL_LINE; } - const auto& socket_address = hosts[0].socket_address(); + const envoy::api::v2::core::SocketAddress& socket_address = + lbEndpoint().endpoint().address().socket_address(); dns_url_ = fmt::format("tcp://{}:{}", socket_address.address(), socket_address.port_value()); hostname_ = Network::Utility::hostFromTcpUrl(dns_url_); Network::Utility::portFromTcpUrl(dns_url_); @@ -88,7 +99,8 @@ void LogicalDnsCluster::startResolve() { current_resolved_address_ = new_address; // Capture URL to avoid a race with another update. tls_->runOnAllThreads([this, new_address]() -> void { - tls_->getTyped().current_resolved_address_ = new_address; + PerThreadCurrentHostData& data = tls_->getTyped(); + data.current_resolved_address_ = new_address; }); } @@ -107,14 +119,16 @@ void LogicalDnsCluster::startResolve() { new LogicalHost(info_, hostname_, Network::Utility::getIpv6AnyAddress(), *this)); break; } - HostVectorSharedPtr new_hosts(new HostVector()); - new_hosts->emplace_back(logical_host_); - // Given the current config, only EDS clusters support multiple priorities. - ASSERT(priority_set_.hostSetsPerPriority().size() == 1); - auto& first_host_set = priority_set_.getOrCreateHostSet(0); - first_host_set.updateHosts(new_hosts, createHealthyHostList(*new_hosts), - HostsPerLocalityImpl::empty(), HostsPerLocalityImpl::empty(), - {}, *new_hosts, {}); + const auto& locality_lb_endpoint = localityLbEndpoint(); + PriorityStateManager priority_state_manager(*this, local_info_); + priority_state_manager.initializePriorityFor(locality_lb_endpoint); + priority_state_manager.registerHostForPriority(logical_host_, locality_lb_endpoint, + lbEndpoint(), absl::nullopt); + + const uint32_t priority = locality_lb_endpoint.priority(); + priority_state_manager.updateClusterPrioritySet( + priority, std::move(priority_state_manager.priorityState()[priority].first), + absl::nullopt, absl::nullopt, absl::nullopt); } } @@ -131,7 +145,8 @@ Upstream::Host::CreateConnectionData LogicalDnsCluster::LogicalHost::createConne return {HostImpl::createConnection(dispatcher, *parent_.info_, data.current_resolved_address_, options), HostDescriptionConstSharedPtr{ - new RealHostDescription(data.current_resolved_address_, shared_from_this())}}; + new RealHostDescription(data.current_resolved_address_, parent_.localityLbEndpoint(), + parent_.lbEndpoint(), shared_from_this())}}; } } // namespace Upstream diff --git a/source/common/upstream/logical_dns_cluster.h b/source/common/upstream/logical_dns_cluster.h index fa0a94955472..2feec15579b7 100644 --- a/source/common/upstream/logical_dns_cluster.h +++ b/source/common/upstream/logical_dns_cluster.h @@ -30,6 +30,7 @@ class LogicalDnsCluster : public ClusterImplBase { public: LogicalDnsCluster(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, + const LocalInfo::LocalInfo& local_info, Network::DnsResolverSharedPtr dns_resolver, ThreadLocal::SlotAllocator& tls, ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api); @@ -42,9 +43,10 @@ class LogicalDnsCluster : public ClusterImplBase { struct LogicalHost : public HostImpl { LogicalHost(ClusterInfoConstSharedPtr cluster, const std::string& hostname, Network::Address::InstanceConstSharedPtr address, LogicalDnsCluster& parent) - : HostImpl(cluster, hostname, address, envoy::api::v2::core::Metadata::default_instance(), - 1, envoy::api::v2::core::Locality().default_instance(), - envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance()), + : HostImpl(cluster, hostname, address, parent.lbEndpoint().metadata(), + parent.lbEndpoint().load_balancing_weight().value(), + parent.localityLbEndpoint().locality(), + parent.lbEndpoint().endpoint().health_check_config()), parent_(parent) {} // Upstream::Host @@ -57,10 +59,17 @@ class LogicalDnsCluster : public ClusterImplBase { struct RealHostDescription : public HostDescription { RealHostDescription(Network::Address::InstanceConstSharedPtr address, + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint, HostConstSharedPtr logical_host) : address_(address), logical_host_(logical_host), - metadata_(std::make_shared( - envoy::api::v2::core::Metadata::default_instance())) {} + metadata_(std::make_shared(lb_endpoint.metadata())), + health_check_address_( + lb_endpoint.endpoint().health_check_config().port_value() == 0 + ? address + : Network::Utility::getAddressWithPort( + *address, lb_endpoint.endpoint().health_check_config().port_value())), + locality_lb_endpoint_(locality_lb_endpoint), lb_endpoint_(lb_endpoint) {} // Upstream:HostDescription bool canary() const override { return false; } @@ -81,21 +90,36 @@ class LogicalDnsCluster : public ClusterImplBase { const std::string& hostname() const override { return logical_host_->hostname(); } Network::Address::InstanceConstSharedPtr address() const override { return address_; } const envoy::api::v2::core::Locality& locality() const override { - return envoy::api::v2::core::Locality().default_instance(); + return locality_lb_endpoint_.locality(); } - // TODO(dio): To support different address port. Network::Address::InstanceConstSharedPtr healthCheckAddress() const override { - return address_; + return health_check_address_; } + uint32_t priority() const { return locality_lb_endpoint_.priority(); } Network::Address::InstanceConstSharedPtr address_; HostConstSharedPtr logical_host_; const std::shared_ptr metadata_; + Network::Address::InstanceConstSharedPtr health_check_address_; + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint_; + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint_; }; struct PerThreadCurrentHostData : public ThreadLocal::ThreadLocalObject { Network::Address::InstanceConstSharedPtr current_resolved_address_; }; + const envoy::api::v2::endpoint::LocalityLbEndpoints& localityLbEndpoint() const { + // This is checked in the constructor, i.e. at config load time. + ASSERT(load_assignment_.endpoints().size() == 1); + return load_assignment_.endpoints()[0]; + } + + const envoy::api::v2::endpoint::LbEndpoint& lbEndpoint() const { + // This is checked in the constructor, i.e. at config load time. + ASSERT(localityLbEndpoint().lb_endpoints().size() == 1); + return localityLbEndpoint().lb_endpoints()[0]; + } + void startResolve(); // ClusterImplBase @@ -111,6 +135,8 @@ class LogicalDnsCluster : public ClusterImplBase { Network::Address::InstanceConstSharedPtr current_resolved_address_; HostSharedPtr logical_host_; Network::ActiveDnsQuery* active_dns_query_{}; + const LocalInfo::LocalInfo& local_info_; + const envoy::api::v2::ClusterLoadAssignment load_assignment_; }; } // namespace Upstream diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 014c9a6c9400..a2f7d8bc85ce 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -382,17 +382,17 @@ ClusterSharedPtr ClusterImplBase::create( switch (cluster.type()) { case envoy::api::v2::Cluster::STATIC: - new_cluster.reset( - new StaticClusterImpl(cluster, runtime, stats, ssl_context_manager, cm, added_via_api)); + new_cluster.reset(new StaticClusterImpl(cluster, runtime, stats, ssl_context_manager, + local_info, cm, added_via_api)); break; case envoy::api::v2::Cluster::STRICT_DNS: new_cluster.reset(new StrictDnsClusterImpl(cluster, runtime, stats, ssl_context_manager, - selected_dns_resolver, cm, dispatcher, + local_info, selected_dns_resolver, cm, dispatcher, added_via_api)); break; case envoy::api::v2::Cluster::LOGICAL_DNS: new_cluster.reset(new LogicalDnsCluster(cluster, runtime, stats, ssl_context_manager, - selected_dns_resolver, tls, cm, dispatcher, + local_info, selected_dns_resolver, tls, cm, dispatcher, added_via_api)); break; case envoy::api::v2::Cluster::ORIGINAL_DST: @@ -666,27 +666,37 @@ void PriorityStateManager::registerHostForPriority( const std::string& hostname, Network::Address::InstanceConstSharedPtr address, const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint, - const Upstream::Host::HealthFlag health_checker_flag) { + const absl::optional health_checker_flag) { + const HostSharedPtr host(new HostImpl(parent_.info(), hostname, address, lb_endpoint.metadata(), + lb_endpoint.load_balancing_weight().value(), + locality_lb_endpoint.locality(), + lb_endpoint.endpoint().health_check_config())); + registerHostForPriority(host, locality_lb_endpoint, lb_endpoint, health_checker_flag); +} + +void PriorityStateManager::registerHostForPriority( + const HostSharedPtr& host, + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint, + const absl::optional health_checker_flag) { const uint32_t priority = locality_lb_endpoint.priority(); // Should be called after initializePriorityFor. ASSERT(priority_state_[priority].first); - priority_state_[priority].first->emplace_back( - new HostImpl(parent_.info(), hostname, address, lb_endpoint.metadata(), - lb_endpoint.load_balancing_weight().value(), locality_lb_endpoint.locality(), - lb_endpoint.endpoint().health_check_config())); - - const auto& health_status = lb_endpoint.health_status(); - if (health_status == envoy::api::v2::core::HealthStatus::UNHEALTHY || - health_status == envoy::api::v2::core::HealthStatus::DRAINING || - health_status == envoy::api::v2::core::HealthStatus::TIMEOUT) { - priority_state_[priority].first->back()->healthFlagSet(health_checker_flag); + priority_state_[priority].first->emplace_back(host); + if (health_checker_flag.has_value()) { + const auto& health_status = lb_endpoint.health_status(); + if (health_status == envoy::api::v2::core::HealthStatus::UNHEALTHY || + health_status == envoy::api::v2::core::HealthStatus::DRAINING || + health_status == envoy::api::v2::core::HealthStatus::TIMEOUT) { + priority_state_[priority].first->back()->healthFlagSet(health_checker_flag.value()); + } } } void PriorityStateManager::updateClusterPrioritySet( const uint32_t priority, HostVectorSharedPtr&& current_hosts, - const absl::optional& hosts_added, - const absl::optional& hosts_removed) { + const absl::optional& hosts_added, const absl::optional& hosts_removed, + const absl::optional health_checker_flag) { // If local locality is not defined then skip populating per locality hosts. const auto& local_locality = local_info_node_.locality(); ENVOY_LOG(trace, "Local locality: {}", local_locality.DebugString()); @@ -710,9 +720,12 @@ void PriorityStateManager::updateClusterPrioritySet( std::map hosts_per_locality; for (const HostSharedPtr& host : *hosts) { - // TODO(dio): Take into consideration when a non-EDS cluster has active health checking, i.e. to - // mark all the hosts unhealthy (host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC)) and - // then fire update callbacks to start the health checking process. + // Take into consideration when a non-EDS cluster has active health checking, i.e. to mark all + // the hosts unhealthy (host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC)) and then fire + // update callbacks to start the health checking process. + if (health_checker_flag.has_value()) { + host->healthFlagSet(health_checker_flag.value()); + } hosts_per_locality[host->locality()].push_back(host); } @@ -754,36 +767,41 @@ void PriorityStateManager::updateClusterPrioritySet( StaticClusterImpl::StaticClusterImpl(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, - Ssl::ContextManager& ssl_context_manager, ClusterManager& cm, + Ssl::ContextManager& ssl_context_manager, + const LocalInfo::LocalInfo& local_info, ClusterManager& cm, bool added_via_api) : ClusterImplBase(cluster, cm.bindConfig(), runtime, stats, ssl_context_manager, cm.clusterManagerFactory().secretManager(), added_via_api), - initial_hosts_(new HostVector()) { - - for (const auto& host : cluster.hosts()) { - initial_hosts_->emplace_back(HostSharedPtr{new HostImpl( - info_, "", resolveProtoAddress(host), envoy::api::v2::core::Metadata::default_instance(), 1, - envoy::api::v2::core::Locality().default_instance(), - envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance())}); + priority_state_manager_(new PriorityStateManager(*this, local_info)) { + // TODO(dio): Use by-reference when cluster.hosts() is removed. + const envoy::api::v2::ClusterLoadAssignment cluster_load_assignment( + cluster.has_load_assignment() ? cluster.load_assignment() + : Config::Utility::translateClusterHosts(cluster.hosts())); + + for (const auto& locality_lb_endpoint : cluster_load_assignment.endpoints()) { + priority_state_manager_->initializePriorityFor(locality_lb_endpoint); + for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) { + priority_state_manager_->registerHostForPriority( + "", resolveProtoAddress(lb_endpoint.endpoint().address()), locality_lb_endpoint, + lb_endpoint, absl::nullopt); + } } } void StaticClusterImpl::startPreInit() { - // At this point see if we have a health checker. If so, mark all the hosts unhealthy and then - // fire update callbacks to start the health checking process. - if (health_checker_) { - for (const auto& host : *initial_hosts_) { - host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); - } + // At this point see if we have a health checker. If so, mark all the hosts unhealthy and + // then fire update callbacks to start the health checking process. + const auto& health_checker_flag = + health_checker_ != nullptr + ? absl::optional(Host::HealthFlag::FAILED_ACTIVE_HC) + : absl::nullopt; + + auto& priority_state = priority_state_manager_->priorityState(); + for (size_t i = 0; i < priority_state.size(); ++i) { + priority_state_manager_->updateClusterPrioritySet( + i, std::move(priority_state[i].first), absl::nullopt, absl::nullopt, health_checker_flag); } - - // Given the current config, only EDS clusters support multiple priorities. - ASSERT(priority_set_.hostSetsPerPriority().size() == 1); - auto& first_host_set = priority_set_.getOrCreateHostSet(0); - first_host_set.updateHosts(initial_hosts_, createHealthyHostList(*initial_hosts_), - HostsPerLocalityImpl::empty(), HostsPerLocalityImpl::empty(), {}, - *initial_hosts_, {}); - initial_hosts_ = nullptr; + priority_state_manager_.reset(); onPreInitComplete(); } @@ -934,12 +952,13 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts, StrictDnsClusterImpl::StrictDnsClusterImpl(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, + const LocalInfo::LocalInfo& local_info, Network::DnsResolverSharedPtr dns_resolver, ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api) : BaseDynamicClusterImpl(cluster, cm.bindConfig(), runtime, stats, ssl_context_manager, cm.clusterManagerFactory().secretManager(), added_via_api), - dns_resolver_(dns_resolver), + local_info_(local_info), dns_resolver_(dns_resolver), dns_refresh_rate_ms_( std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(cluster, dns_refresh_rate, 5000))) { switch (cluster.dns_lookup_family()) { @@ -956,11 +975,18 @@ StrictDnsClusterImpl::StrictDnsClusterImpl(const envoy::api::v2::Cluster& cluste NOT_REACHED_GCOVR_EXCL_LINE; } - for (const auto& host : cluster.hosts()) { - resolve_targets_.emplace_back( - new ResolveTarget(*this, dispatcher, - fmt::format("tcp://{}:{}", host.socket_address().address(), - host.socket_address().port_value()))); + const envoy::api::v2::ClusterLoadAssignment load_assignment( + cluster.has_load_assignment() ? cluster.load_assignment() + : Config::Utility::translateClusterHosts(cluster.hosts())); + const auto& locality_lb_endpoints = load_assignment.endpoints(); + for (const auto& locality_lb_endpoint : locality_lb_endpoints) { + for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) { + const auto& host = lb_endpoint.endpoint().address(); + const std::string& url = fmt::format("tcp://{}:{}", host.socket_address().address(), + host.socket_address().port_value()); + resolve_targets_.emplace_back( + new ResolveTarget(*this, dispatcher, url, locality_lb_endpoint, lb_endpoint)); + } } } @@ -971,29 +997,34 @@ void StrictDnsClusterImpl::startPreInit() { } void StrictDnsClusterImpl::updateAllHosts(const HostVector& hosts_added, - const HostVector& hosts_removed) { + const HostVector& hosts_removed, + uint32_t current_priority) { + PriorityStateManager priority_state_manager(*this, local_info_); // At this point we know that we are different so make a new host list and notify. - HostVectorSharedPtr new_hosts(new HostVector()); for (const ResolveTargetPtr& target : resolve_targets_) { + priority_state_manager.initializePriorityFor(target->locality_lb_endpoint_); for (const HostSharedPtr& host : target->hosts_) { - new_hosts->emplace_back(host); + if (target->locality_lb_endpoint_.priority() == current_priority) { + priority_state_manager.registerHostForPriority(host, target->locality_lb_endpoint_, + target->lb_endpoint_, absl::nullopt); + } } } - // Given the current config, only EDS clusters support multiple priorities. - ASSERT(priority_set_.hostSetsPerPriority().size() == 1); - auto& first_host_set = priority_set_.getOrCreateHostSet(0); - first_host_set.updateHosts(new_hosts, createHealthyHostList(*new_hosts), - HostsPerLocalityImpl::empty(), HostsPerLocalityImpl::empty(), {}, - hosts_added, hosts_removed); + // TODO(dio): Add assertion in here. + priority_state_manager.updateClusterPrioritySet( + current_priority, std::move(priority_state_manager.priorityState()[current_priority].first), + hosts_added, hosts_removed, absl::nullopt); } -StrictDnsClusterImpl::ResolveTarget::ResolveTarget(StrictDnsClusterImpl& parent, - Event::Dispatcher& dispatcher, - const std::string& url) +StrictDnsClusterImpl::ResolveTarget::ResolveTarget( + StrictDnsClusterImpl& parent, Event::Dispatcher& dispatcher, const std::string& url, + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint) : parent_(parent), dns_address_(Network::Utility::hostFromTcpUrl(url)), port_(Network::Utility::portFromTcpUrl(url)), - resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })) {} + resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })), + locality_lb_endpoint_(locality_lb_endpoint), lb_endpoint_(lb_endpoint) {} StrictDnsClusterImpl::ResolveTarget::~ResolveTarget() { if (active_query_) { @@ -1014,28 +1045,28 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() { HostVector new_hosts; for (const Network::Address::InstanceConstSharedPtr& address : address_list) { - // TODO(mattklein123): Currently the DNS interface does not consider port. We need to make - // a new address that has port in it. We need to both support IPv6 as well as potentially - // move port handling into the DNS interface itself, which would work better for SRV. + // TODO(mattklein123): Currently the DNS interface does not consider port. We need to + // make a new address that has port in it. We need to both support IPv6 as well as + // potentially move port handling into the DNS interface itself, which would work better + // for SRV. ASSERT(address != nullptr); new_hosts.emplace_back(new HostImpl( parent_.info_, dns_address_, Network::Utility::getAddressWithPort(*address, port_), - envoy::api::v2::core::Metadata::default_instance(), 1, - envoy::api::v2::core::Locality().default_instance(), - envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance())); + lb_endpoint_.metadata(), lb_endpoint_.load_balancing_weight().value(), + locality_lb_endpoint_.locality(), lb_endpoint_.endpoint().health_check_config())); } HostVector hosts_added; HostVector hosts_removed; if (parent_.updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed)) { ENVOY_LOG(debug, "DNS hosts have changed for {}", dns_address_); - parent_.updateAllHosts(hosts_added, hosts_removed); + parent_.updateAllHosts(hosts_added, hosts_removed, locality_lb_endpoint_.priority()); } // If there is an initialize callback, fire it now. Note that if the cluster refers to - // multiple DNS names, this will return initialized after a single DNS resolution completes. - // This is not perfect but is easier to code and unclear if the extra complexity is needed - // so will start with this. + // multiple DNS names, this will return initialized after a single DNS resolution + // completes. This is not perfect but is easier to code and unclear if the extra + // complexity is needed so will start with this. parent_.onPreInitComplete(); resolve_timer_->enableTimer(parent_.dns_refresh_rate_ms_); }); diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index f5ba7e83794c..90b7935d55d4 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -541,18 +541,19 @@ class PriorityStateManager : protected Logger::Loggable { Network::Address::InstanceConstSharedPtr address, const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint, - const Upstream::Host::HealthFlag health_checker_flag); + const absl::optional health_checker_flag); - // TODO(dio): Add an override of registerHostForPriority to register a host to the PriorityState - // based on a specified priority. This will be useful for non-EDS cluster hosts setup. - // - // void registerHostForPriority(const HostSharedPtr& host, const uint32_t priority); + void + registerHostForPriority(const HostSharedPtr& host, + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint, + const absl::optional health_checker_flag); - // Updates the cluster priority set. This should be called after the PriorityStateManager is - // initialized. - void updateClusterPrioritySet(const uint32_t priority, HostVectorSharedPtr&& current_hosts, - const absl::optional& hosts_added, - const absl::optional& hosts_removed); + void + updateClusterPrioritySet(const uint32_t priority, HostVectorSharedPtr&& current_hosts, + const absl::optional& hosts_added, + const absl::optional& hosts_removed, + const absl::optional health_checker_flag); // Returns the size of the current cluster priority state. size_t size() const { return priority_state_.size(); } @@ -566,6 +567,8 @@ class PriorityStateManager : protected Logger::Loggable { const envoy::api::v2::core::Node& local_info_node_; }; +typedef std::unique_ptr PriorityStateManagerPtr; + /** * Implementation of Upstream::Cluster for static clusters (clusters that have a fixed number of * hosts with resolved IP addresses). @@ -574,7 +577,7 @@ class StaticClusterImpl : public ClusterImplBase { public: StaticClusterImpl(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, - ClusterManager& cm, bool added_via_api); + const LocalInfo::LocalInfo& local_info, ClusterManager& cm, bool added_via_api); // Upstream::Cluster InitializePhase initializePhase() const override { return InitializePhase::Primary; } @@ -583,7 +586,7 @@ class StaticClusterImpl : public ClusterImplBase { // ClusterImplBase void startPreInit() override; - HostVectorSharedPtr initial_hosts_; + PriorityStateManagerPtr priority_state_manager_; }; /** @@ -605,6 +608,7 @@ class StrictDnsClusterImpl : public BaseDynamicClusterImpl { public: StrictDnsClusterImpl(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, + const LocalInfo::LocalInfo& local_info, Network::DnsResolverSharedPtr dns_resolver, ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api); @@ -614,7 +618,9 @@ class StrictDnsClusterImpl : public BaseDynamicClusterImpl { private: struct ResolveTarget { ResolveTarget(StrictDnsClusterImpl& parent, Event::Dispatcher& dispatcher, - const std::string& url); + const std::string& url, + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint); ~ResolveTarget(); void startResolve(); @@ -624,15 +630,19 @@ class StrictDnsClusterImpl : public BaseDynamicClusterImpl { uint32_t port_; Event::TimerPtr resolve_timer_; HostVector hosts_; + const envoy::api::v2::endpoint::LocalityLbEndpoints locality_lb_endpoint_; + const envoy::api::v2::endpoint::LbEndpoint lb_endpoint_; }; typedef std::unique_ptr ResolveTargetPtr; - void updateAllHosts(const HostVector& hosts_added, const HostVector& hosts_removed); + void updateAllHosts(const HostVector& hosts_added, const HostVector& hosts_removed, + uint32_t priority); // ClusterImplBase void startPreInit() override; + const LocalInfo::LocalInfo& local_info_; Network::DnsResolverSharedPtr dns_resolver_; std::list resolve_targets_; const std::chrono::milliseconds dns_refresh_rate_ms_; diff --git a/test/common/upstream/BUILD b/test/common/upstream/BUILD index 067ad5734c4b..441ce2437613 100644 --- a/test/common/upstream/BUILD +++ b/test/common/upstream/BUILD @@ -172,6 +172,7 @@ envoy_cc_test( "//source/common/upstream:upstream_lib", "//source/extensions/transport_sockets/raw_buffer:config", "//test/mocks:common_lib", + "//test/mocks/local_info:local_info_mocks", "//test/mocks/network:network_mocks", "//test/mocks/runtime:runtime_mocks", "//test/mocks/ssl:ssl_mocks", @@ -326,6 +327,7 @@ envoy_cc_test( "//source/common/upstream:upstream_lib", "//source/extensions/transport_sockets/raw_buffer:config", "//test/mocks:common_lib", + "//test/mocks/local_info:local_info_mocks", "//test/mocks/network:network_mocks", "//test/mocks/runtime:runtime_mocks", "//test/mocks/ssl:ssl_mocks", diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 5afbfac84911..8082c0c090c0 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -122,7 +122,7 @@ class TestClusterManagerFactory : public ClusterManagerFactory { NiceMock random_; Ssl::ContextManagerImpl ssl_context_manager_{runtime_}; NiceMock dispatcher_; - LocalInfo::MockLocalInfo local_info_; + NiceMock local_info_; Secret::MockSecretManager secret_manager_; }; diff --git a/test/common/upstream/logical_dns_cluster_test.cc b/test/common/upstream/logical_dns_cluster_test.cc index df74b6f1aba4..80a5c7b705fc 100644 --- a/test/common/upstream/logical_dns_cluster_test.cc +++ b/test/common/upstream/logical_dns_cluster_test.cc @@ -9,6 +9,7 @@ #include "test/common/upstream/utility.h" #include "test/mocks/common.h" +#include "test/mocks/local_info/mocks.h" #include "test/mocks/network/mocks.h" #include "test/mocks/runtime/mocks.h" #include "test/mocks/ssl/mocks.h" @@ -26,14 +27,16 @@ using testing::_; namespace Envoy { namespace Upstream { +enum class ConfigType { V2_YAML, V1_JSON }; + class LogicalDnsClusterTest : public testing::Test { public: - void setup(const std::string& json) { + void setupFromV1Json(const std::string& json) { resolve_timer_ = new Event::MockTimer(&dispatcher_); NiceMock cm; cluster_.reset(new LogicalDnsCluster(parseClusterFromJson(json), runtime_, stats_store_, - ssl_context_manager_, dns_resolver_, tls_, cm, dispatcher_, - false)); + ssl_context_manager_, local_info_, dns_resolver_, tls_, cm, + dispatcher_, false)); cluster_->prioritySet().addMemberUpdateCb( [&](uint32_t, const HostVector&, const HostVector&) -> void { membership_updated_.ready(); @@ -41,8 +44,22 @@ class LogicalDnsClusterTest : public testing::Test { cluster_->initialize([&]() -> void { initialized_.ready(); }); } - void expectResolve(Network::DnsLookupFamily dns_lookup_family) { - EXPECT_CALL(*dns_resolver_, resolve("foo.bar.com", dns_lookup_family, _)) + void setupFromV2Yaml(const std::string& yaml) { + resolve_timer_ = new Event::MockTimer(&dispatcher_); + NiceMock cm; + cluster_.reset(new LogicalDnsCluster(parseClusterFromV2Yaml(yaml), runtime_, stats_store_, + ssl_context_manager_, local_info_, dns_resolver_, tls_, cm, + dispatcher_, false)); + cluster_->prioritySet().addMemberUpdateCb( + [&](uint32_t, const HostVector&, const HostVector&) -> void { + membership_updated_.ready(); + }); + cluster_->initialize([&]() -> void { initialized_.ready(); }); + } + + void expectResolve(Network::DnsLookupFamily dns_lookup_family, + const std::string& expected_address) { + EXPECT_CALL(*dns_resolver_, resolve(expected_address, dns_lookup_family, _)) .WillOnce(Invoke([&](const std::string&, Network::DnsLookupFamily, Network::DnsResolver::ResolveCb cb) -> Network::ActiveDnsQuery* { dns_callback_ = cb; @@ -50,6 +67,102 @@ class LogicalDnsClusterTest : public testing::Test { })); } + void testBasicSetup(const std::string& config, const std::string& expected_address, + ConfigType config_type = ConfigType::V2_YAML) { + expectResolve(Network::DnsLookupFamily::V4Only, expected_address); + if (config_type == ConfigType::V1_JSON) { + setupFromV1Json(config); + } else { + setupFromV2Yaml(config); + } + + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + EXPECT_CALL(*resolve_timer_, enableTimer(std::chrono::milliseconds(4000))); + dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2"})); + + EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); + EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ(1UL, + cluster_->prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); + EXPECT_EQ( + 1UL, + cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); + EXPECT_EQ(cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0], + cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts()[0]); + HostSharedPtr logical_host = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]; + + EXPECT_CALL(dispatcher_, + createClientConnection_( + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")), _, _, _)) + .WillOnce(Return(new NiceMock())); + logical_host->createConnection(dispatcher_, nullptr); + logical_host->outlierDetector().putHttpResponseCode(200); + + expectResolve(Network::DnsLookupFamily::V4Only, expected_address); + resolve_timer_->callback_(); + + // Should not cause any changes. + EXPECT_CALL(*resolve_timer_, enableTimer(_)); + dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2", "127.0.0.3"})); + + EXPECT_EQ(logical_host, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]); + EXPECT_CALL(dispatcher_, + createClientConnection_( + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")), _, _, _)) + .WillOnce(Return(new NiceMock())); + Host::CreateConnectionData data = logical_host->createConnection(dispatcher_, nullptr); + EXPECT_FALSE(data.host_description_->canary()); + EXPECT_EQ(&cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]->cluster(), + &data.host_description_->cluster()); + EXPECT_EQ(&cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]->stats(), + &data.host_description_->stats()); + EXPECT_EQ("127.0.0.1:443", data.host_description_->address()->asString()); + EXPECT_EQ("", data.host_description_->locality().region()); + EXPECT_EQ("", data.host_description_->locality().zone()); + EXPECT_EQ("", data.host_description_->locality().sub_zone()); + EXPECT_EQ("foo.bar.com", data.host_description_->hostname()); + EXPECT_TRUE(TestUtility::protoEqual(envoy::api::v2::core::Metadata::default_instance(), + *data.host_description_->metadata())); + data.host_description_->outlierDetector().putHttpResponseCode(200); + data.host_description_->healthChecker().setUnhealthy(); + + expectResolve(Network::DnsLookupFamily::V4Only, expected_address); + resolve_timer_->callback_(); + + // Should cause a change. + EXPECT_CALL(*resolve_timer_, enableTimer(_)); + dns_callback_(TestUtility::makeDnsResponse({"127.0.0.3", "127.0.0.1", "127.0.0.2"})); + + EXPECT_EQ(logical_host, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]); + EXPECT_CALL(dispatcher_, + createClientConnection_( + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")), _, _, _)) + .WillOnce(Return(new NiceMock())); + logical_host->createConnection(dispatcher_, nullptr); + + expectResolve(Network::DnsLookupFamily::V4Only, expected_address); + resolve_timer_->callback_(); + + // Empty should not cause any change. + EXPECT_CALL(*resolve_timer_, enableTimer(_)); + dns_callback_({}); + + EXPECT_EQ(logical_host, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]); + EXPECT_CALL(dispatcher_, + createClientConnection_( + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")), _, _, _)) + .WillOnce(Return(new NiceMock())); + logical_host->createConnection(dispatcher_, nullptr); + + // Make sure we cancel. + EXPECT_CALL(active_dns_query_, cancel()); + expectResolve(Network::DnsLookupFamily::V4Only, expected_address); + resolve_timer_->callback_(); + + tls_.shutdownThread(); + } + Stats::IsolatedStoreImpl stats_store_; Ssl::MockContextManager ssl_context_manager_; std::shared_ptr> dns_resolver_{ @@ -63,6 +176,7 @@ class LogicalDnsClusterTest : public testing::Test { ReadyWatcher initialized_; NiceMock runtime_; NiceMock dispatcher_; + NiceMock local_info_; }; typedef std::tuple> @@ -127,7 +241,7 @@ TEST_P(LogicalDnsParamTest, ImmediateResolve) { cb(TestUtility::makeDnsResponse(std::get<2>(GetParam()))); return nullptr; })); - setup(json); + setupFromV1Json(json); EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); EXPECT_EQ("foo.bar.com", @@ -137,7 +251,7 @@ TEST_P(LogicalDnsParamTest, ImmediateResolve) { } TEST_F(LogicalDnsClusterTest, BadConfig) { - const std::string json = R"EOF( + const std::string multiple_hosts_json = R"EOF( { "name": "name", "connect_timeout_ms": 250, @@ -147,7 +261,72 @@ TEST_F(LogicalDnsClusterTest, BadConfig) { } )EOF"; - EXPECT_THROW(setup(json), EnvoyException); + EXPECT_THROW_WITH_MESSAGE(setupFromV1Json(multiple_hosts_json), EnvoyException, + "LOGICAL_DNS clusters must have a single host"); + + const std::string multiple_lb_endpoints_yaml = R"EOF( + name: name + type: LOGICAL_DNS + dns_refresh_rate: 4s + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + dns_lookup_family: V4_ONLY + load_assignment: + cluster_name: name + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: foo.bar.com + port_value: 443 + health_check_config: + port_value: 8000 + - endpoint: + address: + socket_address: + address: hello.world.com + port_value: 443 + health_check_config: + port_value: 8000 + )EOF"; + + EXPECT_THROW_WITH_MESSAGE( + setupFromV2Yaml(multiple_lb_endpoints_yaml), EnvoyException, + "LOGICAL_DNS clusters must have a single locality_lb_endpoint and a single lb_endpoint"); + + const std::string multiple_endpoints_yaml = R"EOF( + name: name + type: LOGICAL_DNS + dns_refresh_rate: 4s + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + dns_lookup_family: V4_ONLY + load_assignment: + cluster_name: name + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: foo.bar.com + port_value: 443 + health_check_config: + port_value: 8000 + + - lb_endpoints: + - endpoint: + address: + socket_address: + address: hello.world.com + port_value: 443 + health_check_config: + port_value: 8000 + )EOF"; + + EXPECT_THROW_WITH_MESSAGE( + setupFromV2Yaml(multiple_endpoints_yaml), EnvoyException, + "LOGICAL_DNS clusters must have a single locality_lb_endpoint and a single lb_endpoint"); } TEST_F(LogicalDnsClusterTest, Basic) { @@ -162,93 +341,46 @@ TEST_F(LogicalDnsClusterTest, Basic) { } )EOF"; - expectResolve(Network::DnsLookupFamily::V4Only); - setup(json); - - EXPECT_CALL(membership_updated_, ready()); - EXPECT_CALL(initialized_, ready()); - EXPECT_CALL(*resolve_timer_, enableTimer(std::chrono::milliseconds(4000))); - dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2"})); + const std::string basic_yaml_hosts = R"EOF( + name: name + type: LOGICAL_DNS + dns_refresh_rate: 4s + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + # Since the following expectResolve() requires Network::DnsLookupFamily::V4Only we need to set + # dns_lookup_family to V4_ONLY explicitly for v2 .yaml config. + dns_lookup_family: V4_ONLY + hosts: + - socket_address: + address: foo.bar.com + port_value: 443 + )EOF"; - EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); - EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); - EXPECT_EQ(0UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); - EXPECT_EQ( - 0UL, - cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); - EXPECT_EQ(cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0], - cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts()[0]); - HostSharedPtr logical_host = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]; - - EXPECT_CALL(dispatcher_, - createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")), _, _, _)) - .WillOnce(Return(new NiceMock())); - logical_host->createConnection(dispatcher_, nullptr); - logical_host->outlierDetector().putHttpResponseCode(200); - - expectResolve(Network::DnsLookupFamily::V4Only); - resolve_timer_->callback_(); - - // Should not cause any changes. - EXPECT_CALL(*resolve_timer_, enableTimer(_)); - dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2", "127.0.0.3"})); - - EXPECT_EQ(logical_host, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]); - EXPECT_CALL(dispatcher_, - createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")), _, _, _)) - .WillOnce(Return(new NiceMock())); - Host::CreateConnectionData data = logical_host->createConnection(dispatcher_, nullptr); - EXPECT_FALSE(data.host_description_->canary()); - EXPECT_EQ(&cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]->cluster(), - &data.host_description_->cluster()); - EXPECT_EQ(&cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]->stats(), - &data.host_description_->stats()); - EXPECT_EQ("127.0.0.1:443", data.host_description_->address()->asString()); - EXPECT_EQ("", data.host_description_->locality().region()); - EXPECT_EQ("", data.host_description_->locality().zone()); - EXPECT_EQ("", data.host_description_->locality().sub_zone()); - EXPECT_EQ("foo.bar.com", data.host_description_->hostname()); - EXPECT_TRUE(TestUtility::protoEqual(envoy::api::v2::core::Metadata::default_instance(), - *data.host_description_->metadata())); - data.host_description_->outlierDetector().putHttpResponseCode(200); - data.host_description_->healthChecker().setUnhealthy(); - - expectResolve(Network::DnsLookupFamily::V4Only); - resolve_timer_->callback_(); - - // Should cause a change. - EXPECT_CALL(*resolve_timer_, enableTimer(_)); - dns_callback_(TestUtility::makeDnsResponse({"127.0.0.3", "127.0.0.1", "127.0.0.2"})); - - EXPECT_EQ(logical_host, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]); - EXPECT_CALL(dispatcher_, - createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")), _, _, _)) - .WillOnce(Return(new NiceMock())); - logical_host->createConnection(dispatcher_, nullptr); - - expectResolve(Network::DnsLookupFamily::V4Only); - resolve_timer_->callback_(); - - // Empty should not cause any change. - EXPECT_CALL(*resolve_timer_, enableTimer(_)); - dns_callback_({}); - - EXPECT_EQ(logical_host, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]); - EXPECT_CALL(dispatcher_, - createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")), _, _, _)) - .WillOnce(Return(new NiceMock())); - logical_host->createConnection(dispatcher_, nullptr); - - // Make sure we cancel. - EXPECT_CALL(active_dns_query_, cancel()); - expectResolve(Network::DnsLookupFamily::V4Only); - resolve_timer_->callback_(); + const std::string basic_yaml_load_assignment = R"EOF( + name: name + type: LOGICAL_DNS + dns_refresh_rate: 4s + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + # Since the following expectResolve() requires Network::DnsLookupFamily::V4Only we need to set + # dns_lookup_family to V4_ONLY explicitly for v2 .yaml config. + dns_lookup_family: V4_ONLY + load_assignment: + cluster_name: name + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: foo.bar.com + port_value: 443 + health_check_config: + port_value: 8000 + )EOF"; - tls_.shutdownThread(); + testBasicSetup(json, "foo.bar.com", ConfigType::V1_JSON); + testBasicSetup(basic_yaml_hosts, "foo.bar.com"); + testBasicSetup(basic_yaml_load_assignment, "foo.bar.com"); } } // namespace Upstream diff --git a/test/common/upstream/upstream_impl_test.cc b/test/common/upstream/upstream_impl_test.cc index 17888a900630..f4b43c393d2d 100644 --- a/test/common/upstream/upstream_impl_test.cc +++ b/test/common/upstream/upstream_impl_test.cc @@ -17,6 +17,7 @@ #include "test/common/upstream/utility.h" #include "test/mocks/common.h" +#include "test/mocks/local_info/mocks.h" #include "test/mocks/network/mocks.h" #include "test/mocks/runtime/mocks.h" #include "test/mocks/ssl/mocks.h" @@ -119,6 +120,7 @@ TEST_P(StrictDnsParamTest, ImmediateResolve) { auto dns_resolver = std::make_shared>(); NiceMock dispatcher; NiceMock runtime; + NiceMock local_info; ReadyWatcher initialized; const std::string json = R"EOF( @@ -141,7 +143,7 @@ TEST_P(StrictDnsParamTest, ImmediateResolve) { })); NiceMock cm; StrictDnsClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, - dns_resolver, cm, dispatcher, false); + local_info, dns_resolver, cm, dispatcher, false); cluster.initialize([&]() -> void { initialized.ready(); }); EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->hosts().size()); EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); @@ -155,6 +157,7 @@ TEST(StrictDnsClusterImplTest, ZeroHostsHealthChecker) { NiceMock dispatcher; NiceMock runtime; NiceMock cm; + NiceMock local_info; ReadyWatcher initialized; const std::string yaml = R"EOF( @@ -167,7 +170,7 @@ TEST(StrictDnsClusterImplTest, ZeroHostsHealthChecker) { ResolverData resolver(*dns_resolver, dispatcher); StrictDnsClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, - dns_resolver, cm, dispatcher, false); + local_info, dns_resolver, cm, dispatcher, false); std::shared_ptr health_checker(new MockHealthChecker()); EXPECT_CALL(*health_checker, start()); EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)); @@ -188,6 +191,7 @@ TEST(StrictDnsClusterImplTest, Basic) { auto dns_resolver = std::make_shared>(); NiceMock dispatcher; NiceMock runtime; + NiceMock local_info; // gmock matches in LIFO order which is why these are swapped. ResolverData resolver2(*dns_resolver, dispatcher); @@ -225,7 +229,7 @@ TEST(StrictDnsClusterImplTest, Basic) { NiceMock cm; StrictDnsClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, - dns_resolver, cm, dispatcher, false); + local_info, dns_resolver, cm, dispatcher, false); EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.default.max_connections", 43)); EXPECT_EQ(43U, cluster.info()->resourceManager(ResourcePriority::Default).connections().max()); EXPECT_CALL(runtime.snapshot_, @@ -302,8 +306,8 @@ TEST(StrictDnsClusterImplTest, Basic) { ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); - EXPECT_EQ(0UL, cluster.prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); - EXPECT_EQ(0UL, + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); for (const HostSharedPtr& host : cluster.prioritySet().hostSetsPerPriority()[0]->hosts()) { @@ -329,6 +333,7 @@ TEST(StrictDnsClusterImplTest, HostRemovalActiveHealthSkipped) { NiceMock dispatcher; NiceMock runtime; NiceMock cm; + NiceMock local_info; const std::string yaml = R"EOF( name: name @@ -341,7 +346,7 @@ TEST(StrictDnsClusterImplTest, HostRemovalActiveHealthSkipped) { ResolverData resolver(*dns_resolver, dispatcher); StrictDnsClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, - dns_resolver, cm, dispatcher, false); + local_info, dns_resolver, cm, dispatcher, false); std::shared_ptr health_checker(new MockHealthChecker()); EXPECT_CALL(*health_checker, start()); EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)); @@ -372,6 +377,298 @@ TEST(StrictDnsClusterImplTest, HostRemovalActiveHealthSkipped) { EXPECT_EQ(1UL, hosts.size()); } +TEST(StrictDnsClusterImplTest, LoadAssignmentBasic) { + Stats::IsolatedStoreImpl stats; + Ssl::MockContextManager ssl_context_manager; + auto dns_resolver = std::make_shared>(); + NiceMock dispatcher; + NiceMock runtime; + NiceMock local_info; + + // gmock matches in LIFO order which is why these are swapped. + ResolverData resolver2(*dns_resolver, dispatcher); + ResolverData resolver1(*dns_resolver, dispatcher); + + const std::string yaml = R"EOF( + name: name + type: STRICT_DNS + + dns_lookup_family: V4_ONLY + connect_timeout: 0.25s + dns_refresh_rate: 4s + + lb_policy: ROUND_ROBIN + + circuit_breakers: + thresholds: + - priority: DEFAULT + max_connections: 43 + max_pending_requests: 57 + max_requests: 50 + max_retries: 10 + - priority: HIGH + max_connections: 1 + max_pending_requests: 2 + max_requests: 3 + max_retries: 4 + + max_requests_per_connection: 3 + + http2_protocol_options: + hpack_table_size: 0 + + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: localhost1 + port_value: 11001 + health_check_config: + port_value: 8000 + - endpoint: + address: + socket_address: + address: localhost2 + port_value: 11002 + health_check_config: + port_value: 8000 + )EOF"; + + NiceMock cm; + StrictDnsClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, dns_resolver, cm, dispatcher, false); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.default.max_connections", 43)); + EXPECT_EQ(43U, cluster.info()->resourceManager(ResourcePriority::Default).connections().max()); + EXPECT_CALL(runtime.snapshot_, + getInteger("circuit_breakers.name.default.max_pending_requests", 57)); + EXPECT_EQ(57U, + cluster.info()->resourceManager(ResourcePriority::Default).pendingRequests().max()); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.default.max_requests", 50)); + EXPECT_EQ(50U, cluster.info()->resourceManager(ResourcePriority::Default).requests().max()); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.default.max_retries", 10)); + EXPECT_EQ(10U, cluster.info()->resourceManager(ResourcePriority::Default).retries().max()); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.high.max_connections", 1)); + EXPECT_EQ(1U, cluster.info()->resourceManager(ResourcePriority::High).connections().max()); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.high.max_pending_requests", 2)); + EXPECT_EQ(2U, cluster.info()->resourceManager(ResourcePriority::High).pendingRequests().max()); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.high.max_requests", 3)); + EXPECT_EQ(3U, cluster.info()->resourceManager(ResourcePriority::High).requests().max()); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.high.max_retries", 4)); + EXPECT_EQ(4U, cluster.info()->resourceManager(ResourcePriority::High).retries().max()); + EXPECT_EQ(3U, cluster.info()->maxRequestsPerConnection()); + EXPECT_EQ(0U, cluster.info()->http2Settings().hpack_table_size_); + + cluster.info()->stats().upstream_rq_total_.inc(); + EXPECT_EQ(1UL, stats.counter("cluster.name.upstream_rq_total").value()); + + EXPECT_CALL(runtime.snapshot_, featureEnabled("upstream.maintenance_mode.name", 0)); + EXPECT_FALSE(cluster.info()->maintenanceMode()); + + ReadyWatcher membership_updated; + cluster.prioritySet().addMemberUpdateCb( + [&](uint32_t, const HostVector&, const HostVector&) -> void { membership_updated.ready(); }); + + cluster.initialize([] {}); + + resolver1.expectResolve(*dns_resolver); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2"})); + EXPECT_THAT( + std::list({"127.0.0.1:11001", "127.0.0.2:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + EXPECT_EQ("localhost1", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0]->hostname()); + EXPECT_EQ("localhost1", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[1]->hostname()); + + resolver1.expectResolve(*dns_resolver); + resolver1.timer_->callback_(); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.2", "127.0.0.1"})); + EXPECT_THAT( + std::list({"127.0.0.1:11001", "127.0.0.2:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + resolver1.expectResolve(*dns_resolver); + resolver1.timer_->callback_(); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.2", "127.0.0.1"})); + EXPECT_THAT( + std::list({"127.0.0.1:11001", "127.0.0.2:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + resolver1.timer_->callback_(); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.3"})); + EXPECT_THAT( + std::list({"127.0.0.3:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + // Make sure we de-dup the same address. + EXPECT_CALL(*resolver2.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver2.dns_callback_(TestUtility::makeDnsResponse({"10.0.0.1", "10.0.0.1"})); + EXPECT_THAT( + std::list({"127.0.0.3:11001", "10.0.0.1:11002"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); + EXPECT_EQ(1UL, + cluster.prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); + + for (const HostSharedPtr& host : cluster.prioritySet().hostSetsPerPriority()[0]->hosts()) { + EXPECT_EQ(cluster.info().get(), &host->cluster()); + } + + // Make sure we cancel. + resolver1.expectResolve(*dns_resolver); + resolver1.timer_->callback_(); + resolver2.expectResolve(*dns_resolver); + resolver2.timer_->callback_(); + + EXPECT_CALL(resolver1.active_dns_query_, cancel()); + EXPECT_CALL(resolver2.active_dns_query_, cancel()); +} + +TEST(StrictDnsClusterImplTest, LoadAssignmentBasicMultiplePriorities) { + Stats::IsolatedStoreImpl stats; + Ssl::MockContextManager ssl_context_manager; + auto dns_resolver = std::make_shared>(); + NiceMock dispatcher; + NiceMock runtime; + NiceMock local_info; + + // gmock matches in LIFO order which is why these are swapped. + ResolverData resolver3(*dns_resolver, dispatcher); + ResolverData resolver2(*dns_resolver, dispatcher); + ResolverData resolver1(*dns_resolver, dispatcher); + + const std::string yaml = R"EOF( + name: name + type: STRICT_DNS + + dns_lookup_family: V4_ONLY + connect_timeout: 0.25s + dns_refresh_rate: 4s + + lb_policy: ROUND_ROBIN + + load_assignment: + endpoints: + - priority: 0 + lb_endpoints: + - endpoint: + address: + socket_address: + address: localhost1 + port_value: 11001 + health_check_config: + port_value: 8000 + - endpoint: + address: + socket_address: + address: localhost2 + port_value: 11002 + health_check_config: + port_value: 8000 + + - priority: 1 + lb_endpoints: + - endpoint: + address: + socket_address: + address: localhost3 + port_value: 11003 + health_check_config: + port_value: 8000 + )EOF"; + + NiceMock cm; + StrictDnsClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, dns_resolver, cm, dispatcher, false); + + ReadyWatcher membership_updated; + cluster.prioritySet().addMemberUpdateCb( + [&](uint32_t, const HostVector&, const HostVector&) -> void { membership_updated.ready(); }); + + cluster.initialize([] {}); + + resolver1.expectResolve(*dns_resolver); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2"})); + EXPECT_THAT( + std::list({"127.0.0.1:11001", "127.0.0.2:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + EXPECT_EQ("localhost1", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0]->hostname()); + EXPECT_EQ("localhost1", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[1]->hostname()); + + resolver1.expectResolve(*dns_resolver); + resolver1.timer_->callback_(); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.2", "127.0.0.1"})); + EXPECT_THAT( + std::list({"127.0.0.1:11001", "127.0.0.2:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + resolver1.expectResolve(*dns_resolver); + resolver1.timer_->callback_(); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.2", "127.0.0.1"})); + EXPECT_THAT( + std::list({"127.0.0.1:11001", "127.0.0.2:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + resolver1.timer_->callback_(); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.3"})); + EXPECT_THAT( + std::list({"127.0.0.3:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + // Make sure we de-dup the same address. + EXPECT_CALL(*resolver2.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver2.dns_callback_(TestUtility::makeDnsResponse({"10.0.0.1", "10.0.0.1"})); + EXPECT_THAT( + std::list({"127.0.0.3:11001", "10.0.0.1:11002"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); + EXPECT_EQ(1UL, + cluster.prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); + + for (const HostSharedPtr& host : cluster.prioritySet().hostSetsPerPriority()[0]->hosts()) { + EXPECT_EQ(cluster.info().get(), &host->cluster()); + } + + EXPECT_CALL(*resolver3.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver3.dns_callback_(TestUtility::makeDnsResponse({"192.168.1.1", "192.168.1.2"})); + + // Make sure we have multiple priorities. + EXPECT_THAT( + std::list({"192.168.1.1:11003", "192.168.1.2:11003"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[1]->hosts()))); + + // Make sure we cancel. + resolver1.expectResolve(*dns_resolver); + resolver1.timer_->callback_(); + resolver2.expectResolve(*dns_resolver); + resolver2.timer_->callback_(); + resolver3.expectResolve(*dns_resolver); + resolver3.timer_->callback_(); + + EXPECT_CALL(resolver1.active_dns_query_, cancel()); + EXPECT_CALL(resolver2.active_dns_query_, cancel()); + EXPECT_CALL(resolver3.active_dns_query_, cancel()); +} + TEST(HostImplTest, HostCluster) { MockCluster cluster; HostSharedPtr host = makeTestHost(cluster.info_, "tcp://10.0.0.1:1234", 1); @@ -419,10 +716,38 @@ TEST(HostImplTest, HostnameCanaryAndLocality) { EXPECT_EQ("world", host.locality().sub_zone()); } +TEST(StaticClusterImplTest, InitialHosts) { + Stats::IsolatedStoreImpl stats; + Ssl::MockContextManager ssl_context_manager; + NiceMock runtime; + NiceMock local_info; + const std::string yaml = R"EOF( + name: staticcluster + connect_timeout: 0.25s + type: STATIC + lb_policy: ROUND_ROBIN + hosts: + - socket_address: + address: 10.0.0.1 + port_value: 443 + )EOF"; + + NiceMock cm; + StaticClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, cm, false); + cluster.initialize([] {}); + + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ("", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0]->hostname()); + EXPECT_FALSE(cluster.info()->addedViaApi()); +} + TEST(StaticClusterImplTest, EmptyHostname) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + const std::string json = R"EOF( { "name": "staticcluster", @@ -434,8 +759,41 @@ TEST(StaticClusterImplTest, EmptyHostname) { )EOF"; NiceMock cm; - StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, cm, - false); + StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, + local_info, cm, false); + cluster.initialize([] {}); + + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ("", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0]->hostname()); + EXPECT_FALSE(cluster.info()->addedViaApi()); +} + +TEST(StaticClusterImplTest, LoadAssignmentEmptyHostname) { + Stats::IsolatedStoreImpl stats; + Ssl::MockContextManager ssl_context_manager; + NiceMock runtime; + NiceMock local_info; + + const std::string yaml = R"EOF( + name: staticcluster + connect_timeout: 0.25s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 10.0.0.1 + port_value: 443 + health_check_config: + port_value: 8000 + )EOF"; + + NiceMock cm; + StaticClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, cm, false); cluster.initialize([] {}); EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); @@ -443,10 +801,114 @@ TEST(StaticClusterImplTest, EmptyHostname) { EXPECT_FALSE(cluster.info()->addedViaApi()); } +TEST(StaticClusterImplTest, LoadAssignmentMultiplePriorities) { + Stats::IsolatedStoreImpl stats; + Ssl::MockContextManager ssl_context_manager; + NiceMock runtime; + NiceMock local_info; + + const std::string yaml = R"EOF( + name: staticcluster + connect_timeout: 0.25s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + endpoints: + - priority: 0 + lb_endpoints: + - endpoint: + address: + socket_address: + address: 10.0.0.1 + port_value: 443 + health_check_config: + port_value: 8000 + - endpoint: + address: + socket_address: + address: 10.0.0.2 + port_value: 443 + health_check_config: + port_value: 8000 + + - priority: 1 + lb_endpoints: + - endpoint: + address: + socket_address: + address: 10.0.0.3 + port_value: 443 + health_check_config: + port_value: 8000 + )EOF"; + + NiceMock cm; + StaticClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, cm, false); + cluster.initialize([] {}); + + EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[1]->healthyHosts().size()); + EXPECT_EQ("", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0]->hostname()); + EXPECT_FALSE(cluster.info()->addedViaApi()); +} + +TEST(StaticClusterImplTest, LoadAssignmentLocality) { + Stats::IsolatedStoreImpl stats; + Ssl::MockContextManager ssl_context_manager; + NiceMock runtime; + NiceMock local_info; + + const std::string yaml = R"EOF( + name: staticcluster + connect_timeout: 0.25s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + endpoints: + - locality: + region: oceania + zone: hello + sub_zone: world + lb_endpoints: + - endpoint: + address: + socket_address: + address: 10.0.0.1 + port_value: 443 + health_check_config: + port_value: 8000 + - endpoint: + address: + socket_address: + address: 10.0.0.2 + port_value: 443 + health_check_config: + port_value: 8000 + )EOF"; + + NiceMock cm; + StaticClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, cm, false); + cluster.initialize([] {}); + + auto& hosts = cluster.prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 2); + for (int i = 0; i < 2; ++i) { + const auto& locality = hosts[i]->locality(); + EXPECT_EQ("oceania", locality.region()); + EXPECT_EQ("hello", locality.zone()); + EXPECT_EQ("world", locality.sub_zone()); + } + EXPECT_EQ(nullptr, cluster.prioritySet().hostSetsPerPriority()[0]->localityWeights()); + EXPECT_FALSE(cluster.info()->addedViaApi()); +} + TEST(StaticClusterImplTest, AltStatName) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; const std::string yaml = R"EOF( name: staticcluster @@ -458,8 +920,8 @@ TEST(StaticClusterImplTest, AltStatName) { )EOF"; NiceMock cm; - StaticClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, cm, - false); + StaticClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, cm, false); cluster.initialize([] {}); // Increment a stat and verify it is emitted with alt_stat_name cluster.info()->stats().upstream_rq_total_.inc(); @@ -470,6 +932,8 @@ TEST(StaticClusterImplTest, RingHash) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + const std::string json = R"EOF( { "name": "staticcluster", @@ -481,8 +945,8 @@ TEST(StaticClusterImplTest, RingHash) { )EOF"; NiceMock cm; - StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, cm, - true); + StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, + local_info, cm, true); cluster.initialize([] {}); EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); @@ -494,6 +958,8 @@ TEST(StaticClusterImplTest, OutlierDetector) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + const std::string json = R"EOF( { "name": "addressportconfig", @@ -506,8 +972,8 @@ TEST(StaticClusterImplTest, OutlierDetector) { )EOF"; NiceMock cm; - StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, cm, - false); + StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, + local_info, cm, false); Outlier::MockDetector* detector = new Outlier::MockDetector(); EXPECT_CALL(*detector, addChangedStateCb(_)); @@ -541,6 +1007,8 @@ TEST(StaticClusterImplTest, HealthyStat) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + const std::string json = R"EOF( { "name": "addressportconfig", @@ -553,8 +1021,8 @@ TEST(StaticClusterImplTest, HealthyStat) { )EOF"; NiceMock cm; - StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, cm, - false); + StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, + local_info, cm, false); Outlier::MockDetector* outlier_detector = new NiceMock(); cluster.setOutlierDetector(Outlier::DetectorSharedPtr{outlier_detector}); @@ -623,6 +1091,8 @@ TEST(StaticClusterImplTest, UrlConfig) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + const std::string json = R"EOF( { "name": "addressportconfig", @@ -635,8 +1105,8 @@ TEST(StaticClusterImplTest, UrlConfig) { )EOF"; NiceMock cm; - StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, cm, - false); + StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, + local_info, cm, false); cluster.initialize([] {}); EXPECT_EQ(1024U, cluster.info()->resourceManager(ResourcePriority::Default).connections().max()); @@ -656,8 +1126,8 @@ TEST(StaticClusterImplTest, UrlConfig) { std::list({"10.0.0.1:11001", "10.0.0.2:11002"}), ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); - EXPECT_EQ(0UL, cluster.prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); - EXPECT_EQ(0UL, + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0]->healthChecker().setUnhealthy(); } @@ -667,6 +1137,8 @@ TEST(StaticClusterImplTest, UnsupportedLBType) { Ssl::MockContextManager ssl_context_manager; NiceMock runtime; NiceMock cm; + NiceMock local_info; + const std::string json = R"EOF( { "name": "addressportconfig", @@ -678,15 +1150,17 @@ TEST(StaticClusterImplTest, UnsupportedLBType) { } )EOF"; - EXPECT_THROW( - StaticClusterImpl(parseClusterFromJson(json), runtime, stats, ssl_context_manager, cm, false), - EnvoyException); + EXPECT_THROW(StaticClusterImpl(parseClusterFromJson(json), runtime, stats, ssl_context_manager, + local_info, cm, false), + EnvoyException); } TEST(StaticClusterImplTest, MalformedHostIP) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + const std::string yaml = R"EOF( name: name connect_timeout: 0.25s @@ -697,7 +1171,7 @@ TEST(StaticClusterImplTest, MalformedHostIP) { NiceMock cm; EXPECT_THROW_WITH_MESSAGE(StaticClusterImpl(parseClusterFromV2Yaml(yaml), runtime, stats, - ssl_context_manager, cm, false), + ssl_context_manager, local_info, cm, false), EnvoyException, "malformed IP address: foo.bar.com. Consider setting resolver_name or " "setting cluster type to 'STRICT_DNS' or 'LOGICAL_DNS'"); @@ -739,6 +1213,8 @@ TEST(StaticClusterImplTest, SourceAddressPriority) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + envoy::api::v2::Cluster config; config.set_name("staticcluster"); config.mutable_connect_timeout(); @@ -747,7 +1223,7 @@ TEST(StaticClusterImplTest, SourceAddressPriority) { // If the cluster manager gets a source address from the bootstrap proto, use it. NiceMock cm; cm.bind_config_.mutable_source_address()->set_address("1.2.3.5"); - StaticClusterImpl cluster(config, runtime, stats, ssl_context_manager, cm, false); + StaticClusterImpl cluster(config, runtime, stats, ssl_context_manager, local_info, cm, false); EXPECT_EQ("1.2.3.5:0", cluster.info()->sourceAddress()->asString()); } @@ -756,7 +1232,7 @@ TEST(StaticClusterImplTest, SourceAddressPriority) { { // Verify source address from cluster config is used when present. NiceMock cm; - StaticClusterImpl cluster(config, runtime, stats, ssl_context_manager, cm, false); + StaticClusterImpl cluster(config, runtime, stats, ssl_context_manager, local_info, cm, false); EXPECT_EQ(cluster_address, cluster.info()->sourceAddress()->ip()->addressAsString()); } @@ -764,7 +1240,7 @@ TEST(StaticClusterImplTest, SourceAddressPriority) { // The source address from cluster config takes precedence over one from the bootstrap proto. NiceMock cm; cm.bind_config_.mutable_source_address()->set_address("1.2.3.5"); - StaticClusterImpl cluster(config, runtime, stats, ssl_context_manager, cm, false); + StaticClusterImpl cluster(config, runtime, stats, ssl_context_manager, local_info, cm, false); EXPECT_EQ(cluster_address, cluster.info()->sourceAddress()->ip()->addressAsString()); } } @@ -778,6 +1254,7 @@ TEST(ClusterImplTest, CloseConnectionsOnHostHealthFailure) { NiceMock dispatcher; NiceMock runtime; NiceMock cm; + NiceMock local_info; ReadyWatcher initialized; const std::string yaml = R"EOF( @@ -789,7 +1266,7 @@ TEST(ClusterImplTest, CloseConnectionsOnHostHealthFailure) { hosts: [{ socket_address: { address: foo.bar.com, port_value: 443 }}] )EOF"; StrictDnsClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, - dns_resolver, cm, dispatcher, false); + local_info, dns_resolver, cm, dispatcher, false); EXPECT_TRUE(cluster.info()->features() & ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE); } @@ -851,6 +1328,7 @@ TEST(ClusterMetadataTest, Metadata) { NiceMock dispatcher; NiceMock runtime; NiceMock cm; + NiceMock local_info; ReadyWatcher initialized; const std::string yaml = R"EOF( @@ -866,7 +1344,7 @@ TEST(ClusterMetadataTest, Metadata) { )EOF"; StrictDnsClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, - dns_resolver, cm, dispatcher, false); + local_info, dns_resolver, cm, dispatcher, false); EXPECT_EQ("test_value", Config::Metadata::metadataValue(cluster.info()->metadata(), "com.bar.foo", "baz") .string_value());