Skip to content

Commit

Permalink
upstream: introduce weighted locality as wrapper around endpoint's Lo…
Browse files Browse the repository at this point in the history
…cality

Currently Envoy::Upstream::StrictDnsClusterImpl::ResolveTarget when
instantiated for every endpoint also creates a full copy of
envoy::config::endpoint::v3::LocalityLbEndpoints the endpoint belongs
to. Given the message contains all the endpoints defined for it this
leads to exponential growth of consumed memory as the number of
endpoints increases. Even though those copies of endpoints are not
used.

Instead of creating a copy of
envoy::config::endpoint::v3::LocalityLbEndpoints introduce
WeightedLocality class which is a wrapper around
envoy::config::core::v3::Locality with priority and load balancing
weight actually used in the upstream implementation.

Signed-off-by: Dmitry Rozhkov <[email protected]>
  • Loading branch information
rojkov committed Feb 10, 2021
1 parent 684afd1 commit 3de74c1
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 33 deletions.
12 changes: 12 additions & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "weighted_locality_lib",
srcs = ["weighted_locality.cc"],
hdrs = ["weighted_locality.h"],
deps = [
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "health_discovery_service_lib",
srcs = ["health_discovery_service.cc"],
Expand Down Expand Up @@ -499,6 +509,7 @@ envoy_cc_library(
deps = [
":cluster_factory_includes",
":upstream_includes",
":weighted_locality_lib",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
"@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto",
],
Expand All @@ -515,6 +526,7 @@ envoy_cc_library(
":load_balancer_lib",
":outlier_detection_lib",
":resource_manager_lib",
":weighted_locality_lib",
"//include/envoy/event:timer_interface",
"//include/envoy/local_info:local_info_interface",
"//include/envoy/network:dns_interface",
Expand Down
16 changes: 8 additions & 8 deletions source/common/upstream/strict_dns_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ void StrictDnsClusterImpl::updateAllHosts(const HostVector& hosts_added,
// duplicated hosts inside a priority. And if we want to enforce this behavior, it should be done
// inside the priority state manager.
for (const ResolveTargetPtr& target : resolve_targets_) {
priority_state_manager.initializePriorityFor(target->locality_lb_endpoint_);
priority_state_manager.initializePriorityFor(target->weighted_locality_);
for (const HostSharedPtr& host : target->hosts_) {
if (target->locality_lb_endpoint_.priority() == current_priority) {
priority_state_manager.registerHostForPriority(host, target->locality_lb_endpoint_);
if (target->weighted_locality_.priority() == current_priority) {
priority_state_manager.registerHostForPriority(host, target->weighted_locality_);
}
}
}
Expand All @@ -94,7 +94,7 @@ StrictDnsClusterImpl::ResolveTarget::ResolveTarget(
: parent_(parent), dns_address_(Network::Utility::hostFromTcpUrl(url)),
port_(Network::Utility::portFromTcpUrl(url)),
resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })),
locality_lb_endpoint_(locality_lb_endpoint), lb_endpoint_(lb_endpoint) {}
weighted_locality_(locality_lb_endpoint), lb_endpoint_(lb_endpoint) {}

StrictDnsClusterImpl::ResolveTarget::~ResolveTarget() {
if (active_query_) {
Expand Down Expand Up @@ -133,8 +133,8 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() {
Network::Utility::getAddressWithPort(*(resp.address_), port_),
// TODO(zyfjeff): Created through metadata shared pool
std::make_shared<const envoy::config::core::v3::Metadata>(lb_endpoint_.metadata()),
lb_endpoint_.load_balancing_weight().value(), locality_lb_endpoint_.locality(),
lb_endpoint_.endpoint().health_check_config(), locality_lb_endpoint_.priority(),
lb_endpoint_.load_balancing_weight().value(), weighted_locality_.locality(),
lb_endpoint_.endpoint().health_check_config(), weighted_locality_.priority(),
lb_endpoint_.health_status(), parent_.time_source_));
all_new_hosts.emplace(new_hosts.back()->address()->asString());
ttl_refresh_rate = min(ttl_refresh_rate, resp.ttl_);
Expand All @@ -146,9 +146,9 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() {
updated_hosts, all_hosts_, all_new_hosts)) {
ENVOY_LOG(debug, "DNS hosts have changed for {}", dns_address_);
ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) {
return host->priority() == locality_lb_endpoint_.priority();
return host->priority() == weighted_locality_.priority();
}));
parent_.updateAllHosts(hosts_added, hosts_removed, locality_lb_endpoint_.priority());
parent_.updateAllHosts(hosts_added, hosts_removed, weighted_locality_.priority());
} else {
parent_.info_->stats().update_no_rebuild_.inc();
}
Expand Down
3 changes: 2 additions & 1 deletion source/common/upstream/strict_dns_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "common/upstream/cluster_factory_impl.h"
#include "common/upstream/upstream_impl.h"
#include "common/upstream/weighted_locality.h"

namespace Envoy {
namespace Upstream {
Expand Down Expand Up @@ -38,7 +39,7 @@ class StrictDnsClusterImpl : public BaseDynamicClusterImpl {
uint32_t port_;
Event::TimerPtr resolve_timer_;
HostVector hosts_;
const envoy::config::endpoint::v3::LocalityLbEndpoints locality_lb_endpoint_;
const WeightedLocality weighted_locality_;
const envoy::config::endpoint::v3::LbEndpoint lb_endpoint_;
HostMap all_hosts_;
};
Expand Down
26 changes: 12 additions & 14 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1286,39 +1286,37 @@ PriorityStateManager::PriorityStateManager(ClusterImplBase& cluster,
PrioritySet::HostUpdateCb* update_cb)
: parent_(cluster), local_info_node_(local_info.node()), update_cb_(update_cb) {}

void PriorityStateManager::initializePriorityFor(
const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
const uint32_t priority = locality_lb_endpoint.priority();
void PriorityStateManager::initializePriorityFor(const WeightedLocality& weighted_locality) {
const uint32_t priority = weighted_locality.priority();
if (priority_state_.size() <= priority) {
priority_state_.resize(priority + 1);
}
if (priority_state_[priority].first == nullptr) {
priority_state_[priority].first = std::make_unique<HostVector>();
}
if (locality_lb_endpoint.has_locality() && locality_lb_endpoint.has_load_balancing_weight()) {
priority_state_[priority].second[locality_lb_endpoint.locality()] =
locality_lb_endpoint.load_balancing_weight().value();
if (weighted_locality.isWeightSet()) {
priority_state_[priority].second[weighted_locality.locality()] =
weighted_locality.loadBalancingWeight();
}
}

void PriorityStateManager::registerHostForPriority(
const std::string& hostname, Network::Address::InstanceConstSharedPtr address,
const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
const WeightedLocality& weighted_locality,
const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint, TimeSource& time_source) {
auto metadata = lb_endpoint.has_metadata()
? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata())
: nullptr;
const HostSharedPtr host(new HostImpl(
parent_.info(), hostname, address, metadata, lb_endpoint.load_balancing_weight().value(),
locality_lb_endpoint.locality(), lb_endpoint.endpoint().health_check_config(),
locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source));
registerHostForPriority(host, locality_lb_endpoint);
weighted_locality.locality(), lb_endpoint.endpoint().health_check_config(),
weighted_locality.priority(), lb_endpoint.health_status(), time_source));
registerHostForPriority(host, weighted_locality);
}

void PriorityStateManager::registerHostForPriority(
const HostSharedPtr& host,
const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
const uint32_t priority = locality_lb_endpoint.priority();
void PriorityStateManager::registerHostForPriority(const HostSharedPtr& host,
const WeightedLocality& weighted_locality) {
const uint32_t priority = weighted_locality.priority();
// Should be called after initializePriorityFor.
ASSERT(priority_state_[priority].first);
priority_state_[priority].first->emplace_back(host);
Expand Down
20 changes: 10 additions & 10 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "common/upstream/outlier_detection_impl.h"
#include "common/upstream/resource_manager_impl.h"
#include "common/upstream/transport_socket_match_impl.h"
#include "common/upstream/weighted_locality.h"

#include "server/transport_socket_config_impl.h"

Expand Down Expand Up @@ -850,22 +851,21 @@ class PriorityStateManager : protected Logger::Loggable<Logger::Id::upstream> {
PrioritySet::HostUpdateCb* update_cb);

// Initializes the PriorityState vector based on the priority specified in locality_lb_endpoint.
void initializePriorityFor(
const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint);
void initializePriorityFor(const WeightedLocality& weighted_locality);

// Registers a host based on its address to the PriorityState based on the specified priority (the
// priority is specified by locality_lb_endpoint.priority()).
//
// The specified health_checker_flag is used to set the registered-host's health-flag when the
// lb_endpoint health status is unhealthy, draining or timeout.
void registerHostForPriority(
const std::string& hostname, Network::Address::InstanceConstSharedPtr address,
const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint, TimeSource& time_source);

void registerHostForPriority(
const HostSharedPtr& host,
const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint);
void registerHostForPriority(const std::string& hostname,
Network::Address::InstanceConstSharedPtr address,
const WeightedLocality& weighted_locality,
const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint,
TimeSource& time_source);

void registerHostForPriority(const HostSharedPtr& host,
const WeightedLocality& weighted_locality);

void
updateClusterPrioritySet(const uint32_t priority, HostVectorSharedPtr&& current_hosts,
Expand Down
14 changes: 14 additions & 0 deletions source/common/upstream/weighted_locality.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "common/upstream/weighted_locality.h"

namespace Envoy {
namespace Upstream {

WeightedLocality::WeightedLocality(
const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint)
: is_weight_set_{locality_lb_endpoint.has_locality() &&
locality_lb_endpoint.has_load_balancing_weight()},
load_balancing_weight_{locality_lb_endpoint.load_balancing_weight().value()},
locality_{locality_lb_endpoint.locality()}, priority_{locality_lb_endpoint.priority()} {}

} // namespace Upstream
} // namespace Envoy
27 changes: 27 additions & 0 deletions source/common/upstream/weighted_locality.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"

namespace Envoy {
namespace Upstream {

// An endpoint's locality decorated with priority and weight.
class WeightedLocality {
public:
WeightedLocality(const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint);

bool isWeightSet() const { return is_weight_set_; }
uint32_t loadBalancingWeight() const { return load_balancing_weight_; }
const envoy::config::core::v3::Locality& locality() const { return locality_; }
uint32_t priority() const { return priority_; }

private:
const bool is_weight_set_;
const uint32_t load_balancing_weight_;
const envoy::config::core::v3::Locality locality_;
const uint32_t priority_;
};

} // namespace Upstream
} // namespace Envoy

0 comments on commit 3de74c1

Please sign in to comment.