Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made stats sinks a statically registered component #1506

Merged
merged 7 commits into from
Sep 5, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions include/envoy/server/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,9 @@ class Main {
virtual RateLimit::ClientFactory& rateLimitClientFactory() PURE;

/**
* @return Optional<std::string> the optional local/remote TCP statsd cluster to write to.
* This cluster must be defined via the cluster manager configuration.
* @return std::list<Stats::SinkPtr>& the list of stats sinks initialized from the configuration.
*/
virtual Optional<std::string> statsdTcpClusterName() PURE;

/**
* @return Network::Address::InstanceConstSharedPtr the optional UDP statsd address to write to.
*/
virtual Network::Address::InstanceConstSharedPtr statsdUdpIpAddress() PURE;
virtual std::list<Stats::SinkPtr>& statsSinks() PURE;

/**
* @return std::chrono::milliseconds the time interval between flushing to configured stat sinks.
Expand Down
1 change: 1 addition & 0 deletions source/exe/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ envoy_cc_library(
"//source/server/config/network:ratelimit_lib",
"//source/server/config/network:redis_proxy_lib",
"//source/server/config/network:tcp_proxy_lib",
"//source/server/config/stats:statsd_lib",
"//source/server/http:health_check_lib",
],
)
Expand Down
1 change: 0 additions & 1 deletion source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ envoy_cc_library(
"//source/common/router:rds_lib",
"//source/common/runtime:runtime_lib",
"//source/common/singleton:manager_impl_lib",
"//source/common/stats:statsd_lib",
"//source/common/upstream:cluster_manager_lib",
"//source/server/http:admin_lib",
],
Expand Down
23 changes: 23 additions & 0 deletions source/server/config/stats/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
licenses(["notice"]) # Apache 2

load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
)

envoy_package()

envoy_cc_library(
name = "statsd_lib",
srcs = ["statsd.cc"],
hdrs = ["statsd.h"],
external_deps = [
"envoy_bootstrap",
],
deps = [
"//source/common/network:address_lib",
"//source/common/stats:statsd_lib",
"//source/server:configuration_lib",
],
)
54 changes: 54 additions & 0 deletions source/server/config/stats/statsd.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include "server/config/stats/statsd.h"

#include <string>

#include "envoy/registry/registry.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Any header that is depended upon in .{cc,h} should be directly reflected in the deps in the BUILD file. This avoids surprises when A depends on B providing C, but later on, B no longer depends on C. Since we lack tool automation for this in OSS, we don't really enforce it (I'm sure I'm guilty of many breaches of this), but best to try and shoot for it. Basically, you want IWYU (include what you use, i.e. have all include headers in a file directly for symbols that appear in the file) and then BUILD dep pointing at libs providing the headers.


#include "common/stats/statsd.h"

#include "api/bootstrap.pb.h"

namespace Envoy {
namespace Server {
namespace Configuration {

Stats::SinkPtr StatsdSinkFactory::createStatsSink(const Protobuf::Message& config,
Server::Instance& server,
Upstream::ClusterManager& cluster_manager) {

const auto& statsd_sink = dynamic_cast<const envoy::api::v2::StatsdSink&>(config);
switch (statsd_sink.statsd_specifier_case()) {
case envoy::api::v2::StatsdSink::kAddress: {
Network::Address::InstanceConstSharedPtr address =
Network::Utility::fromProtoAddress(statsd_sink.address());
ENVOY_LOG(info, "statsd UDP ip address: {}", address->asString());
return Stats::SinkPtr(
new Stats::Statsd::UdpStatsdSink(server.threadLocal(), std::move(address)));
break;
}
case envoy::api::v2::StatsdSink::kTcpClusterName:
ENVOY_LOG(info, "statsd TCP cluster: {}", statsd_sink.tcp_cluster_name());
return Stats::SinkPtr(
new Stats::Statsd::TcpStatsdSink(server.localInfo(), statsd_sink.tcp_cluster_name(),
server.threadLocal(), cluster_manager, server.stats()));
break;
default:
throw EnvoyException(
fmt::format("No tcp_cluster_name or address provided for {} Stats::Sink config", name()));
}
}

ProtobufTypes::MessagePtr StatsdSinkFactory::createEmptyConfigProto() {
return std::unique_ptr<envoy::api::v2::StatsdSink>(new envoy::api::v2::StatsdSink());
}

std::string StatsdSinkFactory::name() { return "envoy.statsd"; }

/**
* Static registration for the statsd sink factory. @see RegisterFactory.
*/
static Registry::RegisterFactory<StatsdSinkFactory, StatsSinkFactory> register_;

} // namespace Configuration
} // namespace Server
} // namespace Envoy
29 changes: 29 additions & 0 deletions source/server/config/stats/statsd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include <string>

#include "envoy/server/instance.h"

#include "server/configuration_impl.h"

namespace Envoy {
namespace Server {
namespace Configuration {

/**
* Config registration for the tcp statsd sink. @see StatsSinkFactory.
*/
class StatsdSinkFactory : Logger::Loggable<Logger::Id::config>, public StatsSinkFactory {
public:
// StatsSinkFactory
Stats::SinkPtr createStatsSink(const Protobuf::Message& config, Instance& server,
Upstream::ClusterManager& cluster_manager) override;

ProtobufTypes::MessagePtr createEmptyConfigProto() override;

std::string name() override;
};

} // namespace Configuration
} // namespace Server
} // namespace Envoy
47 changes: 28 additions & 19 deletions source/server/configuration_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,6 @@ void MainImpl::initialize(const envoy::api::v2::Bootstrap& bootstrap, Instance&
server.localInfo(), server.stats(), server.listenerManager()));
}

for (const auto& stats_sink : bootstrap.stats_sinks()) {
// TODO(mrice32): Add support for pluggable stats sinks.
ASSERT(stats_sink.name() == "envoy.statsd");
envoy::api::v2::StatsdSink statsd_sink;
MessageUtil::jsonConvert(stats_sink.config(), statsd_sink);

switch (statsd_sink.statsd_specifier_case()) {
case envoy::api::v2::StatsdSink::kAddress: {
statsd_udp_ip_address_ = Network::Utility::fromProtoAddress(statsd_sink.address());
break;
}
case envoy::api::v2::StatsdSink::kTcpClusterName:
statsd_tcp_cluster_name_.value(statsd_sink.tcp_cluster_name());
break;
default:
NOT_REACHED;
}
}

stats_flush_interval_ =
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(bootstrap, stats_flush_interval, 5000));

Expand All @@ -95,6 +76,8 @@ void MainImpl::initialize(const envoy::api::v2::Bootstrap& bootstrap, Instance&
} else {
ratelimit_client_factory_.reset(new RateLimit::NullFactoryImpl());
}

initializeStatsSinks(bootstrap, server);
}

void MainImpl::initializeTracers(const envoy::api::v2::Tracing& configuration, Instance& server) {
Expand Down Expand Up @@ -127,6 +110,32 @@ void MainImpl::initializeTracers(const envoy::api::v2::Tracing& configuration, I
}
}

void MainImpl::initializeStatsSinks(const envoy::api::v2::Bootstrap& bootstrap, Instance& server) {
ENVOY_LOG(info, "loading stats sink configuration");

for (const envoy::api::v2::StatsSink& sink_object : bootstrap.stats_sinks()) {
if (sink_object.name().empty()) {
throw EnvoyException(
"sink object does not have 'name' attribute to look up the implementation");
}

if (!sink_object.has_config()) {
throw EnvoyException(
"sink object does not contain the 'config' object to configure the implementation");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think an empty default config is legit, we could have some filters that don't require config.

}

ProtobufTypes::String name = sink_object.name();
StatsSinkFactory* factory = Registry::FactoryRegistry<StatsSinkFactory>::getFactory(name);
if (factory != nullptr) {
ProtobufTypes::MessagePtr message = factory->createEmptyConfigProto();
MessageUtil::jsonConvert(sink_object.config(), *message);
stats_sinks_.emplace_back(factory->createStatsSink(*message, server, *cluster_manager_));
} else {
throw EnvoyException(fmt::format("No Stats::Sink found for name: {}", name));
}
}
}

InitialImpl::InitialImpl(const envoy::api::v2::Bootstrap& bootstrap) {
const auto& admin = bootstrap.admin();
admin_.access_log_path_ = admin.access_log_path();
Expand Down
44 changes: 38 additions & 6 deletions source/server/configuration_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,40 @@ class HttpTracerFactory {
virtual std::string name() PURE;
};

/**
* Implemented for each Stats::Sink and registered via Registry::registerFactory() or
* the convenience class RegisterFactory.
*/
class StatsSinkFactory {
public:
virtual ~StatsSinkFactory() {}

/**
* Create a particular Stats::Sink implementation. If the implementation is unable to produce a
* Stats::Sink with the provided parameters, it should throw an EnvoyException in the case of
* general error or a Json::Exception if the json configuration is erroneous. The returned
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Json::Exception? This is a pure v2 feature, we shouldn't have RapidJSON involved as we're working in proto land here. The only JSON conversion is performed by the protobuf library that won't create these types of exceptions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see, this is copy+paste from https://github.com/lyft/envoy/blob/master/include/envoy/server/filter_config.h. In filter_config.h, the JSON stuff doesn't apply to the ...FromProto() variants.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, sorry, that was just a typo. Thought I removed it.

* pointer should always be valid.
* @param config supplies the custom proto configuration for the Stats::Sink
* @param server supplies the server instance
* @param cluster_manager supplies the cluster_manager instance
*/
virtual Stats::SinkPtr createStatsSink(const Protobuf::Message& config, Instance& server,
Upstream::ClusterManager& cluster_manager) PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is ClusterManager a distinct parameter here? I think it can be inferred from the Server::Instance parameter?


/**
* @return ProtobufTypes::MessagePtr create empty config proto message for v2. The filter
* config, which arrives in an opaque google.protobuf.Struct message, will be converted to
* JSON and then parsed into this empty proto.
*/
virtual ProtobufTypes::MessagePtr createEmptyConfigProto() PURE;

/**
* Returns the identifying name for a particular implementation of Stats::Sink produced by the
* factory.
*/
virtual std::string name() PURE;
};

/**
* Utilities for creating a filter chain for a network connection.
*/
Expand Down Expand Up @@ -88,10 +122,7 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
Upstream::ClusterManager& clusterManager() override { return *cluster_manager_; }
Tracing::HttpTracer& httpTracer() override { return *http_tracer_; }
RateLimit::ClientFactory& rateLimitClientFactory() override { return *ratelimit_client_factory_; }
Optional<std::string> statsdTcpClusterName() override { return statsd_tcp_cluster_name_; }
Network::Address::InstanceConstSharedPtr statsdUdpIpAddress() override {
return statsd_udp_ip_address_;
}
std::list<Stats::SinkPtr>& statsSinks() override { return stats_sinks_; }
std::chrono::milliseconds statsFlushInterval() override { return stats_flush_interval_; }
std::chrono::milliseconds wdMissTimeout() const override { return watchdog_miss_timeout_; }
std::chrono::milliseconds wdMegaMissTimeout() const override {
Expand All @@ -108,11 +139,12 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
*/
void initializeTracers(const envoy::api::v2::Tracing& configuration, Instance& server);

void initializeStatsSinks(const envoy::api::v2::Bootstrap& bootstrap, Instance& server);

std::unique_ptr<Upstream::ClusterManager> cluster_manager_;
std::unique_ptr<LdsApi> lds_api_;
Tracing::HttpTracerPtr http_tracer_;
Optional<std::string> statsd_tcp_cluster_name_;
Network::Address::InstanceConstSharedPtr statsd_udp_ip_address_;
std::list<Stats::SinkPtr> stats_sinks_;
RateLimit::ClientFactoryPtr ratelimit_client_factory_;
std::chrono::milliseconds stats_flush_interval_;
std::chrono::milliseconds watchdog_miss_timeout_;
Expand Down
24 changes: 4 additions & 20 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "common/router/rds_impl.h"
#include "common/runtime/runtime_impl.h"
#include "common/singleton/manager_impl.h"
#include "common/stats/statsd.h"
#include "common/upstream/cluster_manager_impl.h"

#include "server/configuration_impl.h"
Expand Down Expand Up @@ -132,7 +131,7 @@ void InstanceImpl::flushStats() {
server_stats_.days_until_first_cert_expiring_.set(
sslContextManager().daysUntilFirstCertExpires());

InstanceUtil::flushCountersAndGaugesToSinks(stat_sinks_, stats_store_);
InstanceUtil::flushCountersAndGaugesToSinks(config_->statsSinks(), stats_store_);
stat_flush_timer_->enableTimer(config_->statsFlushInterval());
}

Expand Down Expand Up @@ -228,7 +227,9 @@ void InstanceImpl::initialize(Options& options,
ENVOY_LOG(warn, "caught and eating SIGHUP. See documentation for how to hot restart.");
});

initializeStatSinks();
for (Stats::SinkPtr& sink : main_config->statsSinks()) {
stats_store_.addSink(*sink);
}

// Some of the stat sinks may need dispatcher support so don't flush until the main loop starts.
// Just setup the timer.
Expand Down Expand Up @@ -268,23 +269,6 @@ Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
}
}

void InstanceImpl::initializeStatSinks() {
if (config_->statsdUdpIpAddress()) {
ENVOY_LOG(info, "statsd UDP ip address: {}", config_->statsdUdpIpAddress()->asString());
stat_sinks_.emplace_back(
new Stats::Statsd::UdpStatsdSink(thread_local_, config_->statsdUdpIpAddress()));
stats_store_.addSink(*stat_sinks_.back());
}

if (config_->statsdTcpClusterName().valid()) {
ENVOY_LOG(info, "statsd TCP cluster: {}", config_->statsdTcpClusterName().value());
stat_sinks_.emplace_back(
new Stats::Statsd::TcpStatsdSink(*local_info_, config_->statsdTcpClusterName().value(),
thread_local_, config_->clusterManager(), stats_store_));
stats_store_.addSink(*stat_sinks_.back());
}
}

void InstanceImpl::loadServerFlags(const Optional<std::string>& flags_path) {
if (!flags_path.valid()) {
return;
Expand Down
2 changes: 0 additions & 2 deletions source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
void flushStats();
void initialize(Options& options, Network::Address::InstanceConstSharedPtr local_address,
ComponentFactory& component_factory);
void initializeStatSinks();
void loadServerFlags(const Optional<std::string>& flags_path);
uint64_t numConnections();
void startWorkers();
Expand All @@ -148,7 +147,6 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
const time_t start_time_;
time_t original_start_time_;
Stats::StoreRoot& stats_store_;
std::list<Stats::SinkPtr> stat_sinks_;
ServerStats server_stats_;
ThreadLocal::Instance& thread_local_;
Api::ApiPtr api_;
Expand Down
1 change: 1 addition & 0 deletions test/config_test/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ envoy_cc_test(
],
deps = [
":config_test_lib",
"//source/server/config/stats:statsd_lib",
"//test/test_common:environment_lib",
"//test/test_common:utility_lib",
],
Expand Down
3 changes: 2 additions & 1 deletion test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ envoy_cc_test_library(
"//source/server/config/network:ratelimit_lib",
"//source/server/config/network:redis_proxy_lib",
"//source/server/config/network:tcp_proxy_lib",
"//source/server/config/stats:statsd_lib",
"//source/server/http:health_check_lib",
"//test/mocks/buffer:buffer_mocks",
"//test/mocks/upstream:upstream_mocks",
Expand All @@ -199,6 +200,7 @@ envoy_cc_test(
deps = [
":integration_lib",
"//source/common/http:header_map_lib",
"//source/server/config/stats:statsd_lib",
"//test/test_common:utility_lib",
],
)
Expand Down Expand Up @@ -228,7 +230,6 @@ envoy_cc_test(
":integration_lib",
"//source/common/buffer:buffer_lib",
"//source/common/http:codec_client_lib",
"//source/common/stats:stats_lib",
],
)

Expand Down
3 changes: 1 addition & 2 deletions test/mocks/server/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,7 @@ class MockMain : public Main {
MOCK_METHOD0(clusterManager, Upstream::ClusterManager&());
MOCK_METHOD0(httpTracer, Tracing::HttpTracer&());
MOCK_METHOD0(rateLimitClientFactory, RateLimit::ClientFactory&());
MOCK_METHOD0(statsdTcpClusterName, Optional<std::string>());
MOCK_METHOD0(statsdUdpIpAddress, Network::Address::InstanceConstSharedPtr());
MOCK_METHOD0(statsSinks, std::list<Stats::SinkPtr>&());
MOCK_METHOD0(statsFlushInterval, std::chrono::milliseconds());
MOCK_CONST_METHOD0(wdMissTimeout, std::chrono::milliseconds());
MOCK_CONST_METHOD0(wdMegaMissTimeout, std::chrono::milliseconds());
Expand Down
Loading