Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: allow configurable flush interval #267

Merged
merged 1 commit into from
Dec 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/configuration/overview/overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ miscellaneous configuration.
"flags_path": "...",
"statsd_local_udp_port": "...",
"statsd_tcp_cluster_name": "...",
"stats_flush_interval_ms": "...",
"tracing": "{...}",
"rate_limit_service": "{...}",
"runtime": "{...}",
Expand Down Expand Up @@ -48,6 +49,11 @@ statsd_tcp_cluster_name
listener. If specified, Envoy will connect to this cluster to flush :ref:`statistics
<arch_overview_statistics>`.

stats_flush_interval_ms
*(optional, integer)* The time in milliseconds between flushes to configured stats sinks. For
performance reasons Envoy latches counters and only flushes counters and gauges at a periodic
interval. If not specified the default is 5000ms (5 seconds).

:ref:`tracing <config_tracing>`
*(optional, object)* Configuration for an external :ref:`tracing <arch_overview_tracing>`
provider. If not specified, no tracing will be performed.
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/server/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ class Main {
* @return Optional<uint32_t> the optional local UDP statsd port to write to.
*/
virtual Optional<uint32_t> statsdUdpPort() PURE;

/**
* @return the time interval between flushing to configured stat sinks. The server latches
* counters.
*/
virtual std::chrono::milliseconds statsFlushInterval() PURE;
};

/**
Expand Down
50 changes: 25 additions & 25 deletions source/server/configuration_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,39 @@ void FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager

MainImpl::MainImpl(Server::Instance& server) : server_(server) {}

void MainImpl::initialize(const std::string& file_path) {
Json::ObjectPtr loader = Json::Factory::LoadFromFile(file_path);

void MainImpl::initialize(const Json::Object& json) {
cluster_manager_.reset(new Upstream::ProdClusterManagerImpl(
*loader->getObject("cluster_manager"), server_.stats(), server_.threadLocal(),
*json.getObject("cluster_manager"), server_.stats(), server_.threadLocal(),
server_.dnsResolver(), server_.sslContextManager(), server_.runtime(), server_.random(),
server_.options().serviceZone(), server_.getLocalAddress()));

std::vector<Json::ObjectPtr> listeners = loader->getObjectArray("listeners");
std::vector<Json::ObjectPtr> listeners = json.getObjectArray("listeners");
log().info("loading {} listener(s)", listeners.size());
for (size_t i = 0; i < listeners.size(); i++) {
log().info("listener #{}:", i);
listeners_.emplace_back(
Server::Configuration::ListenerPtr{new ListenerConfig(*this, *listeners[i])});
}

if (loader->hasObject("statsd_local_udp_port")) {
statsd_udp_port_.value(loader->getInteger("statsd_local_udp_port"));
if (json.hasObject("statsd_local_udp_port")) {
statsd_udp_port_.value(json.getInteger("statsd_local_udp_port"));
}

if (loader->hasObject("statsd_tcp_cluster_name")) {
statsd_tcp_cluster_name_.value(loader->getString("statsd_tcp_cluster_name"));
if (json.hasObject("statsd_tcp_cluster_name")) {
statsd_tcp_cluster_name_.value(json.getString("statsd_tcp_cluster_name"));
}

if (loader->hasObject("tracing")) {
initializeTracers(*loader->getObject("tracing"));
stats_flush_interval_ =
std::chrono::milliseconds(json.getInteger("stats_flush_interval_ms", 5000));

if (json.hasObject("tracing")) {
initializeTracers(*json.getObject("tracing"));
} else {
http_tracer_.reset(new Tracing::HttpNullTracer());
}

if (loader->hasObject("rate_limit_service")) {
Json::ObjectPtr rate_limit_service_config = loader->getObject("rate_limit_service");
if (json.hasObject("rate_limit_service")) {
Json::ObjectPtr rate_limit_service_config = json.getObject("rate_limit_service");
std::string type = rate_limit_service_config->getString("type");
if (type == "grpc_service") {
ratelimit_client_factory_.reset(new RateLimit::GrpcFactoryImpl(
Expand All @@ -69,14 +70,14 @@ void MainImpl::initialize(const std::string& file_path) {
}
}

void MainImpl::initializeTracers(const Json::Object& tracing_configuration_) {
void MainImpl::initializeTracers(const Json::Object& tracing_configuration) {
log().info("loading tracing configuration");

// Initialize http sinks
if (tracing_configuration_.hasObject("http")) {
if (tracing_configuration.hasObject("http")) {
http_tracer_.reset(new Tracing::HttpTracerImpl(server_.runtime(), server_.stats()));

Json::ObjectPtr http_tracer_config = tracing_configuration_.getObject("http");
Json::ObjectPtr http_tracer_config = tracing_configuration.getObject("http");

if (http_tracer_config->hasObject("sinks")) {
std::vector<Json::ObjectPtr> sinks = http_tracer_config->getObjectArray("sinks");
Expand Down Expand Up @@ -170,22 +171,21 @@ void MainImpl::ListenerConfig::createFilterChain(Network::Connection& connection
FilterChainUtility::buildFilterChain(connection, filter_factories_);
}

InitialImpl::InitialImpl(const std::string& file_path) {
Json::ObjectPtr loader = Json::Factory::LoadFromFile(file_path);
Json::ObjectPtr admin = loader->getObject("admin");
InitialImpl::InitialImpl(const Json::Object& json) {
Json::ObjectPtr admin = json.getObject("admin");
admin_.access_log_path_ = admin->getString("access_log_path");
admin_.port_ = admin->getInteger("port");

if (loader->hasObject("flags_path")) {
flags_path_.value(loader->getString("flags_path"));
if (json.hasObject("flags_path")) {
flags_path_.value(json.getString("flags_path"));
}

if (loader->hasObject("runtime")) {
if (json.hasObject("runtime")) {
runtime_.reset(new RuntimeImpl());
runtime_->symlink_root_ = loader->getObject("runtime")->getString("symlink_root");
runtime_->subdirectory_ = loader->getObject("runtime")->getString("subdirectory");
runtime_->symlink_root_ = json.getObject("runtime")->getString("symlink_root");
runtime_->subdirectory_ = json.getObject("runtime")->getString("subdirectory");
runtime_->override_subdirectory_ =
loader->getObject("runtime")->getString("override_subdirectory", "");
json.getObject("runtime")->getString("override_subdirectory", "");
}
}

Expand Down
8 changes: 5 additions & 3 deletions source/server/configuration_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
* will call through the server to get the cluster manager so the server variable must be
* initialized.
*/
void initialize(const std::string& file_path);
void initialize(const Json::Object& json);

// Server::Configuration::Main
Upstream::ClusterManager& clusterManager() override { return *cluster_manager_; }
Expand All @@ -72,12 +72,13 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
RateLimit::ClientFactory& rateLimitClientFactory() override { return *ratelimit_client_factory_; }
Optional<std::string> statsdTcpClusterName() override { return statsd_tcp_cluster_name_; }
Optional<uint32_t> statsdUdpPort() override { return statsd_udp_port_; }
std::chrono::milliseconds statsFlushInterval() override { return stats_flush_interval_; }

private:
/**
* Initialize tracers and corresponding sinks.
*/
void initializeTracers(const Json::Object& tracing_configuration_);
void initializeTracers(const Json::Object& tracing_configuration);

/**
* Maps JSON config to runtime config for a listener with network filter chain.
Expand Down Expand Up @@ -116,6 +117,7 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
Optional<std::string> statsd_tcp_cluster_name_;
Optional<uint32_t> statsd_udp_port_;
RateLimit::ClientFactoryPtr ratelimit_client_factory_;
std::chrono::milliseconds stats_flush_interval_;
};

/**
Expand All @@ -134,7 +136,7 @@ template <class T> class RegisterNetworkFilterConfigFactory {
*/
class InitialImpl : public Initial {
public:
InitialImpl(const std::string& file_path);
InitialImpl(const Json::Object& json);

// Server::Configuration::Initial
Admin& admin() { return admin_; }
Expand Down
9 changes: 5 additions & 4 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void InstanceImpl::flushStats() {
}
}

stat_flush_timer_->enableTimer(std::chrono::milliseconds(5000));
stat_flush_timer_->enableTimer(config_->statsFlushInterval());
}

int InstanceImpl::getListenSocketFd(uint32_t port) {
Expand All @@ -130,7 +130,8 @@ void InstanceImpl::initialize(Options& options, TestHooks& hooks,
restarter_.version());

// Handle configuration that needs to take place prior to the main configuration load.
Configuration::InitialImpl initial_config(options.configPath());
Json::ObjectPtr config_json = Json::Factory::LoadFromFile(options.configPath());
Configuration::InitialImpl initial_config(*config_json);
log().info("admin port: {}", initial_config.admin().port());

HotRestart::ShutdownParentAdminInfo info;
Expand Down Expand Up @@ -164,7 +165,7 @@ void InstanceImpl::initialize(Options& options, TestHooks& hooks,
// per above. See MainImpl::initialize() for why we do this pointer dance.
Configuration::MainImpl* main_config = new Configuration::MainImpl(*this);
config_.reset(main_config);
main_config->initialize(options.configPath());
main_config->initialize(*config_json);

for (const Configuration::ListenerPtr& listener : config_->listeners()) {
// For each listener config we share a single TcpListenSocket among all threaded listeners.
Expand Down Expand Up @@ -227,7 +228,7 @@ void InstanceImpl::initialize(Options& options, TestHooks& hooks,
// Some of the stat sinks may need dispatcher support so don't flush until the main loop starts.
// Just setup the timer.
stat_flush_timer_ = handler_.dispatcher().createTimer([this]() -> void { flushStats(); });
stat_flush_timer_->enableTimer(std::chrono::milliseconds(5000));
stat_flush_timer_->enableTimer(config_->statsFlushInterval());
}

Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
Expand Down
5 changes: 3 additions & 2 deletions test/example_configs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ class ConfigTest {
ON_CALL(server_.api_, fileReadToEnd("lightstep_access_token"))
.WillByDefault(Return("access_token"));

Server::Configuration::InitialImpl initial_config(file_path);
Json::ObjectPtr config_json = Json::Factory::LoadFromFile(file_path);
Server::Configuration::InitialImpl initial_config(*config_json);
Server::Configuration::MainImpl main_config(server_);

ON_CALL(server_, clusterManager())
.WillByDefault(
Invoke([&]() -> Upstream::ClusterManager& { return main_config.clusterManager(); }));

try {
main_config.initialize(file_path);
main_config.initialize(*config_json);
} catch (const EnvoyException& ex) {
throw EnvoyException(fmt::format("'{}' config failed. Error: {}", file_path, ex.what()));
}
Expand Down
5 changes: 4 additions & 1 deletion test/mocks/server/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ using testing::SaveArg;

namespace Server {

MockOptions::MockOptions() {}
MockOptions::MockOptions() {
ON_CALL(*this, serviceZone()).WillByDefault(ReturnRef(service_zone_));
}

MockOptions::~MockOptions() {}

MockAdmin::MockAdmin() {}
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/server/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class MockOptions : public Options {
MOCK_METHOD0(serviceNodeName, const std::string&());
MOCK_METHOD0(serviceZone, const std::string&());
MOCK_METHOD0(fileFlushIntervalMsec, std::chrono::milliseconds());

std::string service_zone_;
};

class MockAdmin : public Admin {
Expand Down
43 changes: 43 additions & 0 deletions test/server/configuration_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "test/mocks/common.h"
#include "test/mocks/network/mocks.h"
#include "test/mocks/server/mocks.h"

using testing::InSequence;
using testing::Return;
Expand All @@ -22,5 +23,47 @@ TEST(FilterChainUtility, buildFilterChain) {
FilterChainUtility::buildFilterChain(connection, factories);
}

TEST(ConfigurationImplTest, DefaultStatsFlushInterval) {
std::string json = R"EOF(
{
"listeners": [],

"cluster_manager": {
"clusters": []
}
}
)EOF";

Json::ObjectPtr loader = Json::Factory::LoadFromString(json);

NiceMock<Server::MockInstance> server;
MainImpl config(server);
config.initialize(*loader);

EXPECT_EQ(std::chrono::milliseconds(5000), config.statsFlushInterval());
}

TEST(ConfigurationImplTest, CustomStatsFlushInterval) {
std::string json = R"EOF(
{
"listeners": [],

"stats_flush_interval_ms": 500,

"cluster_manager": {
"clusters": []
}
}
)EOF";

Json::ObjectPtr loader = Json::Factory::LoadFromString(json);

NiceMock<Server::MockInstance> server;
MainImpl config(server);
config.initialize(*loader);

EXPECT_EQ(std::chrono::milliseconds(500), config.statsFlushInterval());
}

} // Configuration
} // Server