Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding priority levels to Envoy endpoints (not yet used for LB) #2088

Merged
merged 15 commits into from
Nov 30, 2017
Merged
4 changes: 2 additions & 2 deletions include/envoy/upstream/thread_local_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ class ThreadLocalCluster {
virtual ~ThreadLocalCluster() {}

/**
* @return const HostSet& the backing host set.
* @return const PrioritySet& the backing priority set.
*/
virtual const HostSet& hostSet() PURE;
virtual const PrioritySet& prioritySet() PURE;

/**
* @return ClusterInfoConstSharedPtr the info for this cluster. The info is safe to store beyond
Expand Down
89 changes: 85 additions & 4 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,25 @@ class Host : virtual public HostDescription {
typedef std::shared_ptr<const Host> HostConstSharedPtr;

/**
* Base host set interface. This is used both for clusters, as well as per thread/worker host sets
* used during routing/forwarding.
* Base host set interface. This contains all of the endpoints for a given LocalityLbEndpoints
* priority level.
*/
class HostSet {
public:
typedef std::shared_ptr<const std::vector<HostSharedPtr>> HostVectorConstSharedPtr;
typedef std::shared_ptr<const std::vector<std::vector<HostSharedPtr>>> HostListsConstSharedPtr;

virtual ~HostSet() {}

// TODO(alyssawilk) remove this once LBs use PrioritySet.
// It is generally incorrect to subscribe to updates to individual HostSet
// as one misses the addition of new HostSet to a PrioritySet.
/**
* Called when cluster host membership is about to change.
* @param hosts_added supplies the newly added hosts, if any.
* @param hosts_removed supplies the removed hosts, if any.
*/
typedef std::function<void(const std::vector<HostSharedPtr>& hosts_added,
typedef std::function<void(uint32_t priority, const std::vector<HostSharedPtr>& hosts_added,
const std::vector<HostSharedPtr>& hosts_removed)>
MemberUpdateCb;

Expand Down Expand Up @@ -168,6 +174,69 @@ class HostSet {
* @return same as hostsPerLocality but only contains healthy hosts.
*/
virtual const std::vector<std::vector<HostSharedPtr>>& healthyHostsPerLocality() const PURE;

/**
* Updates the hosts in a given host set.
*
* @param hosts supplies the (usually new) list of hosts in the host set.
* @param healthy hosts supplies the subset of hosts which are healthy.
* @param hosts_per_locality supplies the hosts subdivided by locality.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At a high level, without diving into implementation, it seems that it's redundant to track and supply both (healthy) hosts and (healthy) hosts per locality, one of these (presumably the latter) should be sufficient. I know this is an extant thing, just wanted to start a thread to discuss this with you and @mattklein123 .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, but as it turns out we track locality inconsistently (I think the rule is only if there are multiple localities and one is local and it's configured?) so it's quite non-trivial to combine the two.

I wonder if the better move might be to always just have the "locality style" group and if we don't care to pick by locality it's just in one big group. I know the healthy host is used for some things (mainly size for determining if it's a global panic, but size could be latched) but I can't remember if the full host list is ever used when splitting up by locality.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously this was hidden inside the implementation and is split out like this for perf reasons (basically compute once and then use everywhere). @alyssawilk do we need to expose this at the interface layer? I agree with @htuch at the interface level it's not great.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's that, or doing a bunch of casts, since the generic priority set impl is handing out containers of HostSets not HostSet Impls. And because they're complex types (it's super handy to iterate over the vector) I believe we can't just override.

If we prefer casts in upstream_impl.cc I'm happy to do that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't thought through all the details but if you think this is the way to go it's fine with me. I can take a more complete look through this tomorrow if that would help. Sorry for the delay.

* @param hosts_per_locality supplies the healthy hosts subdivided by locality.
* @param hosts_added supplies the hosts added since the last update.
* @param hosts_removed supplies the hosts removed since the last update.
*/
virtual void updateHosts(HostVectorConstSharedPtr hosts, HostVectorConstSharedPtr healthy_hosts,
HostListsConstSharedPtr hosts_per_locality,
HostListsConstSharedPtr healthy_hosts_per_locality,
const std::vector<HostSharedPtr>& hosts_added,
const std::vector<HostSharedPtr>& hosts_removed) PURE;

/**
* @return uint32_t the priority of this host set.
*/
virtual uint32_t priority() const PURE;
};

typedef std::unique_ptr<HostSet> HostSetPtr;

/**
* This class contains all of the HostSets for a given cluster grouped by priority, for
* ease of load balancing.
*/
class PrioritySet {
public:
typedef std::function<void(uint32_t priority, const std::vector<HostSharedPtr>& hosts_added,
const std::vector<HostSharedPtr>& hosts_removed)>
MemberUpdateCb;

virtual ~PrioritySet() {}

/**
* Install a callback that will be invoked when any of the HostSets in the PrioritySet changes.
* This includes when a new HostSet is created.
*
* @param callback supplies the callback to invoke.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: document @return

* @return Common::CallbackHandle* a handle which can be used to unregister the callback.
*/
virtual Common::CallbackHandle* addMemberUpdateCb(MemberUpdateCb callback) const PURE;

/**
* Returns the host sets for this priority set, ordered by priority.
* The first element in the vector is the host set for priority 0, and so on.
*
* @return std::vector<HostSetPtr>& the host sets for this priority set.
*/
virtual std::vector<HostSetPtr>& hostSetsPerPriority() PURE;

/**
* @return const std::vector<HostSetPtr>& the host sets, ordered by priority.
*/
virtual const std::vector<HostSetPtr>& hostSetsPerPriority() const PURE;

/**
* @return HostSet& the host set for the priority. If no such host set exists, it will be created.
*/
virtual HostSet& getHostSet(uint32_t priority) PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we ever remove hosts sets at a given priority?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. I thought that was more of an implementation detail though so that's commented upstream_impl.h

};

/**
Expand Down Expand Up @@ -386,8 +455,10 @@ class HealthChecker;
* An upstream cluster (group of hosts). This class is the "primary" singleton cluster used amongst
* all forwarding threads/workers. Individual HostSets are used on the workers themselves.
*/
class Cluster : public virtual HostSet {
class Cluster {
public:
virtual ~Cluster() {}

enum class InitializePhase { Primary, Secondary };

/**
Expand Down Expand Up @@ -423,6 +494,16 @@ class Cluster : public virtual HostSet {
* that depends on resolution of the SDS server itself).
*/
virtual InitializePhase initializePhase() const PURE;

/**
* @return the PrioritySet for the cluster.
*/
virtual PrioritySet& prioritySet() PURE;

/**
* @return the const PrioritySet for the cluster.
*/
virtual const PrioritySet& prioritySet() const PURE;
};

typedef std::shared_ptr<Cluster> ClusterSharedPtr;
Expand Down
4 changes: 2 additions & 2 deletions source/common/redis/conn_pool_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ InstanceImpl::ThreadLocalPool::ThreadLocalPool(InstanceImpl& parent, Event::Disp
// we will need to add thread local cluster removal callbacks so that we can
// safely clean things up and fail requests.
ASSERT(!cluster_->info()->addedViaApi());
local_host_set_member_update_cb_handle_ = cluster_->hostSet().addMemberUpdateCb(
[this](const std::vector<Upstream::HostSharedPtr>&,
local_host_set_member_update_cb_handle_ = cluster_->prioritySet().addMemberUpdateCb(
[this](uint32_t, const std::vector<Upstream::HostSharedPtr>&,
const std::vector<Upstream::HostSharedPtr>& hosts_removed) -> void {
onHostsRemoved(hosts_removed);
});
Expand Down
87 changes: 52 additions & 35 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,13 @@ ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) {
}

void ClusterManagerImpl::postInitializeCluster(Cluster& cluster) {
if (cluster.hosts().empty()) {
return;
for (size_t i = 0; i < cluster.prioritySet().hostSetsPerPriority().size(); ++i) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Any reason to not just iterate directly through the returned vector here? Why do we need to get the size and then call get each time? (Same question if this happens elsewhere)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, now that host set latches its own priority I can rewrite these.

auto& host_set = cluster.prioritySet().getHostSet(i);
if (host_set.hosts().empty()) {
continue;
}
postThreadLocalClusterUpdate(cluster, i, host_set.hosts(), std::vector<HostSharedPtr>{});
}

postThreadLocalClusterUpdate(cluster, cluster.hosts(), std::vector<HostSharedPtr>{});
}

bool ClusterManagerImpl::addOrUpdatePrimaryCluster(const envoy::api::v2::Cluster& cluster) {
Expand Down Expand Up @@ -343,12 +345,14 @@ void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster, boo
}

const Cluster& primary_cluster_reference = *new_cluster;
new_cluster->addMemberUpdateCb(
[&primary_cluster_reference, this](const std::vector<HostSharedPtr>& hosts_added,
new_cluster->prioritySet().addMemberUpdateCb(
[&primary_cluster_reference, this](uint32_t priority,
const std::vector<HostSharedPtr>& hosts_added,
const std::vector<HostSharedPtr>& hosts_removed) {
// This fires when a cluster is about to have an updated member set. We need to send this
// out to all of the thread local configurations.
postThreadLocalClusterUpdate(primary_cluster_reference, hosts_added, hosts_removed);
postThreadLocalClusterUpdate(primary_cluster_reference, priority, hosts_added,
hosts_removed);
});

if (new_cluster->healthChecker() != nullptr) {
Expand Down Expand Up @@ -408,31 +412,34 @@ ClusterManagerImpl::httpConnPoolForCluster(const std::string& cluster, ResourceP
}

void ClusterManagerImpl::postThreadLocalClusterUpdate(
const Cluster& primary_cluster, const std::vector<HostSharedPtr>& hosts_added,
const Cluster& primary_cluster, uint32_t priority,
const std::vector<HostSharedPtr>& hosts_added,
const std::vector<HostSharedPtr>& hosts_removed) {
if (init_helper_.state() == ClusterManagerInitHelper::State::Loading) {
// A cluster may try to post updates before we are ready for multi-threading. Block this case
// since we will post the update in postInitializeCluster().
return;
}

HostVectorConstSharedPtr hosts_copy(new std::vector<HostSharedPtr>(primary_cluster.hosts()));
const auto& host_set = primary_cluster.prioritySet().hostSetsPerPriority()[priority];

HostVectorConstSharedPtr hosts_copy(new std::vector<HostSharedPtr>(host_set->hosts()));
HostVectorConstSharedPtr healthy_hosts_copy(
new std::vector<HostSharedPtr>(primary_cluster.healthyHosts()));
new std::vector<HostSharedPtr>(host_set->healthyHosts()));
HostListsConstSharedPtr hosts_per_locality_copy(
new std::vector<std::vector<HostSharedPtr>>(primary_cluster.hostsPerLocality()));
new std::vector<std::vector<HostSharedPtr>>(host_set->hostsPerLocality()));
HostListsConstSharedPtr healthy_hosts_per_locality_copy(
new std::vector<std::vector<HostSharedPtr>>(primary_cluster.healthyHostsPerLocality()));
new std::vector<std::vector<HostSharedPtr>>(host_set->healthyHostsPerLocality()));

tls_->runOnAllThreads([
this, name = primary_cluster.info()->name(), hosts_copy, healthy_hosts_copy,
this, name = primary_cluster.info()->name(), priority, hosts_copy, healthy_hosts_copy,
hosts_per_locality_copy, healthy_hosts_per_locality_copy, hosts_added, hosts_removed
]()
->void {
ThreadLocalClusterManagerImpl::updateClusterMembership(
name, hosts_copy, healthy_hosts_copy, hosts_per_locality_copy,
healthy_hosts_per_locality_copy, hosts_added, hosts_removed,
*tls_);
name, priority, hosts_copy, healthy_hosts_copy,
hosts_per_locality_copy, healthy_hosts_per_locality_copy,
hosts_added, hosts_removed, *tls_);
});
}

Expand Down Expand Up @@ -481,9 +488,9 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl
new ClusterEntry(*this, local_cluster->info()));
}

local_host_set_ = local_cluster_name.valid()
? &thread_local_clusters_[local_cluster_name.value()]->host_set_
: nullptr;
local_priority_set_ = local_cluster_name.valid()
? &thread_local_clusters_[local_cluster_name.value()]->priority_set_
: nullptr;

for (auto& cluster : parent.primary_clusters_) {
// If local cluster name is set then we already initialized this cluster.
Expand All @@ -508,7 +515,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImp
ENVOY_LOG(debug, "shutting down thread local cluster manager");
host_http_conn_pool_map_.clear();
for (auto& cluster : thread_local_clusters_) {
if (&cluster.second->host_set_ != local_host_set_) {
if (&cluster.second->priority_set_ != local_priority_set_) {
cluster.second.reset();
}
}
Expand Down Expand Up @@ -560,15 +567,16 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
const std::string& name, HostVectorConstSharedPtr hosts, HostVectorConstSharedPtr healthy_hosts,
HostListsConstSharedPtr hosts_per_locality, HostListsConstSharedPtr healthy_hosts_per_locality,
const std::string& name, uint32_t priority, HostVectorConstSharedPtr hosts,
HostVectorConstSharedPtr healthy_hosts, HostListsConstSharedPtr hosts_per_locality,
HostListsConstSharedPtr healthy_hosts_per_locality,
const std::vector<HostSharedPtr>& hosts_added, const std::vector<HostSharedPtr>& hosts_removed,
ThreadLocal::Slot& tls) {

ThreadLocalClusterManagerImpl& config = tls.getTyped<ThreadLocalClusterManagerImpl>();

ASSERT(config.thread_local_clusters_.find(name) != config.thread_local_clusters_.end());
config.thread_local_clusters_[name]->host_set_.updateHosts(
config.thread_local_clusters_[name]->priority_set_.getHostSet(priority).updateHosts(
std::move(hosts), std::move(healthy_hosts), std::move(hosts_per_locality),
std::move(healthy_hosts_per_locality), hosts_added, hosts_removed);
}
Expand Down Expand Up @@ -602,43 +610,50 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
parent.parent_.local_info_, parent.parent_, parent.parent_.runtime_,
parent.parent_.random_,
Router::ShadowWriterPtr{new Router::ShadowWriterImpl(parent.parent_)}) {

// TODO(alyssawilk) make lb priority-set aware in a follow-up patch
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: period end of sentence.

HostSet& host_set = priority_set_.getHostSet(0);
HostSet* local_host_set = nullptr;
if (parent.local_priority_set_) {
local_host_set = parent.local_priority_set_->hostSetsPerPriority()[0].get();
}

if (cluster->lbSubsetInfo().isEnabled()) {
lb_.reset(new SubsetLoadBalancer(cluster->lbType(), host_set_, parent.local_host_set_,
cluster->stats(), parent.parent_.runtime_,
parent.parent_.random_, cluster->lbSubsetInfo(),
cluster->lbRingHashConfig()));
lb_.reset(new SubsetLoadBalancer(cluster->lbType(), host_set, local_host_set, cluster->stats(),
parent.parent_.runtime_, parent.parent_.random_,
cluster->lbSubsetInfo(), cluster->lbRingHashConfig()));
} else {
switch (cluster->lbType()) {
case LoadBalancerType::LeastRequest: {
lb_.reset(new LeastRequestLoadBalancer(host_set_, parent.local_host_set_, cluster->stats(),
lb_.reset(new LeastRequestLoadBalancer(host_set, local_host_set, cluster->stats(),
parent.parent_.runtime_, parent.parent_.random_));
break;
}
case LoadBalancerType::Random: {
lb_.reset(new RandomLoadBalancer(host_set_, parent.local_host_set_, cluster->stats(),
lb_.reset(new RandomLoadBalancer(host_set, local_host_set, cluster->stats(),
parent.parent_.runtime_, parent.parent_.random_));
break;
}
case LoadBalancerType::RoundRobin: {
lb_.reset(new RoundRobinLoadBalancer(host_set_, parent.local_host_set_, cluster->stats(),
lb_.reset(new RoundRobinLoadBalancer(host_set, local_host_set, cluster->stats(),
parent.parent_.runtime_, parent.parent_.random_));
break;
}
case LoadBalancerType::RingHash: {
lb_.reset(new RingHashLoadBalancer(host_set_, cluster->stats(), parent.parent_.runtime_,
lb_.reset(new RingHashLoadBalancer(host_set, cluster->stats(), parent.parent_.runtime_,
parent.parent_.random_, cluster->lbRingHashConfig()));
break;
}
case LoadBalancerType::OriginalDst: {
lb_.reset(new OriginalDstCluster::LoadBalancer(
host_set_, parent.parent_.primary_clusters_.at(cluster->name()).cluster_));
host_set, parent.parent_.primary_clusters_.at(cluster->name()).cluster_));
break;
}
}
}

host_set_.addMemberUpdateCb([this](const std::vector<HostSharedPtr>&,
const std::vector<HostSharedPtr>& hosts_removed) -> void {
priority_set_.addMemberUpdateCb([this](uint32_t, const std::vector<HostSharedPtr>&,
const std::vector<HostSharedPtr>& hosts_removed) -> void {
// We need to go through and purge any connection pools for hosts that got deleted.
// Even if two hosts actually point to the same address this will be safe, since if a
// host is readded it will be a different physical HostSharedPtr.
Expand All @@ -653,7 +668,9 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry()
// TODO(mattklein123): Optimally, we would just fire member changed callbacks and remove all of
// the hosts inside of the HostImpl destructor. That is a change with wide implications, so we are
// going with a more targeted approach for now.
parent_.drainConnPools(host_set_.hosts());
for (auto& host_set : priority_set_.hostSetsPerPriority()) {
parent_.drainConnPools(host_set->hosts());
}
}

Http::ConnectionPool::Instance*
Expand Down
11 changes: 6 additions & 5 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,12 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
LoadBalancerContext* context);

// Upstream::ThreadLocalCluster
const HostSet& hostSet() override { return host_set_; }
const PrioritySet& prioritySet() override { return priority_set_; }
ClusterInfoConstSharedPtr info() override { return cluster_info_; }
LoadBalancer& loadBalancer() override { return *lb_; }

ThreadLocalClusterManagerImpl& parent_;
HostSetImpl host_set_;
PrioritySetImpl priority_set_;
LoadBalancerPtr lb_;
ClusterInfoConstSharedPtr cluster_info_;
Http::AsyncClientImpl http_async_client_;
Expand All @@ -214,7 +214,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
~ThreadLocalClusterManagerImpl();
void drainConnPools(const std::vector<HostSharedPtr>& hosts);
void drainConnPools(HostSharedPtr old_host, ConnPoolsContainer& container);
static void updateClusterMembership(const std::string& name, HostVectorConstSharedPtr hosts,
static void updateClusterMembership(const std::string& name, uint32_t priority,
HostVectorConstSharedPtr hosts,
HostVectorConstSharedPtr healthy_hosts,
HostListsConstSharedPtr hosts_per_locality,
HostListsConstSharedPtr healthy_hosts_per_locality,
Expand All @@ -227,7 +228,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
Event::Dispatcher& thread_local_dispatcher_;
std::unordered_map<std::string, ClusterEntryPtr> thread_local_clusters_;
std::unordered_map<HostConstSharedPtr, ConnPoolsContainer> host_http_conn_pool_map_;
const HostSet* local_host_set_{};
const PrioritySet* local_priority_set_{};
};

struct PrimaryClusterData {
Expand All @@ -242,7 +243,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
static ClusterManagerStats generateStats(Stats::Scope& scope);
void loadCluster(const envoy::api::v2::Cluster& cluster, bool added_via_api);
void postInitializeCluster(Cluster& cluster);
void postThreadLocalClusterUpdate(const Cluster& primary_cluster,
void postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority,
const std::vector<HostSharedPtr>& hosts_added,
const std::vector<HostSharedPtr>& hosts_removed);
void postThreadLocalHealthFailure(const HostSharedPtr& host);
Expand Down
Loading