-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding priority levels to Envoy endpoints (not yet used for LB) #2088
Merged
Merged
Changes from 12 commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
c22f593
Adding priority levels to most-of-Envoy (not yet used for LB)
alyssawilk db0b016
using priorities in EDS
alyssawilk 187884d
Merge branch 'master' into priorities
alyssawilk 661d296
test/... passing
alyssawilk aa4c85e
fixing FIXMEs
alyssawilk 371c279
EDS unit tests and cleanup
alyssawilk 731fa47
halfway done with unit tests
alyssawilk 8ae3131
integration tests
alyssawilk ef1df21
Merge branch 'master' into priorities
alyssawilk 5b92074
reviewer comments
alyssawilk 757c2ae
Merge branch 'refs/heads/master' into priorities
alyssawilk 3fbbb3b
reviewer comments
alyssawilk afc3baa
GoogleCamelCase strikes again!
alyssawilk def4217
Merge branch 'refs/heads/master' into priorities
alyssawilk a744afb
priority -> uint32, LB stats sent with priority.
alyssawilk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -119,19 +119,25 @@ class Host : virtual public HostDescription { | |
typedef std::shared_ptr<const Host> HostConstSharedPtr; | ||
|
||
/** | ||
* Base host set interface. This is used both for clusters, as well as per thread/worker host sets | ||
* used during routing/forwarding. | ||
* Base host set interface. This contains all of the endpoints for a given LocalityLbEndpoints | ||
* priority level. | ||
*/ | ||
class HostSet { | ||
public: | ||
typedef std::shared_ptr<const std::vector<HostSharedPtr>> HostVectorConstSharedPtr; | ||
typedef std::shared_ptr<const std::vector<std::vector<HostSharedPtr>>> HostListsConstSharedPtr; | ||
|
||
virtual ~HostSet() {} | ||
|
||
// TODO(alyssawilk) remove this once LBs use PrioritySet. | ||
// It is generally incorrect to subscribe to updates to individual HostSet | ||
// as one misses the addition of new HostSet to a PrioritySet. | ||
/** | ||
* Called when cluster host membership is about to change. | ||
* @param hosts_added supplies the newly added hosts, if any. | ||
* @param hosts_removed supplies the removed hosts, if any. | ||
*/ | ||
typedef std::function<void(const std::vector<HostSharedPtr>& hosts_added, | ||
typedef std::function<void(uint32_t priority, const std::vector<HostSharedPtr>& hosts_added, | ||
const std::vector<HostSharedPtr>& hosts_removed)> | ||
MemberUpdateCb; | ||
|
||
|
@@ -168,6 +174,64 @@ class HostSet { | |
* @return same as hostsPerLocality but only contains healthy hosts. | ||
*/ | ||
virtual const std::vector<std::vector<HostSharedPtr>>& healthyHostsPerLocality() const PURE; | ||
|
||
/** | ||
* Updates the hosts in a given host set. | ||
* | ||
* @param hosts supplies the (usually new) list of hosts in the host set. | ||
* @param healthy hosts supplies the subset of hosts which are healthy. | ||
* @param hosts_per_locality supplies the hosts subdivided by locality. | ||
* @param hosts_per_locality supplies the healthy hosts subdivided by locality. | ||
* @param hosts_added supplies the hosts added since the last update. | ||
* @param hosts_removed supplies the hosts removed since the last update. | ||
*/ | ||
virtual void updateHosts(HostVectorConstSharedPtr hosts, HostVectorConstSharedPtr healthy_hosts, | ||
HostListsConstSharedPtr hosts_per_locality, | ||
HostListsConstSharedPtr healthy_hosts_per_locality, | ||
const std::vector<HostSharedPtr>& hosts_added, | ||
const std::vector<HostSharedPtr>& hosts_removed) PURE; | ||
|
||
/** | ||
* @return uint32_t the priority of this host set. | ||
*/ | ||
virtual uint32_t priority() const PURE; | ||
}; | ||
|
||
typedef std::unique_ptr<HostSet> HostSetPtr; | ||
|
||
/** | ||
* This class contains all of the HostSets for a given cluster grouped by priority, for | ||
* ease of load balancing. | ||
*/ | ||
class PrioritySet { | ||
public: | ||
typedef std::function<void(uint32_t priority, const std::vector<HostSharedPtr>& hosts_added, | ||
const std::vector<HostSharedPtr>& hosts_removed)> | ||
MemberUpdateCb; | ||
|
||
virtual ~PrioritySet() {} | ||
|
||
/** | ||
* Install a callback that will be invoked when any of the HostSets in the PrioritySet changes. | ||
* This includes when a new HostSet is created. | ||
* | ||
* @param callback supplies the callback to invoke. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: document |
||
* @return Common::CallbackHandle* a handle which can be used to unregister the callback. | ||
*/ | ||
virtual Common::CallbackHandle* addMemberUpdateCb(MemberUpdateCb callback) const PURE; | ||
|
||
/** | ||
* Returns the host sets for this priority set, ordered by priority. | ||
* The first element in the vector is the host set for priority 0, and so on. | ||
* | ||
* @return std::vector<HostSetPtr>& the host sets for this priority set. | ||
*/ | ||
virtual std::vector<HostSetPtr>& hostSetsPerPriority() PURE; | ||
|
||
/** | ||
* @return const std::vector<HostSetPtr>& the host sets, ordered by priority. | ||
*/ | ||
virtual const std::vector<HostSetPtr>& hostSetsPerPriority() const PURE; | ||
}; | ||
|
||
/** | ||
|
@@ -386,8 +450,10 @@ class HealthChecker; | |
* An upstream cluster (group of hosts). This class is the "primary" singleton cluster used amongst | ||
* all forwarding threads/workers. Individual HostSets are used on the workers themselves. | ||
*/ | ||
class Cluster : public virtual HostSet { | ||
class Cluster { | ||
public: | ||
virtual ~Cluster() {} | ||
|
||
enum class InitializePhase { Primary, Secondary }; | ||
|
||
/** | ||
|
@@ -423,6 +489,16 @@ class Cluster : public virtual HostSet { | |
* that depends on resolution of the SDS server itself). | ||
*/ | ||
virtual InitializePhase initializePhase() const PURE; | ||
|
||
/** | ||
* @return the PrioritySet for the cluster. | ||
*/ | ||
virtual PrioritySet& prioritySet() PURE; | ||
|
||
/** | ||
* @return the const PrioritySet for the cluster. | ||
*/ | ||
virtual const PrioritySet& prioritySet() const PURE; | ||
}; | ||
|
||
typedef std::shared_ptr<Cluster> ClusterSharedPtr; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -264,11 +264,13 @@ ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) { | |
} | ||
|
||
void ClusterManagerImpl::postInitializeCluster(Cluster& cluster) { | ||
if (cluster.hosts().empty()) { | ||
return; | ||
for (auto& host_set : cluster.prioritySet().hostSetsPerPriority()) { | ||
if (host_set->hosts().empty()) { | ||
continue; | ||
} | ||
postThreadLocalClusterUpdate(cluster, host_set->priority(), host_set->hosts(), | ||
std::vector<HostSharedPtr>{}); | ||
} | ||
|
||
postThreadLocalClusterUpdate(cluster, cluster.hosts(), std::vector<HostSharedPtr>{}); | ||
} | ||
|
||
bool ClusterManagerImpl::addOrUpdatePrimaryCluster(const envoy::api::v2::Cluster& cluster) { | ||
|
@@ -343,12 +345,14 @@ void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster, boo | |
} | ||
|
||
const Cluster& primary_cluster_reference = *new_cluster; | ||
new_cluster->addMemberUpdateCb( | ||
[&primary_cluster_reference, this](const std::vector<HostSharedPtr>& hosts_added, | ||
new_cluster->prioritySet().addMemberUpdateCb( | ||
[&primary_cluster_reference, this](uint32_t priority, | ||
const std::vector<HostSharedPtr>& hosts_added, | ||
const std::vector<HostSharedPtr>& hosts_removed) { | ||
// This fires when a cluster is about to have an updated member set. We need to send this | ||
// out to all of the thread local configurations. | ||
postThreadLocalClusterUpdate(primary_cluster_reference, hosts_added, hosts_removed); | ||
postThreadLocalClusterUpdate(primary_cluster_reference, priority, hosts_added, | ||
hosts_removed); | ||
}); | ||
|
||
if (new_cluster->healthChecker() != nullptr) { | ||
|
@@ -408,31 +412,34 @@ ClusterManagerImpl::httpConnPoolForCluster(const std::string& cluster, ResourceP | |
} | ||
|
||
void ClusterManagerImpl::postThreadLocalClusterUpdate( | ||
const Cluster& primary_cluster, const std::vector<HostSharedPtr>& hosts_added, | ||
const Cluster& primary_cluster, uint32_t priority, | ||
const std::vector<HostSharedPtr>& hosts_added, | ||
const std::vector<HostSharedPtr>& hosts_removed) { | ||
if (init_helper_.state() == ClusterManagerInitHelper::State::Loading) { | ||
// A cluster may try to post updates before we are ready for multi-threading. Block this case | ||
// since we will post the update in postInitializeCluster(). | ||
return; | ||
} | ||
|
||
HostVectorConstSharedPtr hosts_copy(new std::vector<HostSharedPtr>(primary_cluster.hosts())); | ||
const auto& host_set = primary_cluster.prioritySet().hostSetsPerPriority()[priority]; | ||
|
||
HostVectorConstSharedPtr hosts_copy(new std::vector<HostSharedPtr>(host_set->hosts())); | ||
HostVectorConstSharedPtr healthy_hosts_copy( | ||
new std::vector<HostSharedPtr>(primary_cluster.healthyHosts())); | ||
new std::vector<HostSharedPtr>(host_set->healthyHosts())); | ||
HostListsConstSharedPtr hosts_per_locality_copy( | ||
new std::vector<std::vector<HostSharedPtr>>(primary_cluster.hostsPerLocality())); | ||
new std::vector<std::vector<HostSharedPtr>>(host_set->hostsPerLocality())); | ||
HostListsConstSharedPtr healthy_hosts_per_locality_copy( | ||
new std::vector<std::vector<HostSharedPtr>>(primary_cluster.healthyHostsPerLocality())); | ||
new std::vector<std::vector<HostSharedPtr>>(host_set->healthyHostsPerLocality())); | ||
|
||
tls_->runOnAllThreads([ | ||
this, name = primary_cluster.info()->name(), hosts_copy, healthy_hosts_copy, | ||
this, name = primary_cluster.info()->name(), priority, hosts_copy, healthy_hosts_copy, | ||
hosts_per_locality_copy, healthy_hosts_per_locality_copy, hosts_added, hosts_removed | ||
]() | ||
->void { | ||
ThreadLocalClusterManagerImpl::updateClusterMembership( | ||
name, hosts_copy, healthy_hosts_copy, hosts_per_locality_copy, | ||
healthy_hosts_per_locality_copy, hosts_added, hosts_removed, | ||
*tls_); | ||
name, priority, hosts_copy, healthy_hosts_copy, | ||
hosts_per_locality_copy, healthy_hosts_per_locality_copy, | ||
hosts_added, hosts_removed, *tls_); | ||
}); | ||
} | ||
|
||
|
@@ -481,9 +488,9 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl | |
new ClusterEntry(*this, local_cluster->info())); | ||
} | ||
|
||
local_host_set_ = local_cluster_name.valid() | ||
? &thread_local_clusters_[local_cluster_name.value()]->host_set_ | ||
: nullptr; | ||
local_priority_set_ = local_cluster_name.valid() | ||
? &thread_local_clusters_[local_cluster_name.value()]->priority_set_ | ||
: nullptr; | ||
|
||
for (auto& cluster : parent.primary_clusters_) { | ||
// If local cluster name is set then we already initialized this cluster. | ||
|
@@ -508,7 +515,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImp | |
ENVOY_LOG(debug, "shutting down thread local cluster manager"); | ||
host_http_conn_pool_map_.clear(); | ||
for (auto& cluster : thread_local_clusters_) { | ||
if (&cluster.second->host_set_ != local_host_set_) { | ||
if (&cluster.second->priority_set_ != local_priority_set_) { | ||
cluster.second.reset(); | ||
} | ||
} | ||
|
@@ -560,15 +567,16 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools( | |
} | ||
|
||
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( | ||
const std::string& name, HostVectorConstSharedPtr hosts, HostVectorConstSharedPtr healthy_hosts, | ||
HostListsConstSharedPtr hosts_per_locality, HostListsConstSharedPtr healthy_hosts_per_locality, | ||
const std::string& name, uint32_t priority, HostVectorConstSharedPtr hosts, | ||
HostVectorConstSharedPtr healthy_hosts, HostListsConstSharedPtr hosts_per_locality, | ||
HostListsConstSharedPtr healthy_hosts_per_locality, | ||
const std::vector<HostSharedPtr>& hosts_added, const std::vector<HostSharedPtr>& hosts_removed, | ||
ThreadLocal::Slot& tls) { | ||
|
||
ThreadLocalClusterManagerImpl& config = tls.getTyped<ThreadLocalClusterManagerImpl>(); | ||
|
||
ASSERT(config.thread_local_clusters_.find(name) != config.thread_local_clusters_.end()); | ||
config.thread_local_clusters_[name]->host_set_.updateHosts( | ||
config.thread_local_clusters_[name]->priority_set_.getOrCreateHostSet(priority).updateHosts( | ||
std::move(hosts), std::move(healthy_hosts), std::move(hosts_per_locality), | ||
std::move(healthy_hosts_per_locality), hosts_added, hosts_removed); | ||
} | ||
|
@@ -602,43 +610,50 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry( | |
parent.parent_.local_info_, parent.parent_, parent.parent_.runtime_, | ||
parent.parent_.random_, | ||
Router::ShadowWriterPtr{new Router::ShadowWriterImpl(parent.parent_)}) { | ||
|
||
// TODO(alyssawilk) make lb priority-set aware in a follow-up patch | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: period end of sentence. |
||
HostSet& host_set = priority_set_.getOrCreateHostSet(0); | ||
HostSet* local_host_set = nullptr; | ||
if (parent.local_priority_set_) { | ||
local_host_set = parent.local_priority_set_->hostSetsPerPriority()[0].get(); | ||
} | ||
|
||
if (cluster->lbSubsetInfo().isEnabled()) { | ||
lb_.reset(new SubsetLoadBalancer(cluster->lbType(), host_set_, parent.local_host_set_, | ||
cluster->stats(), parent.parent_.runtime_, | ||
parent.parent_.random_, cluster->lbSubsetInfo(), | ||
cluster->lbRingHashConfig())); | ||
lb_.reset(new SubsetLoadBalancer(cluster->lbType(), host_set, local_host_set, cluster->stats(), | ||
parent.parent_.runtime_, parent.parent_.random_, | ||
cluster->lbSubsetInfo(), cluster->lbRingHashConfig())); | ||
} else { | ||
switch (cluster->lbType()) { | ||
case LoadBalancerType::LeastRequest: { | ||
lb_.reset(new LeastRequestLoadBalancer(host_set_, parent.local_host_set_, cluster->stats(), | ||
lb_.reset(new LeastRequestLoadBalancer(host_set, local_host_set, cluster->stats(), | ||
parent.parent_.runtime_, parent.parent_.random_)); | ||
break; | ||
} | ||
case LoadBalancerType::Random: { | ||
lb_.reset(new RandomLoadBalancer(host_set_, parent.local_host_set_, cluster->stats(), | ||
lb_.reset(new RandomLoadBalancer(host_set, local_host_set, cluster->stats(), | ||
parent.parent_.runtime_, parent.parent_.random_)); | ||
break; | ||
} | ||
case LoadBalancerType::RoundRobin: { | ||
lb_.reset(new RoundRobinLoadBalancer(host_set_, parent.local_host_set_, cluster->stats(), | ||
lb_.reset(new RoundRobinLoadBalancer(host_set, local_host_set, cluster->stats(), | ||
parent.parent_.runtime_, parent.parent_.random_)); | ||
break; | ||
} | ||
case LoadBalancerType::RingHash: { | ||
lb_.reset(new RingHashLoadBalancer(host_set_, cluster->stats(), parent.parent_.runtime_, | ||
lb_.reset(new RingHashLoadBalancer(host_set, cluster->stats(), parent.parent_.runtime_, | ||
parent.parent_.random_, cluster->lbRingHashConfig())); | ||
break; | ||
} | ||
case LoadBalancerType::OriginalDst: { | ||
lb_.reset(new OriginalDstCluster::LoadBalancer( | ||
host_set_, parent.parent_.primary_clusters_.at(cluster->name()).cluster_)); | ||
host_set, parent.parent_.primary_clusters_.at(cluster->name()).cluster_)); | ||
break; | ||
} | ||
} | ||
} | ||
|
||
host_set_.addMemberUpdateCb([this](const std::vector<HostSharedPtr>&, | ||
const std::vector<HostSharedPtr>& hosts_removed) -> void { | ||
priority_set_.addMemberUpdateCb([this](uint32_t, const std::vector<HostSharedPtr>&, | ||
const std::vector<HostSharedPtr>& hosts_removed) -> void { | ||
// We need to go through and purge any connection pools for hosts that got deleted. | ||
// Even if two hosts actually point to the same address this will be safe, since if a | ||
// host is readded it will be a different physical HostSharedPtr. | ||
|
@@ -653,7 +668,9 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() | |
// TODO(mattklein123): Optimally, we would just fire member changed callbacks and remove all of | ||
// the hosts inside of the HostImpl destructor. That is a change with wide implications, so we are | ||
// going with a more targeted approach for now. | ||
parent_.drainConnPools(host_set_.hosts()); | ||
for (auto& host_set : priority_set_.hostSetsPerPriority()) { | ||
parent_.drainConnPools(host_set->hosts()); | ||
} | ||
} | ||
|
||
Http::ConnectionPool::Instance* | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At a high level, without diving into implementation, it seems that it's redundant to track and supply both (healthy) hosts and (healthy) hosts per locality, one of these (presumably the latter) should be sufficient. I know this is an extant thing, just wanted to start a thread to discuss this with you and @mattklein123 .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, but as it turns out we track locality inconsistently (I think the rule is only if there are multiple localities and one is local and it's configured?) so it's quite non-trivial to combine the two.
I wonder if the better move might be to always just have the "locality style" group and if we don't care to pick by locality it's just in one big group. I know the healthy host is used for some things (mainly size for determining if it's a global panic, but size could be latched) but I can't remember if the full host list is ever used when splitting up by locality.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously this was hidden inside the implementation and is split out like this for perf reasons (basically compute once and then use everywhere). @alyssawilk do we need to expose this at the interface layer? I agree with @htuch at the interface level it's not great.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's that, or doing a bunch of casts, since the generic priority set impl is handing out containers of HostSets not HostSet Impls. And because they're complex types (it's super handy to iterate over the vector) I believe we can't just override.
If we prefer casts in upstream_impl.cc I'm happy to do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't thought through all the details but if you think this is the way to go it's fine with me. I can take a more complete look through this tomorrow if that would help. Sorry for the delay.