upstream: switch zone-aware routing to single level locality-aware. #1797

Merged
merged 6 commits on Oct 3, 2017
Changes from 1 commit
12 changes: 6 additions & 6 deletions include/envoy/upstream/upstream.h
@@ -156,18 +156,18 @@ class HostSet {
virtual const std::vector<HostSharedPtr>& healthyHosts() const PURE;

/**
* @return hosts per zone, index 0 is dedicated to local zone hosts.
* If there are no hosts in local zone for upstream cluster hostPerZone() will @return
* @return hosts per locality, index 0 is dedicated to local locality hosts.
* If there are no hosts in local locality for upstream cluster hosstPerLocality() will @return
Review comment (Member): typo

* empty vector.
*
* Note, that we sort zones in alphabetical order starting from index 1.
* Note, that we sort localities in lexicographic order starting from index 1.
*/
virtual const std::vector<std::vector<HostSharedPtr>>& hostsPerZone() const PURE;
virtual const std::vector<std::vector<HostSharedPtr>>& hostsPerLocality() const PURE;

/**
* @return same as hostsPerZone but only contains healthy hosts.
* @return same as hostsPerLocality but only contains healthy hosts.
*/
virtual const std::vector<std::vector<HostSharedPtr>>& healthyHostsPerZone() const PURE;
virtual const std::vector<std::vector<HostSharedPtr>>& healthyHostsPerLocality() const PURE;
};

/**
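To make the new interface contract concrete, here is a minimal standalone sketch — not part of this PR, with plain std::string standing in for HostSharedPtr — of the shape hostsPerLocality() returns:

```cpp
#include <string>
#include <vector>

int main() {
  // Outer index 0 is dedicated to local-locality hosts; indices 1 and up are
  // remote localities, sorted lexicographically as the comment above states.
  std::vector<std::vector<std::string>> hosts_per_locality = {
      {"local-host-1", "local-host-2"},        // index 0: local locality
      {"remote-a-host-1"},                     // index 1+: remote, sorted
      {"remote-b-host-1", "remote-b-host-2"},
  };
  // An empty outer vector means the upstream cluster has no hosts in the
  // local locality at all.
  return hosts_per_locality.empty() ? 1 : 0;
}
```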
21 changes: 11 additions & 10 deletions source/common/upstream/cluster_manager_impl.cc
@@ -379,16 +379,16 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(
HostVectorConstSharedPtr hosts_copy(new std::vector<HostSharedPtr>(primary_cluster.hosts()));
HostVectorConstSharedPtr healthy_hosts_copy(
new std::vector<HostSharedPtr>(primary_cluster.healthyHosts()));
HostListsConstSharedPtr hosts_per_zone_copy(
new std::vector<std::vector<HostSharedPtr>>(primary_cluster.hostsPerZone()));
HostListsConstSharedPtr healthy_hosts_per_zone_copy(
new std::vector<std::vector<HostSharedPtr>>(primary_cluster.healthyHostsPerZone()));
HostListsConstSharedPtr hosts_per_locality_copy(
new std::vector<std::vector<HostSharedPtr>>(primary_cluster.hostsPerLocality()));
HostListsConstSharedPtr healthy_hosts_per_locality_copy(
new std::vector<std::vector<HostSharedPtr>>(primary_cluster.healthyHostsPerLocality()));

tls_->runOnAllThreads([this, name, hosts_copy, healthy_hosts_copy, hosts_per_zone_copy,
healthy_hosts_per_zone_copy, hosts_added, hosts_removed]() -> void {
tls_->runOnAllThreads([this, name, hosts_copy, healthy_hosts_copy, hosts_per_locality_copy,
healthy_hosts_per_locality_copy, hosts_added, hosts_removed]() -> void {
ThreadLocalClusterManagerImpl::updateClusterMembership(
name, hosts_copy, healthy_hosts_copy, hosts_per_zone_copy, healthy_hosts_per_zone_copy,
hosts_added, hosts_removed, *tls_);
name, hosts_copy, healthy_hosts_copy, hosts_per_locality_copy,
healthy_hosts_per_locality_copy, hosts_added, hosts_removed, *tls_);
});
}

@@ -512,15 +512,16 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
const std::string& name, HostVectorConstSharedPtr hosts, HostVectorConstSharedPtr healthy_hosts,
HostListsConstSharedPtr hosts_per_zone, HostListsConstSharedPtr healthy_hosts_per_zone,
HostListsConstSharedPtr hosts_per_locality, HostListsConstSharedPtr healthy_hosts_per_locality,
const std::vector<HostSharedPtr>& hosts_added, const std::vector<HostSharedPtr>& hosts_removed,
ThreadLocal::Slot& tls) {

ThreadLocalClusterManagerImpl& config = tls.getTyped<ThreadLocalClusterManagerImpl>();

ASSERT(config.thread_local_clusters_.find(name) != config.thread_local_clusters_.end());
config.thread_local_clusters_[name]->host_set_.updateHosts(
hosts, healthy_hosts, hosts_per_zone, healthy_hosts_per_zone, hosts_added, hosts_removed);
hosts, healthy_hosts, hosts_per_locality, healthy_hosts_per_locality, hosts_added,
hosts_removed);
}

ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
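The update path above relies on a snapshot-and-broadcast pattern: the main thread copies the four host lists into immutable shared_ptrs, and every worker thread captures those snapshots by value, so readers never take a lock. A rough standalone sketch of the idea — the names and the runOnAllThreads stand-in are illustrative, not Envoy's actual API:

```cpp
#include <functional>
#include <memory>
#include <string>
#include <vector>

using HostList = std::vector<std::string>;
using HostListConstSharedPtr = std::shared_ptr<const HostList>;

// run_on_all_threads models tls_->runOnAllThreads(): it runs the given
// callback once on every worker thread.
void postUpdate(const HostList& current_hosts,
                const std::function<void(std::function<void()>)>& run_on_all_threads) {
  // Snapshot once on the main thread...
  HostListConstSharedPtr hosts_copy = std::make_shared<const HostList>(current_hosts);
  // ...then each worker captures the same immutable snapshot by value and
  // installs it into its thread-local cluster entry (the real code does this
  // in ThreadLocalClusterManagerImpl::updateClusterMembership).
  run_on_all_threads([hosts_copy]() { /* install hosts_copy thread-locally */ });
}
```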
4 changes: 2 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
@@ -205,8 +205,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
void drainConnPools(HostSharedPtr old_host, ConnPoolsContainer& container);
static void updateClusterMembership(const std::string& name, HostVectorConstSharedPtr hosts,
HostVectorConstSharedPtr healthy_hosts,
HostListsConstSharedPtr hosts_per_zone,
HostListsConstSharedPtr healthy_hosts_per_zone,
HostListsConstSharedPtr hosts_per_locality,
HostListsConstSharedPtr healthy_hosts_per_locality,
const std::vector<HostSharedPtr>& hosts_added,
const std::vector<HostSharedPtr>& hosts_removed,
ThreadLocal::Slot& tls);
27 changes: 14 additions & 13 deletions source/common/upstream/eds.cc
@@ -72,30 +72,31 @@ void EdsClusterImpl::onConfigUpdate(const ResourceVector& resources) {
if (updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added, hosts_removed,
health_checker_ != nullptr)) {
ENVOY_LOG(debug, "EDS hosts changed for cluster: {} ({})", info_->name(), hosts().size());
HostListsSharedPtr per_zone(new std::vector<std::vector<HostSharedPtr>>());
HostListsSharedPtr per_locality(new std::vector<std::vector<HostSharedPtr>>());

// If local zone name is not defined then skip populating per zone hosts.
if (!local_info_.zoneName().empty()) {
std::map<std::string, std::vector<HostSharedPtr>> hosts_per_zone;
// If local locality is not defined then skip populating per locality hosts.
const Locality local_locality(local_info_.node().locality());
Review comment (Member): nit: why copy here? Reference? Or does locality() return a copy?
Review comment (Member): OK I see this is a helper that unpacks the locality.
Review reply (Author): We could do it more efficiently, with std::string& in the std::tuple, but I think this might be an issue with our multiple string implementation today.

if (!local_locality.empty()) {
std::map<Locality, std::vector<HostSharedPtr>> hosts_per_locality;

for (const HostSharedPtr& host : *current_hosts_copy) {
hosts_per_zone[host->locality().zone()].push_back(host);
hosts_per_locality[Locality(host->locality())].push_back(host);
}

// Populate per_zone hosts only if upstream cluster has hosts in the same zone.
if (hosts_per_zone.find(local_info_.zoneName()) != hosts_per_zone.end()) {
per_zone->push_back(hosts_per_zone[local_info_.zoneName()]);
// Populate per_locality hosts only if upstream cluster has hosts in the same locality.
if (hosts_per_locality.find(local_locality) != hosts_per_locality.end()) {
per_locality->push_back(hosts_per_locality[local_locality]);

for (auto& entry : hosts_per_zone) {
if (local_info_.zoneName() != entry.first) {
per_zone->push_back(entry.second);
for (auto& entry : hosts_per_locality) {
if (local_locality != entry.first) {
per_locality->push_back(entry.second);
}
}
}
}

updateHosts(current_hosts_copy, createHealthyHostList(*current_hosts_copy), per_zone,
createHealthyHostLists(*per_zone), hosts_added, hosts_removed);
updateHosts(current_hosts_copy, createHealthyHostList(*current_hosts_copy), per_locality,
createHealthyHostLists(*per_locality), hosts_added, hosts_removed);

if (initialize_callback_ && health_checker_ && pending_health_checks_ == 0) {
pending_health_checks_ = hosts().size();
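The grouping logic in this hunk can be sketched standalone as follows — plain strings stand in for Envoy's Host and Locality types, and std::map's sorted iteration order is what produces the lexicographic ordering of remote localities:

```cpp
#include <map>
#include <string>
#include <utility>
#include <vector>

// Bucket hosts by locality, then emit the local locality first, followed by
// the remaining localities in the map's sorted order.
std::vector<std::vector<std::string>>
groupByLocality(const std::vector<std::pair<std::string, std::string>>& hosts, // {locality, host}
                const std::string& local_locality) {
  std::vector<std::vector<std::string>> per_locality;
  if (local_locality.empty()) {
    return per_locality; // no local locality defined: skip population entirely
  }
  std::map<std::string, std::vector<std::string>> buckets;
  for (const auto& h : hosts) {
    buckets[h.first].push_back(h.second);
  }
  // Populate only if the upstream cluster has hosts in the local locality.
  auto it = buckets.find(local_locality);
  if (it != buckets.end()) {
    per_locality.push_back(it->second);
    for (const auto& entry : buckets) {
      if (entry.first != local_locality) {
        per_locality.push_back(entry.second);
      }
    }
  }
  return per_locality;
}
```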
137 changes: 70 additions & 67 deletions source/common/upstream/load_balancer_impl.cc
@@ -13,7 +13,7 @@
namespace Envoy {
namespace Upstream {

static const std::string RuntimeZoneEnabled = "upstream.zone_routing.enabled";
static const std::string RuntimezoneEnabled = "upstream.zone_routing.enabled";
Review comment (Member): why change case?

static const std::string RuntimeMinClusterSize = "upstream.zone_routing.min_cluster_size";
static const std::string RuntimePanicThreshold = "upstream.healthy_panic_threshold";

@@ -25,11 +25,11 @@ LoadBalancerBase::LoadBalancerBase(const HostSet& host_set, const HostSet* local
if (local_host_set_) {
host_set_.addMemberUpdateCb(
[this](const std::vector<HostSharedPtr>&, const std::vector<HostSharedPtr>&) -> void {
regenerateZoneRoutingStructures();
regenerateLocalityRoutingStructures();
});
local_host_set_member_update_cb_handle_ = local_host_set_->addMemberUpdateCb(
[this](const std::vector<HostSharedPtr>&, const std::vector<HostSharedPtr>&) -> void {
regenerateZoneRoutingStructures();
regenerateLocalityRoutingStructures();
});
}
}
@@ -40,44 +40,45 @@ LoadBalancerBase::~LoadBalancerBase() {
}
}

void LoadBalancerBase::regenerateZoneRoutingStructures() {
void LoadBalancerBase::regenerateLocalityRoutingStructures() {
stats_.lb_recalculate_zone_structures_.inc();

// Do not perform any calculations if we cannot perform zone routing based on non runtime params.
if (earlyExitNonZoneRouting()) {
zone_routing_state_ = ZoneRoutingState::NoZoneRouting;
// Do not perform any calculations if we cannot perform locality routing based on non runtime
// params.
if (earlyExitNonLocalityRouting()) {
locality_routing_state_ = LocalityRoutingState::NoLocalityRouting;
return;
}

size_t num_zones = host_set_.healthyHostsPerZone().size();
ASSERT(num_zones > 0);
size_t num_localitys = host_set_.healthyHostsPerLocality().size();
Review comment (Member): localities

ASSERT(num_localitys > 0);

uint64_t local_percentage[num_zones];
calculateZonePercentage(local_host_set_->healthyHostsPerZone(), local_percentage);
uint64_t local_percentage[num_localitys];
calculateLocalityPercentage(local_host_set_->healthyHostsPerLocality(), local_percentage);

uint64_t upstream_percentage[num_zones];
calculateZonePercentage(host_set_.healthyHostsPerZone(), upstream_percentage);
uint64_t upstream_percentage[num_localitys];
calculateLocalityPercentage(host_set_.healthyHostsPerLocality(), upstream_percentage);

// If we have lower percent of hosts in the local cluster in the same zone,
// we can push all of the requests directly to upstream cluster in the same zone.
// If we have lower percent of hosts in the local cluster in the same locality,
// we can push all of the requests directly to upstream cluster in the same locality.
if (upstream_percentage[0] >= local_percentage[0]) {
zone_routing_state_ = ZoneRoutingState::ZoneDirect;
locality_routing_state_ = LocalityRoutingState::LocalityDirect;
return;
}

zone_routing_state_ = ZoneRoutingState::ZoneResidual;
locality_routing_state_ = LocalityRoutingState::LocalityResidual;

// If we cannot route all requests to the same zone, calculate what percentage can be routed.
// If we cannot route all requests to the same locality, calculate what percentage can be routed.
// For example, if local percentage is 20% and upstream is 10%
// we can route only 50% of requests directly.
local_percent_to_route_ = upstream_percentage[0] * 10000 / local_percentage[0];

// Local zone does not have additional capacity (we have already routed what we could).
// Now we need to figure out how much traffic we can route cross zone and to which exact zone
// we should route. Percentage of requests routed cross zone to a specific zone needed be
// proportional to the residual capacity upstream zone has.
// Local locality does not have additional capacity (we have already routed what we could).
// Now we need to figure out how much traffic we can route cross locality and to which exact
// locality we should route. Percentage of requests routed cross locality to a specific locality
// needed be proportional to the residual capacity upstream locality has.
//
// residual_capacity contains capacity left in a given zone, we keep accumulating residual
// residual_capacity contains capacity left in a given locality, we keep accumulating residual
// capacity to make search for sampled value easier.
// For example, if we have the following upstream and local percentage:
// local_percentage: 40000 40000 20000
@@ -86,41 +87,42 @@ void LoadBalancerBase::regenerateZoneRoutingStructures() {
// bucket sizes (residual capacity). For simplicity of finding where specific
// sampled value is, we accumulate values in residual capacity. This is what it will look like:
// residual_capacity: 0 10000 15000
// Now to find a zone to route (bucket) we could simply iterate over residual_capacity searching
// where sampled value is placed.
residual_capacity_.resize(num_zones);
// Now to find a locality to route (bucket) we could simply iterate over residual_capacity
// searching where sampled value is placed.
residual_capacity_.resize(num_localitys);

// Local zone (index 0) does not have residual capacity as we have routed all we could.
// Local locality (index 0) does not have residual capacity as we have routed all we could.
residual_capacity_[0] = 0;
for (size_t i = 1; i < num_zones; ++i) {
// Only route to the zones that have additional capacity.
for (size_t i = 1; i < num_localitys; ++i) {
// Only route to the localitys that have additional capacity.
if (upstream_percentage[i] > local_percentage[i]) {
residual_capacity_[i] =
residual_capacity_[i - 1] + upstream_percentage[i] - local_percentage[i];
} else {
// Zone with index "i" does not have residual capacity, but we keep accumulating previous
// Locality with index "i" does not have residual capacity, but we keep accumulating previous
// values to make search easier on the next step.
residual_capacity_[i] = residual_capacity_[i - 1];
}
}
};
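The worked example in the comment above checks out end to end; here is a standalone sketch of just the arithmetic (percentages in units of 0.01%, so 10000 means 100%; the input vectors are the hypothetical values from the comment):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

int main() {
  std::vector<uint64_t> local = {40000, 40000, 20000};    // local cluster shares
  std::vector<uint64_t> upstream = {25000, 50000, 25000}; // upstream cluster shares

  // Upstream has the smaller share in the local locality (index 0), so only
  // 25000 * 10000 / 40000 = 6250 (62.5%) of requests can stay local.
  uint64_t local_percent_to_route = upstream[0] * 10000 / local[0];
  assert(local_percent_to_route == 6250);

  // Accumulate residual capacity over the remaining localities.
  std::vector<uint64_t> residual(local.size());
  residual[0] = 0; // local locality is already used to capacity
  for (size_t i = 1; i < local.size(); ++i) {
    uint64_t extra = upstream[i] > local[i] ? upstream[i] - local[i] : 0;
    residual[i] = residual[i - 1] + extra;
  }
  assert(residual[1] == 10000 && residual[2] == 15000); // matches the comment
  return 0;
}
```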

bool LoadBalancerBase::earlyExitNonZoneRouting() {
if (host_set_.healthyHostsPerZone().size() < 2) {
bool LoadBalancerBase::earlyExitNonLocalityRouting() {
if (host_set_.healthyHostsPerLocality().size() < 2) {
return true;
}

if (host_set_.healthyHostsPerZone()[0].empty()) {
if (host_set_.healthyHostsPerLocality()[0].empty()) {
return true;
}

// Same number of zones should be for local and upstream cluster.
if (host_set_.healthyHostsPerZone().size() != local_host_set_->healthyHostsPerZone().size()) {
// Same number of localitys should be for local and upstream cluster.
if (host_set_.healthyHostsPerLocality().size() !=
local_host_set_->healthyHostsPerLocality().size()) {
stats_.lb_zone_number_differs_.inc();
return true;
}

// Do not perform zone routing for small clusters.
// Do not perform locality routing for small clusters.
uint64_t min_cluster_size = runtime_.snapshot().getInteger(RuntimeMinClusterSize, 6U);
if (host_set_.healthyHosts().size() < min_cluster_size) {
stats_.lb_zone_cluster_too_small_.inc();
@@ -145,65 +147,66 @@ bool LoadBalancerUtility::isGlobalPanic(const HostSet& host_set, Runtime::Loader
return false;
}

void LoadBalancerBase::calculateZonePercentage(
const std::vector<std::vector<HostSharedPtr>>& hosts_per_zone, uint64_t* ret) {
void LoadBalancerBase::calculateLocalityPercentage(
const std::vector<std::vector<HostSharedPtr>>& hosts_per_locality, uint64_t* ret) {
uint64_t total_hosts = 0;
for (const auto& zone_hosts : hosts_per_zone) {
total_hosts += zone_hosts.size();
for (const auto& locality_hosts : hosts_per_locality) {
total_hosts += locality_hosts.size();
}

size_t i = 0;
for (const auto& zone_hosts : hosts_per_zone) {
ret[i++] = total_hosts > 0 ? 10000ULL * zone_hosts.size() / total_hosts : 0;
for (const auto& locality_hosts : hosts_per_locality) {
ret[i++] = total_hosts > 0 ? 10000ULL * locality_hosts.size() / total_hosts : 0;
}
}
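A standalone sketch of the percentage math above, with plain host counts in place of the host vectors — each locality's share of the total, truncated to units of 0.01%:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

std::vector<uint64_t> localityPercentages(const std::vector<size_t>& hosts_per_locality) {
  uint64_t total_hosts = 0;
  for (size_t n : hosts_per_locality) {
    total_hosts += n;
  }
  std::vector<uint64_t> ret;
  ret.reserve(hosts_per_locality.size());
  for (size_t n : hosts_per_locality) {
    // 10000 == 100%; integer division truncates, just like the real code.
    ret.push_back(total_hosts > 0 ? 10000ULL * n / total_hosts : 0);
  }
  return ret;
}
```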

const std::vector<HostSharedPtr>& LoadBalancerBase::tryChooseLocalZoneHosts() {
ASSERT(zone_routing_state_ != ZoneRoutingState::NoZoneRouting);
const std::vector<HostSharedPtr>& LoadBalancerBase::tryChooseLocalLocalityHosts() {
ASSERT(locality_routing_state_ != LocalityRoutingState::NoLocalityRouting);

// At this point it's guaranteed to be at least 2 zones.
size_t number_of_zones = host_set_.healthyHostsPerZone().size();
// At this point it's guaranteed to be at least 2 localitys.
size_t number_of_localitys = host_set_.healthyHostsPerLocality().size();
Review comment (Member): localities


ASSERT(number_of_zones >= 2U);
ASSERT(local_host_set_->healthyHostsPerZone().size() == host_set_.healthyHostsPerZone().size());
ASSERT(number_of_localitys >= 2U);
ASSERT(local_host_set_->healthyHostsPerLocality().size() ==
host_set_.healthyHostsPerLocality().size());

// Try to push all of the requests to the same zone first.
if (zone_routing_state_ == ZoneRoutingState::ZoneDirect) {
// Try to push all of the requests to the same locality first.
if (locality_routing_state_ == LocalityRoutingState::LocalityDirect) {
stats_.lb_zone_routing_all_directly_.inc();
return host_set_.healthyHostsPerZone()[0];
return host_set_.healthyHostsPerLocality()[0];
}

ASSERT(zone_routing_state_ == ZoneRoutingState::ZoneResidual);
ASSERT(locality_routing_state_ == LocalityRoutingState::LocalityResidual);

// If we cannot route all requests to the same zone, we already calculated how much we can
// push to the local zone, check if we can push to local zone on current iteration.
// If we cannot route all requests to the same locality, we already calculated how much we can
// push to the local locality, check if we can push to local locality on current iteration.
if (random_.random() % 10000 < local_percent_to_route_) {
stats_.lb_zone_routing_sampled_.inc();
return host_set_.healthyHostsPerZone()[0];
return host_set_.healthyHostsPerLocality()[0];
}

// At this point we must route cross zone as we cannot route to the local zone.
// At this point we must route cross locality as we cannot route to the local locality.
stats_.lb_zone_routing_cross_zone_.inc();

// This is *extremely* unlikely but possible due to rounding errors when calculating
// zone percentages. In this case just select random zone.
if (residual_capacity_[number_of_zones - 1] == 0) {
// locality percentages. In this case just select random locality.
if (residual_capacity_[number_of_localitys - 1] == 0) {
stats_.lb_zone_no_capacity_left_.inc();
return host_set_.healthyHostsPerZone()[random_.random() % number_of_zones];
return host_set_.healthyHostsPerLocality()[random_.random() % number_of_localitys];
}

// Random sampling to select specific zone for cross zone traffic based on the additional
// capacity in zones.
uint64_t threshold = random_.random() % residual_capacity_[number_of_zones - 1];
// Random sampling to select specific locality for cross locality traffic based on the additional
// capacity in localitys.
uint64_t threshold = random_.random() % residual_capacity_[number_of_localitys - 1];

// This potentially can be optimized to be O(log(N)) where N is the number of zones.
// This potentially can be optimized to be O(log(N)) where N is the number of localitys.
Review comment (Member): let me guess: emacs query-replace ? /zone/locality/ ? ;)
Review reply (Author): Yes, tree-wide search/replacing, will fix tomorrow, thanks for picking up these.

// Linear scan should be faster for smaller N, in most of the scenarios N will be small.
int i = 0;
while (threshold > residual_capacity_[i]) {
i++;
}

return host_set_.healthyHostsPerZone()[i];
return host_set_.healthyHostsPerLocality()[i];
}
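The final sampling step can also be sketched standalone — std::rand stands in for Envoy's RandomGenerator, and residual_capacity is assumed non-empty with a non-zero last element, which the no-capacity-left branch above guarantees:

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <vector>

// Sample a threshold in [0, total residual capacity) and linear-scan the
// accumulated buckets to find the destination locality index.
size_t pickLocality(const std::vector<uint64_t>& residual_capacity) {
  uint64_t threshold =
      static_cast<uint64_t>(std::rand()) % residual_capacity.back();
  size_t i = 0;
  while (threshold > residual_capacity[i]) {
    i++;
  }
  return i;
}
```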

const std::vector<HostSharedPtr>& LoadBalancerBase::hostsToUse() {
@@ -214,11 +217,11 @@ const std::vector<HostSharedPtr>& LoadBalancerBase::hostsToUse() {
return host_set_.hosts();
}

if (zone_routing_state_ == ZoneRoutingState::NoZoneRouting) {
if (locality_routing_state_ == LocalityRoutingState::NoLocalityRouting) {
return host_set_.healthyHosts();
}

if (!runtime_.snapshot().featureEnabled(RuntimeZoneEnabled, 100)) {
if (!runtime_.snapshot().featureEnabled(RuntimezoneEnabled, 100)) {
return host_set_.healthyHosts();
}

Expand All @@ -227,7 +230,7 @@ const std::vector<HostSharedPtr>& LoadBalancerBase::hostsToUse() {
return host_set_.healthyHosts();
}

return tryChooseLocalZoneHosts();
return tryChooseLocalLocalityHosts();
}

HostConstSharedPtr RoundRobinLoadBalancer::chooseHost(const LoadBalancerContext*) {