Skip to content

Commit

Permalink
upstream: implement Cluster's load_assignment field (#3864)
Browse files Browse the repository at this point in the history
This patch implements load_assigment field in CDS' Cluster.
This change specifically adds the implementation of the new load_assigment field
for clusters with discovery-type: STATIC, STRICT_DNS and LOGICAL_DNS.

While adding this load_assigment field implementation to Cluster,
this patch also allows specifying optional (active) health check config per specified upstream host.

Risk Level: medium
Testing: unit tests
Docs Changes:

This unhides docs for endpoint health check config
Release Notes: N/A

Fixes #439

Signed-off-by: Dhi Aurrahman <[email protected]>
  • Loading branch information
dio authored and htuch committed Jul 24, 2018
1 parent 8b3aae8 commit b32eabf
Show file tree
Hide file tree
Showing 16 changed files with 1,018 additions and 245 deletions.
3 changes: 2 additions & 1 deletion DEPRECATED.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ A logged warning is expected for each deprecated item that is in deprecation win

* Use of the v1 API is deprecated. See envoy-announce
[email](https://groups.google.com/forum/#!topic/envoy-announce/oPnYMZw8H4U).
* Use of the legacy
* Use of the legacy
[ratelimit.proto](https://github.com/envoyproxy/envoy/blob/b0a518d064c8255e0e20557a8f909b6ff457558f/source/common/ratelimit/ratelimit.proto)
is deprecated, in favor of the proto defined in
[date-plane-api](https://github.com/envoyproxy/envoy/blob/master/api/envoy/service/ratelimit/v2/rls.proto)
Expand All @@ -23,6 +23,7 @@ A logged warning is expected for each deprecated item that is in deprecation win
is deprecated. Please use the new `upgrade_configs` in the
[HttpConnectionManager](https://github.com/envoyproxy/envoy/blob/master/api/envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto)
instead.
* Setting hosts via `hosts` field in `Cluster` is deprecated. Use `load_assignment` instead.

## Version 1.7.0

Expand Down
9 changes: 7 additions & 2 deletions api/envoy/api/v2/cds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,13 @@ message Cluster {
// :ref:`STRICT_DNS<envoy_api_enum_value_Cluster.DiscoveryType.STRICT_DNS>`
// or :ref:`LOGICAL_DNS<envoy_api_enum_value_Cluster.DiscoveryType.LOGICAL_DNS>`,
// then hosts is required.
repeated core.Address hosts = 7;
//
// .. attention::
//
// **This field is deprecated**. Set the
// :ref:`load_assignment<envoy_api_field_Cluster.load_assignment>` field instead.
//
repeated core.Address hosts = 7 [deprecated = true];

// Setting this is required for specifying members of
// :ref:`STATIC<envoy_api_enum_value_Cluster.DiscoveryType.STATIC>`,
Expand All @@ -176,7 +182,6 @@ message Cluster {
// :ref:`endpoint assignments<envoy_api_msg_ClusterLoadAssignment>`.
// Setting this overrides :ref:`hosts<envoy_api_field_Cluster.hosts>` values.
//
// [#not-implemented-hide:]
ClusterLoadAssignment load_assignment = 33;

// Optional :ref:`active health checking <arch_overview_health_checking>`
Expand Down
6 changes: 3 additions & 3 deletions api/envoy/api/v2/endpoint/endpoint.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ message Endpoint {
// and will be resolved via DNS.
core.Address address = 1;

// [#not-implemented-hide:] The optional health check configuration.
// The optional health check configuration.
message HealthCheckConfig {
// Optional alternative health check port value.
//
Expand All @@ -40,8 +40,8 @@ message Endpoint {
uint32 port_value = 1;
}

// [#not-implemented-hide:] The optional health check configuration is used as
// configuration for the health checker to contact the health checked host.
// The optional health check configuration is used as configuration for the
// health checker to contact the health checked host.
//
// .. attention::
//
Expand Down
35 changes: 28 additions & 7 deletions docs/root/configuration/overview/v2_overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,14 @@ A minimal fully static bootstrap config is provided below:
connect_timeout: 0.25s
type: STATIC
lb_policy: ROUND_ROBIN
hosts: [{ socket_address: { address: 127.0.0.2, port_value: 1234 }}]
load_assignment:
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 1234
Mostly static with dynamic EDS
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down Expand Up @@ -150,7 +157,14 @@ on 127.0.0.3:5678 is provided below:
type: STATIC
lb_policy: ROUND_ROBIN
http2_protocol_options: {}
hosts: [{ socket_address: { address: 127.0.0.3, port_value: 5678 }}]
load_assignment:
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 5678
Notice above that *xds_cluster* is defined to point Envoy at the management server. Even in
an otherwise completely dynamic configurations, some static resources need to
Expand Down Expand Up @@ -214,7 +228,14 @@ below:
type: STATIC
lb_policy: ROUND_ROBIN
http2_protocol_options: {}
hosts: [{ socket_address: { address: 127.0.0.3, port_value: 5678 }}]
load_assignment:
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 5678
The management server could respond to LDS requests with:

Expand Down Expand Up @@ -543,11 +564,11 @@ the shared ADS channel.
Management Server Unreachability
--------------------------------

When Envoy instance looses connectivity with the management server, Envoy will latch on to
the previous configuration while actively retrying in the background to reestablish the
connection with the management server.
When an Envoy instance loses connectivity with the management server, Envoy will latch on to
the previous configuration while actively retrying in the background to reestablish the
connection with the management server.

Envoy debug logs the fact that it is not able to establish a connection with the management server
Envoy debug logs the fact that it is not able to establish a connection with the management server
every time it attempts a connection.

:ref:`upstream_cx_connect_fail <config_cluster_manager_cluster_stats>` a cluster level statistic
Expand Down
29 changes: 29 additions & 0 deletions docs/root/intro/arch_overview/health_checking.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,35 @@ unhealthy, successes required before marking a host healthy, etc.):
maintenance by setting the specified key to any value and waiting for traffic to drain. See
:ref:`redis_key <config_cluster_manager_cluster_hc_redis_key>`.

.. _arch_overview_per_cluster_health_check_config:

Per cluster member health check config
--------------------------------------

If active health checking is configured for an upstream cluster, a specific additional configuration
for each registered member can be specified by setting the
:ref:`HealthCheckConfig<envoy_api_msg_endpoint.Endpoint.HealthCheckConfig>`
in the :ref:`Endpoint<envoy_api_msg_endpoint.Endpoint>` of an :ref:`LbEndpoint<envoy_api_msg_endpoint.LbEndpoint>`
of each defined :ref:`LocalityLbEndpoints<envoy_api_msg_endpoint.LocalityLbEndpoints>` in a
:ref:`ClusterLoadAssignment<envoy_api_msg_ClusterLoadAssignment>`.

An example of setting up :ref:`health check config<envoy_api_msg_endpoint.Endpoint.HealthCheckConfig>`
to set a :ref:`cluster member<envoy_api_msg_endpoint.Endpoint>`'s alternative health check
:ref:`port<envoy_api_field_endpoint.Endpoint.HealthCheckConfig.port_value>` is:

.. code-block:: yaml
load_assignment:
endpoints:
- lb_endpoints:
- endpoint:
health_check_config:
port_value: 8080
address:
socket_address:
address: localhost
port_value: 80
.. _arch_overview_health_check_logging:

Health check event logging
Expand Down
15 changes: 15 additions & 0 deletions source/common/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,5 +225,20 @@ Grpc::AsyncClientFactoryPtr Utility::factoryForGrpcApiConfigSource(
return async_client_manager.factoryForGrpcService(grpc_service, scope, false);
}

envoy::api::v2::ClusterLoadAssignment Utility::translateClusterHosts(
const Protobuf::RepeatedPtrField<envoy::api::v2::core::Address>& hosts) {
envoy::api::v2::ClusterLoadAssignment load_assignment;
envoy::api::v2::endpoint::LocalityLbEndpoints* locality_lb_endpoints =
load_assignment.add_endpoints();
// Since this LocalityLbEndpoints is built from hosts list, set the default weight to 1.
locality_lb_endpoints->mutable_load_balancing_weight()->set_value(1);
for (const envoy::api::v2::core::Address& host : hosts) {
envoy::api::v2::endpoint::LbEndpoint* lb_endpoint = locality_lb_endpoints->add_lb_endpoints();
lb_endpoint->mutable_endpoint()->mutable_address()->MergeFrom(host);
lb_endpoint->mutable_load_balancing_weight()->set_value(1);
}
return load_assignment;
}

} // namespace Config
} // namespace Envoy
8 changes: 8 additions & 0 deletions source/common/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,14 @@ class Utility {
factoryForGrpcApiConfigSource(Grpc::AsyncClientManager& async_client_manager,
const envoy::api::v2::core::ApiConfigSource& api_config_source,
Stats::Scope& scope);

/**
* Translate a set of cluster's hosts into a load assignment configuration.
* @param hosts cluster's list of hosts.
* @return envoy::api::v2::ClusterLoadAssignment a load assignment configuration.
*/
static envoy::api::v2::ClusterLoadAssignment
translateClusterHosts(const Protobuf::RepeatedPtrField<envoy::api::v2::core::Address>& hosts);
};

} // namespace Config
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/eds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ bool EdsClusterImpl::updateHostsPerLocality(const uint32_t priority, const HostV
info_->name(), host_set.hosts().size(), host_set.priority());

priority_state_manager.updateClusterPrioritySet(priority, std::move(current_hosts_copy),
hosts_added, hosts_removed);
hosts_added, hosts_removed, absl::nullopt);
return true;
}
return false;
Expand Down
45 changes: 30 additions & 15 deletions source/common/upstream/logical_dns_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace Upstream {
LogicalDnsCluster::LogicalDnsCluster(const envoy::api::v2::Cluster& cluster,
Runtime::Loader& runtime, Stats::Store& stats,
Ssl::ContextManager& ssl_context_manager,
const LocalInfo::LocalInfo& local_info,
Network::DnsResolverSharedPtr dns_resolver,
ThreadLocal::SlotAllocator& tls, ClusterManager& cm,
Event::Dispatcher& dispatcher, bool added_via_api)
Expand All @@ -27,10 +28,19 @@ LogicalDnsCluster::LogicalDnsCluster(const envoy::api::v2::Cluster& cluster,
dns_refresh_rate_ms_(
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(cluster, dns_refresh_rate, 5000))),
tls_(tls.allocateSlot()),
resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })) {
const auto& hosts = cluster.hosts();
if (hosts.size() != 1) {
throw EnvoyException("logical_dns clusters must have a single host");
resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })),
local_info_(local_info),
load_assignment_(cluster.has_load_assignment()
? cluster.load_assignment()
: Config::Utility::translateClusterHosts(cluster.hosts())) {
const auto& locality_lb_endpoints = load_assignment_.endpoints();
if (locality_lb_endpoints.size() != 1 || locality_lb_endpoints[0].lb_endpoints().size() != 1) {
if (cluster.has_load_assignment()) {
throw EnvoyException(
"LOGICAL_DNS clusters must have a single locality_lb_endpoint and a single lb_endpoint");
} else {
throw EnvoyException("LOGICAL_DNS clusters must have a single host");
}
}

switch (cluster.dns_lookup_family()) {
Expand All @@ -47,7 +57,8 @@ LogicalDnsCluster::LogicalDnsCluster(const envoy::api::v2::Cluster& cluster,
NOT_REACHED_GCOVR_EXCL_LINE;
}

const auto& socket_address = hosts[0].socket_address();
const envoy::api::v2::core::SocketAddress& socket_address =
lbEndpoint().endpoint().address().socket_address();
dns_url_ = fmt::format("tcp://{}:{}", socket_address.address(), socket_address.port_value());
hostname_ = Network::Utility::hostFromTcpUrl(dns_url_);
Network::Utility::portFromTcpUrl(dns_url_);
Expand Down Expand Up @@ -88,7 +99,8 @@ void LogicalDnsCluster::startResolve() {
current_resolved_address_ = new_address;
// Capture URL to avoid a race with another update.
tls_->runOnAllThreads([this, new_address]() -> void {
tls_->getTyped<PerThreadCurrentHostData>().current_resolved_address_ = new_address;
PerThreadCurrentHostData& data = tls_->getTyped<PerThreadCurrentHostData>();
data.current_resolved_address_ = new_address;
});
}

Expand All @@ -107,14 +119,16 @@ void LogicalDnsCluster::startResolve() {
new LogicalHost(info_, hostname_, Network::Utility::getIpv6AnyAddress(), *this));
break;
}
HostVectorSharedPtr new_hosts(new HostVector());
new_hosts->emplace_back(logical_host_);
// Given the current config, only EDS clusters support multiple priorities.
ASSERT(priority_set_.hostSetsPerPriority().size() == 1);
auto& first_host_set = priority_set_.getOrCreateHostSet(0);
first_host_set.updateHosts(new_hosts, createHealthyHostList(*new_hosts),
HostsPerLocalityImpl::empty(), HostsPerLocalityImpl::empty(),
{}, *new_hosts, {});
const auto& locality_lb_endpoint = localityLbEndpoint();
PriorityStateManager priority_state_manager(*this, local_info_);
priority_state_manager.initializePriorityFor(locality_lb_endpoint);
priority_state_manager.registerHostForPriority(logical_host_, locality_lb_endpoint,
lbEndpoint(), absl::nullopt);

const uint32_t priority = locality_lb_endpoint.priority();
priority_state_manager.updateClusterPrioritySet(
priority, std::move(priority_state_manager.priorityState()[priority].first),
absl::nullopt, absl::nullopt, absl::nullopt);
}
}

Expand All @@ -131,7 +145,8 @@ Upstream::Host::CreateConnectionData LogicalDnsCluster::LogicalHost::createConne
return {HostImpl::createConnection(dispatcher, *parent_.info_, data.current_resolved_address_,
options),
HostDescriptionConstSharedPtr{
new RealHostDescription(data.current_resolved_address_, shared_from_this())}};
new RealHostDescription(data.current_resolved_address_, parent_.localityLbEndpoint(),
parent_.lbEndpoint(), shared_from_this())}};
}

} // namespace Upstream
Expand Down
42 changes: 34 additions & 8 deletions source/common/upstream/logical_dns_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class LogicalDnsCluster : public ClusterImplBase {
public:
LogicalDnsCluster(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime,
Stats::Store& stats, Ssl::ContextManager& ssl_context_manager,
const LocalInfo::LocalInfo& local_info,
Network::DnsResolverSharedPtr dns_resolver, ThreadLocal::SlotAllocator& tls,
ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api);

Expand All @@ -42,9 +43,10 @@ class LogicalDnsCluster : public ClusterImplBase {
struct LogicalHost : public HostImpl {
LogicalHost(ClusterInfoConstSharedPtr cluster, const std::string& hostname,
Network::Address::InstanceConstSharedPtr address, LogicalDnsCluster& parent)
: HostImpl(cluster, hostname, address, envoy::api::v2::core::Metadata::default_instance(),
1, envoy::api::v2::core::Locality().default_instance(),
envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance()),
: HostImpl(cluster, hostname, address, parent.lbEndpoint().metadata(),
parent.lbEndpoint().load_balancing_weight().value(),
parent.localityLbEndpoint().locality(),
parent.lbEndpoint().endpoint().health_check_config()),
parent_(parent) {}

// Upstream::Host
Expand All @@ -57,10 +59,17 @@ class LogicalDnsCluster : public ClusterImplBase {

struct RealHostDescription : public HostDescription {
RealHostDescription(Network::Address::InstanceConstSharedPtr address,
const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint,
const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint,
HostConstSharedPtr logical_host)
: address_(address), logical_host_(logical_host),
metadata_(std::make_shared<envoy::api::v2::core::Metadata>(
envoy::api::v2::core::Metadata::default_instance())) {}
metadata_(std::make_shared<envoy::api::v2::core::Metadata>(lb_endpoint.metadata())),
health_check_address_(
lb_endpoint.endpoint().health_check_config().port_value() == 0
? address
: Network::Utility::getAddressWithPort(
*address, lb_endpoint.endpoint().health_check_config().port_value())),
locality_lb_endpoint_(locality_lb_endpoint), lb_endpoint_(lb_endpoint) {}

// Upstream:HostDescription
bool canary() const override { return false; }
Expand All @@ -81,21 +90,36 @@ class LogicalDnsCluster : public ClusterImplBase {
const std::string& hostname() const override { return logical_host_->hostname(); }
Network::Address::InstanceConstSharedPtr address() const override { return address_; }
const envoy::api::v2::core::Locality& locality() const override {
return envoy::api::v2::core::Locality().default_instance();
return locality_lb_endpoint_.locality();
}
// TODO(dio): To support different address port.
Network::Address::InstanceConstSharedPtr healthCheckAddress() const override {
return address_;
return health_check_address_;
}
uint32_t priority() const { return locality_lb_endpoint_.priority(); }
Network::Address::InstanceConstSharedPtr address_;
HostConstSharedPtr logical_host_;
const std::shared_ptr<envoy::api::v2::core::Metadata> metadata_;
Network::Address::InstanceConstSharedPtr health_check_address_;
const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint_;
const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint_;
};

struct PerThreadCurrentHostData : public ThreadLocal::ThreadLocalObject {
Network::Address::InstanceConstSharedPtr current_resolved_address_;
};

const envoy::api::v2::endpoint::LocalityLbEndpoints& localityLbEndpoint() const {
// This is checked in the constructor, i.e. at config load time.
ASSERT(load_assignment_.endpoints().size() == 1);
return load_assignment_.endpoints()[0];
}

const envoy::api::v2::endpoint::LbEndpoint& lbEndpoint() const {
// This is checked in the constructor, i.e. at config load time.
ASSERT(localityLbEndpoint().lb_endpoints().size() == 1);
return localityLbEndpoint().lb_endpoints()[0];
}

void startResolve();

// ClusterImplBase
Expand All @@ -111,6 +135,8 @@ class LogicalDnsCluster : public ClusterImplBase {
Network::Address::InstanceConstSharedPtr current_resolved_address_;
HostSharedPtr logical_host_;
Network::ActiveDnsQuery* active_dns_query_{};
const LocalInfo::LocalInfo& local_info_;
const envoy::api::v2::ClusterLoadAssignment load_assignment_;
};

} // namespace Upstream
Expand Down
Loading

0 comments on commit b32eabf

Please sign in to comment.