Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster manager: implement cluster warming #2774

Merged
merged 5 commits into from
Mar 14, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ class ClusterManager {

/**
* Add or update a cluster via API. The semantics of this API are:
* 1) The hash of the config is used to determine if an already existing cluser has changed.
* 1) The hash of the config is used to determine if an already existing cluster has changed.
* Nothing is done if the hash matches the previously running configuration.
* 2) Statically defined clusters (those present when Envoy starts) cannot be updated via API.
*
* @return true if the action results in an add/update of a cluster.
*/
virtual bool addOrUpdatePrimaryCluster(const envoy::api::v2::Cluster& cluster) PURE;
virtual bool addOrUpdateCluster(const envoy::api::v2::Cluster& cluster) PURE;

/**
* Set a callback that will be invoked when all owned clusters have been initialized.
Expand Down Expand Up @@ -97,13 +97,13 @@ class ClusterManager {
virtual Http::AsyncClient& httpAsyncClientForCluster(const std::string& cluster) PURE;

/**
* Remove a primary cluster via API. Only clusters added via addOrUpdatePrimaryCluster() can
* Remove a cluster via API. Only clusters added via addOrUpdateCluster() can
* be removed in this manner. Statically defined clusters present when Envoy starts cannot be
* removed.
*
* @return true if the action results in the removal of a cluster.
*/
virtual bool removePrimaryCluster(const std::string& cluster) PURE;
virtual bool removeCluster(const std::string& cluster) PURE;

/**
* Shutdown the cluster manager prior to destroying connection pools and other thread local data.
Expand Down
4 changes: 2 additions & 2 deletions source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ void CdsApiImpl::onConfigUpdate(const ResourceVector& resources) {
for (auto& cluster : resources) {
const std::string cluster_name = cluster.name();
clusters_to_remove.erase(cluster_name);
if (cm_.addOrUpdatePrimaryCluster(cluster)) {
if (cm_.addOrUpdateCluster(cluster)) {
ENVOY_LOG(debug, "cds: add/update cluster '{}'", cluster_name);
}
}

for (auto cluster : clusters_to_remove) {
const std::string cluster_name = cluster.first;
if (cm_.removePrimaryCluster(cluster_name)) {
if (cm_.removeCluster(cluster_name)) {
ENVOY_LOG(debug, "cds: remove cluster '{}'", cluster_name);
}
}
Expand Down
228 changes: 140 additions & 88 deletions source/common/upstream/cluster_manager_impl.cc

Large diffs are not rendered by default.

43 changes: 27 additions & 16 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ class ProdClusterManagerFactory : public ClusterManagerFactory {
ThreadLocal::Instance& tls, Runtime::RandomGenerator& random,
Network::DnsResolverSharedPtr dns_resolver,
Ssl::ContextManager& ssl_context_manager,
Event::Dispatcher& primary_dispatcher,
Event::Dispatcher& main_thread_dispatcher,
const LocalInfo::LocalInfo& local_info)
: primary_dispatcher_(primary_dispatcher), runtime_(runtime), stats_(stats), tls_(tls),
random_(random), dns_resolver_(dns_resolver), ssl_context_manager_(ssl_context_manager),
local_info_(local_info) {}
: main_thread_dispatcher_(main_thread_dispatcher), runtime_(runtime), stats_(stats),
tls_(tls), random_(random), dns_resolver_(dns_resolver),
ssl_context_manager_(ssl_context_manager), local_info_(local_info) {}

// Upstream::ClusterManagerFactory
ClusterManagerPtr
Expand All @@ -58,7 +58,7 @@ class ProdClusterManagerFactory : public ClusterManagerFactory {
ClusterManager& cm) override;

protected:
Event::Dispatcher& primary_dispatcher_;
Event::Dispatcher& main_thread_dispatcher_;

private:
Runtime::Loader& runtime_;
Expand Down Expand Up @@ -128,7 +128,8 @@ class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> {
COUNTER(cluster_added) \
COUNTER(cluster_modified) \
COUNTER(cluster_removed) \
GAUGE (total_clusters)
GAUGE (active_clusters) \
GAUGE (warming_clusters)
// clang-format on

/**
Expand All @@ -149,17 +150,18 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
ThreadLocal::Instance& tls, Runtime::Loader& runtime,
Runtime::RandomGenerator& random, const LocalInfo::LocalInfo& local_info,
AccessLog::AccessLogManager& log_manager,
Event::Dispatcher& primary_dispatcher);
Event::Dispatcher& main_thread_dispatcher);

// Upstream::ClusterManager
bool addOrUpdatePrimaryCluster(const envoy::api::v2::Cluster& cluster) override;
bool addOrUpdateCluster(const envoy::api::v2::Cluster& cluster) override;
void setInitializedCb(std::function<void()> callback) override {
init_helper_.setInitializedCb(callback);
}
ClusterInfoMap clusters() override {
// TODO(mattklein123): Add ability to see warming clusters in admin output.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(adding reference to #2172 for tracking)

ClusterInfoMap clusters_map;
for (auto& cluster : primary_clusters_) {
clusters_map.emplace(cluster.first, *cluster.second.cluster_);
for (auto& cluster : active_clusters_) {
clusters_map.emplace(cluster.first, *cluster.second->cluster_);
}

return clusters_map;
Expand All @@ -172,11 +174,11 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
Host::CreateConnectionData tcpConnForCluster(const std::string& cluster,
LoadBalancerContext* context) override;
Http::AsyncClient& httpAsyncClientForCluster(const std::string& cluster) override;
bool removePrimaryCluster(const std::string& cluster) override;
bool removeCluster(const std::string& cluster) override;
void shutdown() override {
cds_api_.reset();
ads_mux_.reset();
primary_clusters_.clear();
active_clusters_.clear();
}

const Network::Address::InstanceConstSharedPtr& sourceAddress() const override {
Expand Down Expand Up @@ -261,10 +263,12 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
const PrioritySet* local_priority_set_{};
};

struct PrimaryClusterData {
PrimaryClusterData(uint64_t config_hash, bool added_via_api, ClusterSharedPtr&& cluster)
struct ClusterData {
ClusterData(uint64_t config_hash, bool added_via_api, ClusterSharedPtr&& cluster)
: config_hash_(config_hash), added_via_api_(added_via_api), cluster_(std::move(cluster)) {}

bool blockUpdate(uint64_t hash) { return !added_via_api_ || config_hash_ == hash; }

LoadBalancerFactorySharedPtr loadBalancerFactory() {
if (thread_aware_lb_ != nullptr) {
return thread_aware_lb_->factory();
Expand All @@ -280,19 +284,26 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
ThreadAwareLoadBalancerPtr thread_aware_lb_;
};

typedef std::unique_ptr<ClusterData> ClusterDataPtr;
typedef std::unordered_map<std::string, ClusterDataPtr> ClusterMap;

void createOrUpdateThreadLocalCluster(ClusterData& cluster);
static ClusterManagerStats generateStats(Stats::Scope& scope);
void loadCluster(const envoy::api::v2::Cluster& cluster, bool added_via_api);
void loadCluster(const envoy::api::v2::Cluster& cluster, bool added_via_api,
ClusterMap& cluster_map);
void onClusterInit(Cluster& cluster);
void postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority,
const HostVector& hosts_added, const HostVector& hosts_removed);
void postThreadLocalHealthFailure(const HostSharedPtr& host);
void updateGauges();

ClusterManagerFactory& factory_;
Runtime::Loader& runtime_;
Stats::Store& stats_;
ThreadLocal::SlotPtr tls_;
Runtime::RandomGenerator& random_;
std::unordered_map<std::string, PrimaryClusterData> primary_clusters_;
ClusterMap active_clusters_;
ClusterMap warming_clusters_;
Optional<envoy::api::v2::core::ConfigSource> eds_config_;
Network::Address::InstanceConstSharedPtr source_address_;
Outlier::EventLoggerSharedPtr outlier_event_logger_;
Expand Down
13 changes: 7 additions & 6 deletions source/server/config_validation/cluster_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ namespace Upstream {
ValidationClusterManagerFactory::ValidationClusterManagerFactory(
Runtime::Loader& runtime, Stats::Store& stats, ThreadLocal::Instance& tls,
Runtime::RandomGenerator& random, Network::DnsResolverSharedPtr dns_resolver,
Ssl::ContextManager& ssl_context_manager, Event::Dispatcher& primary_dispatcher,
Ssl::ContextManager& ssl_context_manager, Event::Dispatcher& main_thread_dispatcher,
const LocalInfo::LocalInfo& local_info)
: ProdClusterManagerFactory(runtime, stats, tls, random, dns_resolver, ssl_context_manager,
primary_dispatcher, local_info) {}
main_thread_dispatcher, local_info) {}

ClusterManagerPtr ValidationClusterManagerFactory::clusterManagerFromProto(
const envoy::config::bootstrap::v2::Bootstrap& bootstrap, Stats::Store& stats,
ThreadLocal::Instance& tls, Runtime::Loader& runtime, Runtime::RandomGenerator& random,
const LocalInfo::LocalInfo& local_info, AccessLog::AccessLogManager& log_manager) {
return ClusterManagerPtr{new ValidationClusterManager(
bootstrap, *this, stats, tls, runtime, random, local_info, log_manager, primary_dispatcher_)};
return ClusterManagerPtr{new ValidationClusterManager(bootstrap, *this, stats, tls, runtime,
random, local_info, log_manager,
main_thread_dispatcher_)};
}

CdsApiPtr ValidationClusterManagerFactory::createCds(
Expand All @@ -32,9 +33,9 @@ ValidationClusterManager::ValidationClusterManager(
const envoy::config::bootstrap::v2::Bootstrap& bootstrap, ClusterManagerFactory& factory,
Stats::Store& stats, ThreadLocal::Instance& tls, Runtime::Loader& runtime,
Runtime::RandomGenerator& random, const LocalInfo::LocalInfo& local_info,
AccessLog::AccessLogManager& log_manager, Event::Dispatcher& primary_dispatcher)
AccessLog::AccessLogManager& log_manager, Event::Dispatcher& main_thread_dispatcher)
: ClusterManagerImpl(bootstrap, factory, stats, tls, runtime, random, local_info, log_manager,
primary_dispatcher) {}
main_thread_dispatcher) {}

Http::ConnectionPool::Instance*
ValidationClusterManager::httpConnPoolForCluster(const std::string&, ResourcePriority,
Expand Down
2 changes: 1 addition & 1 deletion source/server/config_validation/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ValidationClusterManagerFactory : public ProdClusterManagerFactory {
ThreadLocal::Instance& tls, Runtime::RandomGenerator& random,
Network::DnsResolverSharedPtr dns_resolver,
Ssl::ContextManager& ssl_context_manager,
Event::Dispatcher& primary_dispatcher,
Event::Dispatcher& main_thread_dispatcher,
const LocalInfo::LocalInfo& local_info);

ClusterManagerPtr
Expand Down
4 changes: 2 additions & 2 deletions test/common/upstream/cds_api_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class CdsApiImplTest : public testing::Test {
}

void expectAdd(const std::string& cluster_name) {
EXPECT_CALL(cm_, addOrUpdatePrimaryCluster(_))
EXPECT_CALL(cm_, addOrUpdateCluster(_))
.WillOnce(Invoke([cluster_name](const envoy::api::v2::Cluster& cluster) -> bool {
EXPECT_EQ(cluster_name, cluster.name());
return true;
Expand Down Expand Up @@ -181,7 +181,7 @@ TEST_F(CdsApiImplTest, Basic) {
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterMap({"cluster1", "cluster2"})));
expectAdd("cluster1");
expectAdd("cluster3");
EXPECT_CALL(cm_, removePrimaryCluster("cluster2"));
EXPECT_CALL(cm_, removeCluster("cluster2"));
EXPECT_CALL(*interval_timer_, enableTimer(_));
callbacks_->onSuccess(std::move(message));

Expand Down
Loading