diff --git a/docs/configuration/overview/overview.rst b/docs/configuration/overview/overview.rst index 57fd3fa9d3bd..523a92af2736 100644 --- a/docs/configuration/overview/overview.rst +++ b/docs/configuration/overview/overview.rst @@ -16,6 +16,7 @@ miscellaneous configuration. "flags_path": "...", "statsd_local_udp_port": "...", "statsd_tcp_cluster_name": "...", + "stats_flush_interval_ms": "...", "tracing": "{...}", "rate_limit_service": "{...}", "runtime": "{...}", @@ -48,6 +49,11 @@ statsd_tcp_cluster_name listener. If specified, Envoy will connect to this cluster to flush :ref:`statistics `. +stats_flush_interval_ms + *(optional, integer)* The time in milliseconds between flushes to configured stats sinks. For + performance reasons Envoy latches counters and only flushes counters and gauges at a periodic + interval. If not specified the default is 5000ms (5 seconds). + :ref:`tracing ` *(optional, object)* Configuration for an external :ref:`tracing ` provider. If not specified, no tracing will be performed. diff --git a/include/envoy/server/configuration.h b/include/envoy/server/configuration.h index cbc1d95606e8..654969fc618e 100644 --- a/include/envoy/server/configuration.h +++ b/include/envoy/server/configuration.h @@ -102,6 +102,12 @@ class Main { * @return Optional the optional local UDP statsd port to write to. */ virtual Optional statsdUdpPort() PURE; + + /** + * @return the time interval between flushing to configured stat sinks. The server latches + * counters. + */ + virtual std::chrono::milliseconds statsFlushInterval() PURE; }; /** diff --git a/source/server/configuration_impl.cc b/source/server/configuration_impl.cc index 39a7520be897..1b2efd0828cd 100644 --- a/source/server/configuration_impl.cc +++ b/source/server/configuration_impl.cc @@ -25,15 +25,13 @@ void FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager MainImpl::MainImpl(Server::Instance& server) : server_(server) {} -void MainImpl::initialize(const std::string& file_path) { - Json::ObjectPtr loader = Json::Factory::LoadFromFile(file_path); - +void MainImpl::initialize(const Json::Object& json) { cluster_manager_.reset(new Upstream::ProdClusterManagerImpl( - *loader->getObject("cluster_manager"), server_.stats(), server_.threadLocal(), + *json.getObject("cluster_manager"), server_.stats(), server_.threadLocal(), server_.dnsResolver(), server_.sslContextManager(), server_.runtime(), server_.random(), server_.options().serviceZone(), server_.getLocalAddress())); - std::vector listeners = loader->getObjectArray("listeners"); + std::vector listeners = json.getObjectArray("listeners"); log().info("loading {} listener(s)", listeners.size()); for (size_t i = 0; i < listeners.size(); i++) { log().info("listener #{}:", i); @@ -41,22 +39,25 @@ void MainImpl::initialize(const std::string& file_path) { Server::Configuration::ListenerPtr{new ListenerConfig(*this, *listeners[i])}); } - if (loader->hasObject("statsd_local_udp_port")) { - statsd_udp_port_.value(loader->getInteger("statsd_local_udp_port")); + if (json.hasObject("statsd_local_udp_port")) { + statsd_udp_port_.value(json.getInteger("statsd_local_udp_port")); } - if (loader->hasObject("statsd_tcp_cluster_name")) { - statsd_tcp_cluster_name_.value(loader->getString("statsd_tcp_cluster_name")); + if (json.hasObject("statsd_tcp_cluster_name")) { + statsd_tcp_cluster_name_.value(json.getString("statsd_tcp_cluster_name")); } - if (loader->hasObject("tracing")) { - initializeTracers(*loader->getObject("tracing")); + stats_flush_interval_ = + std::chrono::milliseconds(json.getInteger("stats_flush_interval_ms", 5000)); + + if (json.hasObject("tracing")) { + initializeTracers(*json.getObject("tracing")); } else { http_tracer_.reset(new Tracing::HttpNullTracer()); } - if (loader->hasObject("rate_limit_service")) { - Json::ObjectPtr rate_limit_service_config = loader->getObject("rate_limit_service"); + if (json.hasObject("rate_limit_service")) { + Json::ObjectPtr rate_limit_service_config = json.getObject("rate_limit_service"); std::string type = rate_limit_service_config->getString("type"); if (type == "grpc_service") { ratelimit_client_factory_.reset(new RateLimit::GrpcFactoryImpl( @@ -69,14 +70,14 @@ void MainImpl::initialize(const std::string& file_path) { } } -void MainImpl::initializeTracers(const Json::Object& tracing_configuration_) { +void MainImpl::initializeTracers(const Json::Object& tracing_configuration) { log().info("loading tracing configuration"); // Initialize http sinks - if (tracing_configuration_.hasObject("http")) { + if (tracing_configuration.hasObject("http")) { http_tracer_.reset(new Tracing::HttpTracerImpl(server_.runtime(), server_.stats())); - Json::ObjectPtr http_tracer_config = tracing_configuration_.getObject("http"); + Json::ObjectPtr http_tracer_config = tracing_configuration.getObject("http"); if (http_tracer_config->hasObject("sinks")) { std::vector sinks = http_tracer_config->getObjectArray("sinks"); @@ -170,22 +171,21 @@ void MainImpl::ListenerConfig::createFilterChain(Network::Connection& connection FilterChainUtility::buildFilterChain(connection, filter_factories_); } -InitialImpl::InitialImpl(const std::string& file_path) { - Json::ObjectPtr loader = Json::Factory::LoadFromFile(file_path); - Json::ObjectPtr admin = loader->getObject("admin"); +InitialImpl::InitialImpl(const Json::Object& json) { + Json::ObjectPtr admin = json.getObject("admin"); admin_.access_log_path_ = admin->getString("access_log_path"); admin_.port_ = admin->getInteger("port"); - if (loader->hasObject("flags_path")) { - flags_path_.value(loader->getString("flags_path")); + if (json.hasObject("flags_path")) { + flags_path_.value(json.getString("flags_path")); } - if (loader->hasObject("runtime")) { + if (json.hasObject("runtime")) { runtime_.reset(new RuntimeImpl()); - runtime_->symlink_root_ = loader->getObject("runtime")->getString("symlink_root"); - runtime_->subdirectory_ = loader->getObject("runtime")->getString("subdirectory"); + runtime_->symlink_root_ = json.getObject("runtime")->getString("symlink_root"); + runtime_->subdirectory_ = json.getObject("runtime")->getString("subdirectory"); runtime_->override_subdirectory_ = - loader->getObject("runtime")->getString("override_subdirectory", ""); + json.getObject("runtime")->getString("override_subdirectory", ""); } } diff --git a/source/server/configuration_impl.h b/source/server/configuration_impl.h index 13aba2e38bea..84747665a347 100644 --- a/source/server/configuration_impl.h +++ b/source/server/configuration_impl.h @@ -63,7 +63,7 @@ class MainImpl : Logger::Loggable, public Main { * will call through the server to get the cluster manager so the server variable must be * initialized. */ - void initialize(const std::string& file_path); + void initialize(const Json::Object& json); // Server::Configuration::Main Upstream::ClusterManager& clusterManager() override { return *cluster_manager_; } @@ -72,12 +72,13 @@ class MainImpl : Logger::Loggable, public Main { RateLimit::ClientFactory& rateLimitClientFactory() override { return *ratelimit_client_factory_; } Optional statsdTcpClusterName() override { return statsd_tcp_cluster_name_; } Optional statsdUdpPort() override { return statsd_udp_port_; } + std::chrono::milliseconds statsFlushInterval() override { return stats_flush_interval_; } private: /** * Initialize tracers and corresponding sinks. */ - void initializeTracers(const Json::Object& tracing_configuration_); + void initializeTracers(const Json::Object& tracing_configuration); /** * Maps JSON config to runtime config for a listener with network filter chain. @@ -116,6 +117,7 @@ class MainImpl : Logger::Loggable, public Main { Optional statsd_tcp_cluster_name_; Optional statsd_udp_port_; RateLimit::ClientFactoryPtr ratelimit_client_factory_; + std::chrono::milliseconds stats_flush_interval_; }; /** @@ -134,7 +136,7 @@ template class RegisterNetworkFilterConfigFactory { */ class InitialImpl : public Initial { public: - InitialImpl(const std::string& file_path); + InitialImpl(const Json::Object& json); // Server::Configuration::Initial Admin& admin() { return admin_; } diff --git a/source/server/server.cc b/source/server/server.cc index 67735d32cd33..47c057ad916f 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -104,7 +104,7 @@ void InstanceImpl::flushStats() { } } - stat_flush_timer_->enableTimer(std::chrono::milliseconds(5000)); + stat_flush_timer_->enableTimer(config_->statsFlushInterval()); } int InstanceImpl::getListenSocketFd(uint32_t port) { @@ -130,7 +130,8 @@ void InstanceImpl::initialize(Options& options, TestHooks& hooks, restarter_.version()); // Handle configuration that needs to take place prior to the main configuration load. - Configuration::InitialImpl initial_config(options.configPath()); + Json::ObjectPtr config_json = Json::Factory::LoadFromFile(options.configPath()); + Configuration::InitialImpl initial_config(*config_json); log().info("admin port: {}", initial_config.admin().port()); HotRestart::ShutdownParentAdminInfo info; @@ -164,7 +165,7 @@ void InstanceImpl::initialize(Options& options, TestHooks& hooks, // per above. See MainImpl::initialize() for why we do this pointer dance. Configuration::MainImpl* main_config = new Configuration::MainImpl(*this); config_.reset(main_config); - main_config->initialize(options.configPath()); + main_config->initialize(*config_json); for (const Configuration::ListenerPtr& listener : config_->listeners()) { // For each listener config we share a single TcpListenSocket among all threaded listeners. @@ -227,7 +228,7 @@ void InstanceImpl::initialize(Options& options, TestHooks& hooks, // Some of the stat sinks may need dispatcher support so don't flush until the main loop starts. // Just setup the timer. stat_flush_timer_ = handler_.dispatcher().createTimer([this]() -> void { flushStats(); }); - stat_flush_timer_->enableTimer(std::chrono::milliseconds(5000)); + stat_flush_timer_->enableTimer(config_->statsFlushInterval()); } Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server, diff --git a/test/example_configs_test.cc b/test/example_configs_test.cc index 10b0ddd6658d..734dea719a68 100644 --- a/test/example_configs_test.cc +++ b/test/example_configs_test.cc @@ -37,7 +37,8 @@ class ConfigTest { ON_CALL(server_.api_, fileReadToEnd("lightstep_access_token")) .WillByDefault(Return("access_token")); - Server::Configuration::InitialImpl initial_config(file_path); + Json::ObjectPtr config_json = Json::Factory::LoadFromFile(file_path); + Server::Configuration::InitialImpl initial_config(*config_json); Server::Configuration::MainImpl main_config(server_); ON_CALL(server_, clusterManager()) @@ -45,7 +46,7 @@ class ConfigTest { Invoke([&]() -> Upstream::ClusterManager& { return main_config.clusterManager(); })); try { - main_config.initialize(file_path); + main_config.initialize(*config_json); } catch (const EnvoyException& ex) { throw EnvoyException(fmt::format("'{}' config failed. Error: {}", file_path, ex.what())); } diff --git a/test/mocks/server/mocks.cc b/test/mocks/server/mocks.cc index 89755cef8c36..7e4292c6d85c 100644 --- a/test/mocks/server/mocks.cc +++ b/test/mocks/server/mocks.cc @@ -20,7 +20,10 @@ using testing::SaveArg; namespace Server { -MockOptions::MockOptions() {} +MockOptions::MockOptions() { + ON_CALL(*this, serviceZone()).WillByDefault(ReturnRef(service_zone_)); +} + MockOptions::~MockOptions() {} MockAdmin::MockAdmin() {} diff --git a/test/mocks/server/mocks.h b/test/mocks/server/mocks.h index 641bb512f064..f1fda18b16b7 100644 --- a/test/mocks/server/mocks.h +++ b/test/mocks/server/mocks.h @@ -40,6 +40,8 @@ class MockOptions : public Options { MOCK_METHOD0(serviceNodeName, const std::string&()); MOCK_METHOD0(serviceZone, const std::string&()); MOCK_METHOD0(fileFlushIntervalMsec, std::chrono::milliseconds()); + + std::string service_zone_; }; class MockAdmin : public Admin { diff --git a/test/server/configuration_impl_test.cc b/test/server/configuration_impl_test.cc index c0984009ac4e..7aeeff54d644 100644 --- a/test/server/configuration_impl_test.cc +++ b/test/server/configuration_impl_test.cc @@ -2,6 +2,7 @@ #include "test/mocks/common.h" #include "test/mocks/network/mocks.h" +#include "test/mocks/server/mocks.h" using testing::InSequence; using testing::Return; @@ -22,5 +23,47 @@ TEST(FilterChainUtility, buildFilterChain) { FilterChainUtility::buildFilterChain(connection, factories); } +TEST(ConfigurationImplTest, DefaultStatsFlushInterval) { + std::string json = R"EOF( +{ + "listeners": [], + + "cluster_manager": { + "clusters": [] + } +} + )EOF"; + + Json::ObjectPtr loader = Json::Factory::LoadFromString(json); + + NiceMock server; + MainImpl config(server); + config.initialize(*loader); + + EXPECT_EQ(std::chrono::milliseconds(5000), config.statsFlushInterval()); +} + +TEST(ConfigurationImplTest, CustomStatsFlushInterval) { + std::string json = R"EOF( +{ + "listeners": [], + + "stats_flush_interval_ms": 500, + + "cluster_manager": { + "clusters": [] + } +} + )EOF"; + + Json::ObjectPtr loader = Json::Factory::LoadFromString(json); + + NiceMock server; + MainImpl config(server); + config.initialize(*loader); + + EXPECT_EQ(std::chrono::milliseconds(500), config.statsFlushInterval()); +} + } // Configuration } // Server