Skip to content

Commit

Permalink
Capture file flush interval as command line argument (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
junr03 authored Sep 13, 2016
1 parent d086cb6 commit 5cf9bd7
Show file tree
Hide file tree
Showing 22 changed files with 63 additions and 29 deletions.
9 changes: 9 additions & 0 deletions docs/operations/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,12 @@ following are the command line options that Envoy supports.
*(optional)* Defines the local service zone where Envoy is running. Though optional, it should
be set if discovery service routing is used and the discovery service exposes :ref:`zone data
<config_cluster_manager_sds_api_host_az>`.

.. option:: --file-flush-interval-msec <integer>

*(optional)* The file flushing interval in milliseconds. Defaults to 10 seconds.
This setting is used during file creation to determine the duration between flushes
of buffers to files. The buffer will flush every time it gets full, or every time
the interval has elapsed, whichever comes first. Adjusting this setting is useful
when tailing :ref:`access logs <arch_overview_http_access_logs>` in order to
get more (or less) immediate flushing.
5 changes: 5 additions & 0 deletions include/envoy/server/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ class Options {
* @return const std::string& the service zone where the server is running.
*/
virtual const std::string& serviceZone() PURE;

/**
* @return std::chrono::milliseconds the duration in msec between log flushes.
*/
virtual std::chrono::milliseconds fileFlushIntervalMsec() PURE;
};

} // Server
8 changes: 5 additions & 3 deletions source/common/api/api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ Event::DispatcherPtr Impl::allocateDispatcher() {
return Event::DispatcherPtr{new Event::DispatcherImpl()};
}

Impl::Impl() : os_sys_calls_(new Filesystem::OsSysCallsImpl()) {}
Impl::Impl(std::chrono::milliseconds file_flush_interval_msec)
: os_sys_calls_(new Filesystem::OsSysCallsImpl()),
file_flush_interval_msec_(file_flush_interval_msec) {}

Filesystem::FilePtr Impl::createFile(const std::string& path, Event::Dispatcher& dispatcher,
Thread::BasicLockable& lock, Stats::Store& stats_store) {
return Filesystem::FilePtr{
new Filesystem::FileImpl(path, dispatcher, lock, *os_sys_calls_, stats_store)};
return Filesystem::FilePtr{new Filesystem::FileImpl(path, dispatcher, lock, *os_sys_calls_,
stats_store, file_flush_interval_msec_)};
}

bool Impl::fileExists(const std::string& path) { return Filesystem::fileExists(path); }
Expand Down
3 changes: 2 additions & 1 deletion source/common/api/api_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Api {
*/
class Impl : public Api::Api {
public:
Impl();
Impl(std::chrono::milliseconds file_flush_interval_msec);

// Api::Api
Event::DispatcherPtr allocateDispatcher() override;
Expand All @@ -21,6 +21,7 @@ class Impl : public Api::Api {

private:
Filesystem::OsSysCallsPtr os_sys_calls_;
std::chrono::milliseconds file_flush_interval_msec_;
};

} // Api
3 changes: 0 additions & 3 deletions source/common/filesystem/filesystem_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
#include <iostream>

namespace Filesystem {
const std::chrono::milliseconds FileImpl::FLUSH_INTERVAL_MSEC =
std::chrono::milliseconds(10 * 1000);

bool fileExists(const std::string& path) {
std::ifstream input_file(path);
return input_file.is_open();
Expand Down
5 changes: 2 additions & 3 deletions source/common/filesystem/filesystem_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class FileImpl : public File {
public:
FileImpl(const std::string& path, Event::Dispatcher& dispatcher, Thread::BasicLockable& lock,
OsSysCalls& osSysCalls, Stats::Store& stats_store,
std::chrono::milliseconds flush_interval_msec = FLUSH_INTERVAL_MSEC);
std::chrono::milliseconds flush_interval_msec);
~FileImpl();

// Filesystem::File
Expand All @@ -79,8 +79,6 @@ class FileImpl : public File {

// Minimum size before the flush thread will be told to flush.
static const uint64_t MIN_FLUSH_SIZE = 1024 * 64;
// Time interval buffer gets flushed no matter if it reached the MIN_FLUSH_SIZE or not.
static const std::chrono::milliseconds FLUSH_INTERVAL_MSEC;

int fd_;
std::string path_;
Expand All @@ -93,6 +91,7 @@ class FileImpl : public File {
Event::TimerPtr flush_timer_;
Event::Dispatcher& dispatcher_;
OsSysCalls& os_sys_calls_;
// Time interval buffer gets flushed no matter if it reached the MIN_FLUSH_SIZE or not.
const std::chrono::milliseconds flush_interval_msec_;
FileSystemStats stats_;
};
Expand Down
5 changes: 3 additions & 2 deletions source/server/connection_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

#include "common/api/api_impl.h"

ConnectionHandler::ConnectionHandler(Stats::Store& stats_store, spdlog::logger& logger)
: stats_store_(stats_store), logger_(logger), api_(new Api::Impl()),
ConnectionHandler::ConnectionHandler(Stats::Store& stats_store, spdlog::logger& logger,
std::chrono::milliseconds file_flush_interval_msec)
: stats_store_(stats_store), logger_(logger), api_(new Api::Impl(file_flush_interval_msec)),
dispatcher_(api_->allocateDispatcher()),
watchdog_miss_counter_(stats_store.counter("server.watchdog_miss")),
watchdog_mega_miss_counter_(stats_store.counter("server.watchdog_mega_miss")) {}
Expand Down
5 changes: 3 additions & 2 deletions source/server/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
COUNTER(downstream_cx_total) \
COUNTER(downstream_cx_destroy) \
GAUGE (downstream_cx_active) \
TIMER (downstream_cx_length_ms) \
TIMER (downstream_cx_length_ms)
// clang-format on

/**
Expand All @@ -33,7 +33,8 @@ struct ListenerStats {
*/
class ConnectionHandler final : NonCopyable {
public:
ConnectionHandler(Stats::Store& stats_store, spdlog::logger& logger);
ConnectionHandler(Stats::Store& stats_store, spdlog::logger& logger,
std::chrono::milliseconds file_flush_interval_msec);
~ConnectionHandler();

Api::Api& api() { return *api_; }
Expand Down
4 changes: 4 additions & 0 deletions source/server/options_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ OptionsImpl::OptionsImpl(int argc, char** argv, const std::string& hot_restart_v
cmd);
TCLAP::ValueArg<std::string> service_zone("", "service-zone", "Zone name", false, "", "string",
cmd);
TCLAP::ValueArg<uint64_t> file_flush_interval_msec("", "file-flush-interval-msec",
"Interval for log flushing in msec", false,
10000, "uint64_t", cmd);

try {
cmd.parse(argc, argv);
Expand All @@ -48,4 +51,5 @@ OptionsImpl::OptionsImpl(int argc, char** argv, const std::string& hot_restart_v
service_cluster_ = service_cluster.getValue();
service_node_ = service_node.getValue();
service_zone_ = service_zone.getValue();
file_flush_interval_msec_ = std::chrono::milliseconds(file_flush_interval_msec.getValue());
}
2 changes: 2 additions & 0 deletions source/server/options_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class OptionsImpl : public Server::Options {
const std::string& serviceClusterName() override { return service_cluster_; }
const std::string& serviceNodeName() override { return service_node_; }
const std::string& serviceZone() override { return service_zone_; }
std::chrono::milliseconds fileFlushIntervalMsec() override { return file_flush_interval_msec_; }

private:
uint64_t base_id_;
Expand All @@ -29,4 +30,5 @@ class OptionsImpl : public Server::Options {
std::string service_cluster_;
std::string service_node_;
std::string service_zone_;
std::chrono::milliseconds file_flush_interval_msec_;
};
5 changes: 3 additions & 2 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ InstanceImpl::InstanceImpl(Options& options, TestHooks& hooks, HotRestart& resta
: options_(options), restarter_(restarter), start_time_(time(nullptr)),
original_start_time_(start_time_), stats_store_(store), access_log_lock_(access_log_lock),
server_stats_{ALL_SERVER_STATS(POOL_GAUGE_PREFIX(stats_store_, "server."))},
handler_(stats_store_, log()), dns_resolver_(handler_.dispatcher().createDnsResolver()),
handler_(stats_store_, log(), options.fileFlushIntervalMsec()),
dns_resolver_(handler_.dispatcher().createDnsResolver()),
local_address_(Network::Utility::getLocalAddress()) {

failHealthcheck(false);
Expand Down Expand Up @@ -144,7 +145,7 @@ void InstanceImpl::initialize(Options& options, TestHooks& hooks,

// Workers get created first so they register for thread local updates.
for (uint32_t i = 0; i < std::max(1U, options.concurrency()); i++) {
workers_.emplace_back(new Worker(stats_store_, thread_local_));
workers_.emplace_back(new Worker(stats_store_, thread_local_, options.fileFlushIntervalMsec()));
}

// The main thread is also registered for thread local updates so that code that does not care
Expand Down
5 changes: 3 additions & 2 deletions source/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

#include "common/common/thread.h"

Worker::Worker(Stats::Store& stats_store, ThreadLocal::Instance& tls)
: tls_(tls), handler_(new ConnectionHandler(stats_store, log())) {
Worker::Worker(Stats::Store& stats_store, ThreadLocal::Instance& tls,
std::chrono::milliseconds file_flush_interval_msec)
: tls_(tls), handler_(new ConnectionHandler(stats_store, log(), file_flush_interval_msec)) {
tls_.registerThread(handler_->dispatcher(), false);
}

Expand Down
3 changes: 2 additions & 1 deletion source/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ typedef std::map<Server::Configuration::Listener*, Network::TcpListenSocketPtr>
*/
class Worker : Logger::Loggable<Logger::Id::main> {
public:
Worker(Stats::Store& stats_store, ThreadLocal::Instance& tls);
Worker(Stats::Store& stats_store, ThreadLocal::Instance& tls,
std::chrono::milliseconds file_flush_interval_msec);
~Worker();

Event::Dispatcher& dispatcher() { return handler_->dispatcher(); }
Expand Down
6 changes: 3 additions & 3 deletions test/common/api/api_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Api {

TEST(ApiImplTest, readFileToEnd) {
Impl api;
Impl api(std::chrono::milliseconds(10000));

const std::string file_path = "/tmp/test_api_envoy";
unlink(file_path.c_str());
Expand All @@ -17,10 +17,10 @@ TEST(ApiImplTest, readFileToEnd) {
}

TEST(ApiImplTest, fileExists) {
Impl api;
Impl api(std::chrono::milliseconds(10000));

EXPECT_TRUE(api.fileExists("/dev/null"));
EXPECT_FALSE(api.fileExists("/dev/blahblahblah"));
}

} // Api
} // Api
4 changes: 3 additions & 1 deletion test/common/filesystem/filesystem_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ TEST(FileSystemImpl, BadFile) {
Thread::MutexBasicLockable lock;
Stats::IsolatedStoreImpl store;
Filesystem::OsSysCallsImpl os_sys_calls;
EXPECT_THROW(Filesystem::FileImpl("", dispatcher, lock, os_sys_calls, store), EnvoyException);
EXPECT_THROW(Filesystem::FileImpl("", dispatcher, lock, os_sys_calls, store,
std::chrono::milliseconds(10000)),
EnvoyException);
}

TEST(FileSystemImpl, fileExists) {
Expand Down
2 changes: 1 addition & 1 deletion test/common/network/dns_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "common/api/api_impl.h"

TEST(DnsImplTest, LocalAsyncLookup) {
Api::Impl api;
Api::Impl api(std::chrono::milliseconds(10000));
Event::DispatcherPtr dispatcher = api.allocateDispatcher();
Network::DnsResolverPtr resolver = dispatcher->createDnsResolver();

Expand Down
4 changes: 2 additions & 2 deletions test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ FakeUpstream::FakeUpstream(Ssl::ServerContext* ssl_ctx, uint32_t port,

FakeUpstream::FakeUpstream(Ssl::ServerContext* ssl_ctx, Network::ListenSocketPtr&& listen_socket,
FakeHttpConnection::Type type)
: ssl_ctx_(ssl_ctx), socket_(std::move(listen_socket)), handler_(stats_store_, log()),
http_type_(type) {
: ssl_ctx_(ssl_ctx), socket_(std::move(listen_socket)),
handler_(stats_store_, log(), std::chrono::milliseconds(10000)), http_type_(type) {
thread_.reset(new Thread::Thread([this]() -> void { threadRoutine(); }));
server_initialized_.waitReady();
}
Expand Down
3 changes: 2 additions & 1 deletion test/integration/integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ void IntegrationTcpClient::ConnectionCallbacks::onEvent(uint32_t events) {
}

BaseIntegrationTest::BaseIntegrationTest()
: api_(new Api::Impl()), dispatcher_(api_->allocateDispatcher()) {
: api_(new Api::Impl(std::chrono::milliseconds(10000))),
dispatcher_(api_->allocateDispatcher()) {

// This is a hack, but there are situations where we disconnect fake upstream connections and
// then we expect the server connection pool to get the disconnect before the next test starts.
Expand Down
3 changes: 3 additions & 0 deletions test/integration/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ class TestOptionsImpl : public Options {
const std::string& serviceClusterName() override { return cluster_name_; }
const std::string& serviceNodeName() override { return node_name_; }
const std::string& serviceZone() override { return zone_name_; }
std::chrono::milliseconds fileFlushIntervalMsec() override {
return std::chrono::milliseconds(10000);
}

private:
const std::string config_path_;
Expand Down
4 changes: 2 additions & 2 deletions test/integration/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ BufferingStreamDecoderPtr IntegrationUtil::makeSingleRequest(uint32_t port, std:
std::string url,
Http::CodecClient::Type type,
std::string host) {
Api::Impl api;
Api::Impl api(std::chrono::milliseconds(9000));
Event::DispatcherPtr dispatcher(api.allocateDispatcher());
Stats::IsolatedStoreImpl stats_store;
Http::CodecClientStats stats{ALL_CODEC_CLIENT_STATS(POOL_COUNTER(stats_store))};
Expand All @@ -66,7 +66,7 @@ BufferingStreamDecoderPtr IntegrationUtil::makeSingleRequest(uint32_t port, std:

RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initial_data,
ReadCallback data_callback) {
api_.reset(new Api::Impl());
api_.reset(new Api::Impl(std::chrono::milliseconds(10000)));
dispatcher_ = api_->allocateDispatcher();
client_ = dispatcher_->createClientConnection(fmt::format("tcp://127.0.0.1:{}", port));
client_->addReadFilter(Network::ReadFilterPtr{new ForwardingFilter(*this, data_callback)});
Expand Down
1 change: 1 addition & 0 deletions test/mocks/server/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class MockOptions : public Options {
MOCK_METHOD0(serviceClusterName, const std::string&());
MOCK_METHOD0(serviceNodeName, const std::string&());
MOCK_METHOD0(serviceZone, const std::string&());
MOCK_METHOD0(fileFlushIntervalMsec, std::chrono::milliseconds());
};

class MockAdmin : public Admin {
Expand Down
3 changes: 3 additions & 0 deletions test/server/options_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ TEST(OptionsImplTest, All) {
argv.push_back("node");
argv.push_back("--service-zone");
argv.push_back("zone");
argv.push_back("--file-flush-interval-msec");
argv.push_back("9000");
OptionsImpl options(argv.size(), const_cast<char**>(&argv[0]), "1", spdlog::level::notice);
EXPECT_EQ(2U, options.concurrency());
EXPECT_EQ("hello", options.configPath());
Expand All @@ -25,4 +27,5 @@ TEST(OptionsImplTest, All) {
EXPECT_EQ("cluster", options.serviceClusterName());
EXPECT_EQ("node", options.serviceNodeName());
EXPECT_EQ("zone", options.serviceZone());
EXPECT_EQ(std::chrono::milliseconds(9000), options.fileFlushIntervalMsec());
}

0 comments on commit 5cf9bd7

Please sign in to comment.