Skip to content

Commit

Permalink
Stats: Filter stats to be flushed to sinks (#18805)
Browse files Browse the repository at this point in the history
Provide the ability to filter stats to be flushed to sinks to reduce CPU usage for the periodic stats aggregation process.

Commit Message:

Additional Description:

Envoy stats are periodically flushed to stat sinks (default cadence of 5s) on the main thread. The number of stats scales linearly with the number of clusters, as approximately 100 stats are replicated for each cluster. For high counts of clusters (order of 10k), the flushing of stats dominates CPU usage on the main thread. Being tied up in stats flushing can prevent the main thread from processing xDS updates in a timely manner, or even starve worker threads of CPU if the CPU is overcommitted.

Usually, the number of stats of interest can be an order of magnitude lower than the number of stats. There is a mechanism to reject unwanted stats, but doing so will also make them unavailable for viewing in the admin console, which could hinder debuggability. Further, Envoy actually needs some of its stats to run (see for eg. #14610) which is currently an open bug.

See the design doc below for more details:
https://docs.google.com/document/d/1lzMvRlU5xY0yezpqA75N6kU747GY7I_WeGpBXiPaP5M/edit#heading=h.xgjl2srtytjt

Risk Level: Low
Testing: Added tests
Docs Changes: NA
Release Notes: NA
Platform Specific Features: NA

See below benchmark results from //test/server:server_stats_flush_benchmark
```
----------------------------------------------------------------------------------
Benchmark                                        Time             CPU   Iterations
----------------------------------------------------------------------------------
bmFlushToSinks/10                            0.003 ms        0.003 ms       247626
bmFlushToSinks/100                           0.019 ms        0.019 ms        36474
bmFlushToSinks/1000                          0.193 ms        0.193 ms         3622
bmFlushToSinks/10000                          2.25 ms         2.25 ms          299
bmFlushToSinks/100000                         61.8 ms         61.8 ms           10
bmFlushToSinks/1000000                        1212 ms         1212 ms            1
bmFlushToSinksWithPredicatesSet/10           0.001 ms        0.001 ms       496056
bmFlushToSinksWithPredicatesSet/100          0.007 ms        0.007 ms       104775
bmFlushToSinksWithPredicatesSet/1000         0.067 ms        0.067 ms        10411
bmFlushToSinksWithPredicatesSet/10000        0.704 ms        0.704 ms          938
bmFlushToSinksWithPredicatesSet/100000        28.0 ms         28.0 ms           25
bmFlushToSinksWithPredicatesSet/1000000        484 ms          484 ms            2
```

Signed-off-by: Pradeep Rao <[email protected]>
  • Loading branch information
pradeepcrao authored Nov 30, 2021
1 parent c07bfee commit 55a97dd
Show file tree
Hide file tree
Showing 23 changed files with 527 additions and 94 deletions.
12 changes: 12 additions & 0 deletions envoy/server/instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <chrono>
#include <cstdint>
#include <memory>
#include <string>

#include "envoy/access_log/access_log.h"
Expand Down Expand Up @@ -31,6 +32,11 @@
#include "envoy/upstream/cluster_manager.h"

namespace Envoy {

namespace Stats {
class SinkPredicates;
}

namespace Server {

/**
Expand Down Expand Up @@ -269,6 +275,12 @@ class Instance {
* TODO(mattklein123): This can be removed when version 1.20.0 is no longer supported.
*/
virtual bool enableReusePortDefault() PURE;

/**
* Set predicates for filtering counters, gauges and text readouts to be flushed to sinks.
*/
virtual void
setSinkPredicates(std::unique_ptr<Envoy::Stats::SinkPredicates>&& sink_predicates) PURE;
};

} // namespace Server
Expand Down
36 changes: 26 additions & 10 deletions envoy/stats/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
namespace Envoy {
namespace Stats {

class Sink;
class SinkPredicates;

/**
* Abstract interface for allocating statistics. Implementations can
* be created utilizing a single fixed-size block suitable for
Expand Down Expand Up @@ -70,19 +73,32 @@ class Allocator {
virtual void markTextReadoutForDeletion(const TextReadoutSharedPtr& text_readout) PURE;

/**
* Iterate over all stats that need to be added to a sink. Note, that implementations can
* Iterate over all stats. Note, that implementations can potentially hold on to a mutex that
* will deadlock if the passed in functors try to create or delete a stat.
* @param f_size functor that is provided the current number of all stats. Note that this is
* called only once, prior to any calls to f_stat.
* @param f_stat functor that is provided one stat at a time from the stats container.
*/
virtual void forEachCounter(SizeFn f_size, StatFn<Counter> f_stat) const PURE;
virtual void forEachGauge(SizeFn f_size, StatFn<Gauge> f_stat) const PURE;
virtual void forEachTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const PURE;

/**
* Iterate over all stats that need to be flushed to sinks. Note, that implementations can
* potentially hold on to a mutex that will deadlock if the passed in functors try to create
* or delete a stat.
* @param f_size functor that is provided the number of all stats in the sink. Note this is
* called only once, prior to any calls to f_stat.
* @param f_stat functor that is provided one stat in the sink at a time.
* @param f_size functor that is provided the number of all stats that will be flushed to sinks.
* Note that this is called only once, prior to any calls to f_stat.
* @param f_stat functor that is provided one stat that will be flushed to sinks, at a time.
*/
virtual void forEachSinkedCounter(SizeFn f_size, StatFn<Counter> f_stat) const PURE;
virtual void forEachSinkedGauge(SizeFn f_size, StatFn<Gauge> f_stat) const PURE;
virtual void forEachSinkedTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const PURE;

/**
* Set the predicates to filter counters, gauges and text readouts for sink.
*/
virtual void forEachCounter(std::function<void(std::size_t)> f_size,
std::function<void(Stats::Counter&)> f_stat) const PURE;
virtual void forEachGauge(std::function<void(std::size_t)> f_size,
std::function<void(Stats::Gauge&)> f_stat) const PURE;
virtual void forEachTextReadout(std::function<void(std::size_t)> f_size,
std::function<void(Stats::TextReadout&)> f_stat) const PURE;
virtual void setSinkPredicates(std::unique_ptr<SinkPredicates>&& sink_predicates) PURE;

// TODO(jmarantz): create a parallel mechanism to instantiate histograms. At
// the moment, histograms don't fit the same pattern of counters and gauges
Expand Down
23 changes: 23 additions & 0 deletions envoy/stats/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,29 @@ class MetricSnapshot {
virtual SystemTime snapshotTime() const PURE;
};

/**
* A class to define predicates to filter counters, gauges and text readouts for flushing to sinks.
*/
class SinkPredicates {
public:
virtual ~SinkPredicates() = default;

/**
* @return true if @param counter needs to be flushed to sinks.
*/
virtual bool includeCounter(const Counter& counter) PURE;

/**
* @return true if @param gague needs to be flushed to sinks.
*/
virtual bool includeGauge(const Gauge& gauge) PURE;

/**
* @return true if @param text_readout needs to be flushed to sinks.
*/
virtual bool includeTextReadout(const TextReadout& text_readout) PURE;
};

/**
* A sink for stats. Each sink is responsible for writing stats to a backing store.
*/
Expand Down
10 changes: 10 additions & 0 deletions envoy/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,15 @@ class TextReadout : public virtual Metric {

using TextReadoutSharedPtr = RefcountPtr<TextReadout>;

/**
* Callback invoked to provide size of stats container.
*/
using SizeFn = std::function<void(std::size_t)>;

/**
* Callback invoked for each stat during iteration.
*/
template <typename Stat> using StatFn = std::function<void(Stat&)>;

} // namespace Stats
} // namespace Envoy
40 changes: 28 additions & 12 deletions envoy/stats/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Instance;
namespace Stats {

class Sink;
class SinkPredicates;

/**
* A store for all known counters, gauges, and timers.
Expand Down Expand Up @@ -51,20 +52,27 @@ class Store : public Scope {
virtual std::vector<ParentHistogramSharedPtr> histograms() const PURE;

/**
* Iterate over all stats that need to be added to a sink. Note, that implementations can
* potentially hold on to a mutex that will deadlock if the passed in functors try to create
* or delete a stat.
* @param f_size functor that is provided the number of all stats in the sink.
* @param f_stat functor that is provided one stat in the sink at a time.
* Iterate over all stats. Note, that implementations can potentially hold on to a mutex that
* will deadlock if the passed in functors try to create or delete a stat.
* @param f_size functor that is provided the current number of all stats. Note that this is
* called only once, prior to any calls to f_stat.
* @param f_stat functor that is provided one stat at a time from the stats container.
*/
virtual void forEachCounter(std::function<void(std::size_t)> f_size,
std::function<void(Stats::Counter&)> f_stat) const PURE;

virtual void forEachGauge(std::function<void(std::size_t)> f_size,
std::function<void(Stats::Gauge&)> f_stat) const PURE;
virtual void forEachCounter(SizeFn f_size, StatFn<Counter> f_stat) const PURE;
virtual void forEachGauge(SizeFn f_size, StatFn<Gauge> f_stat) const PURE;
virtual void forEachTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const PURE;

virtual void forEachTextReadout(std::function<void(std::size_t)> f_size,
std::function<void(Stats::TextReadout&)> f_stat) const PURE;
/**
* Iterate over all stats that need to be flushed to sinks. Note, that implementations can
* potentially hold on to a mutex that will deadlock if the passed in functors try to create
* or delete a stat.
* @param f_size functor that is provided the number of all stats that will be flushed to sinks.
* Note that this is called only once, prior to any calls to f_stat.
* @param f_stat functor that is provided one stat that will be flushed to sinks, at a time.
*/
virtual void forEachSinkedCounter(SizeFn f_size, StatFn<Counter> f_stat) const PURE;
virtual void forEachSinkedGauge(SizeFn f_size, StatFn<Gauge> f_stat) const PURE;
virtual void forEachSinkedTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const PURE;
};

using StorePtr = std::unique_ptr<Store>;
Expand Down Expand Up @@ -123,6 +131,14 @@ class StoreRoot : public Store {
* method would be asserted.
*/
virtual void mergeHistograms(PostMergeCb merge_complete_cb) PURE;

/**
* Set predicates for filtering counters, gauges and text readouts to be flushed to sinks.
* Note that if the sink predicates object is set, we do not send non-sink stats over to the
* child process during hot restart. This will result in the admin stats console being wrong
* during hot restart.
*/
virtual void setSinkPredicates(std::unique_ptr<SinkPredicates>&& sink_predicates) PURE;
};

using StoreRootPtr = std::unique_ptr<StoreRoot>;
Expand Down
94 changes: 88 additions & 6 deletions source/common/stats/allocator_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <algorithm>
#include <cstdint>

#include "envoy/stats/sink.h"
#include "envoy/stats/stats.h"
#include "envoy/stats/symbol_table.h"

Expand Down Expand Up @@ -144,6 +145,7 @@ class CounterImpl : public StatsSharedImpl<Counter> {
void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) override {
const size_t count = alloc_.counters_.erase(statName());
ASSERT(count == 1);
alloc_.sinked_counters_.erase(this);
}

// Stats::Counter
Expand Down Expand Up @@ -188,6 +190,7 @@ class GaugeImpl : public StatsSharedImpl<Gauge> {
void removeFromSetLockHeld() override ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) {
const size_t count = alloc_.gauges_.erase(statName());
ASSERT(count == 1);
alloc_.sinked_gauges_.erase(this);
}

// Stats::Gauge
Expand Down Expand Up @@ -260,6 +263,7 @@ class TextReadoutImpl : public StatsSharedImpl<TextReadout> {
void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) override {
const size_t count = alloc_.text_readouts_.erase(statName());
ASSERT(count == 1);
alloc_.sinked_text_readouts_.erase(this);
}

// Stats::TextReadout
Expand Down Expand Up @@ -289,6 +293,11 @@ CounterSharedPtr AllocatorImpl::makeCounter(StatName name, StatName tag_extracte
}
auto counter = CounterSharedPtr(makeCounterInternal(name, tag_extracted_name, stat_name_tags));
counters_.insert(counter.get());
// Add counter to sinked_counters_ if it matches the sink predicate.
if (sink_predicates_ != nullptr && sink_predicates_->includeCounter(*counter)) {
auto val = sinked_counters_.insert(counter.get());
ASSERT(val.second);
}
return counter;
}

Expand All @@ -305,6 +314,11 @@ GaugeSharedPtr AllocatorImpl::makeGauge(StatName name, StatName tag_extracted_na
auto gauge =
GaugeSharedPtr(new GaugeImpl(name, *this, tag_extracted_name, stat_name_tags, import_mode));
gauges_.insert(gauge.get());
// Add gauge to sinked_gauges_ if it matches the sink predicate.
if (sink_predicates_ != nullptr && sink_predicates_->includeGauge(*gauge)) {
auto val = sinked_gauges_.insert(gauge.get());
ASSERT(val.second);
}
return gauge;
}

Expand All @@ -320,6 +334,11 @@ TextReadoutSharedPtr AllocatorImpl::makeTextReadout(StatName name, StatName tag_
auto text_readout =
TextReadoutSharedPtr(new TextReadoutImpl(name, *this, tag_extracted_name, stat_name_tags));
text_readouts_.insert(text_readout.get());
// Add text_readout to sinked_text_readouts_ if it matches the sink predicate.
if (sink_predicates_ != nullptr && sink_predicates_->includeTextReadout(*text_readout)) {
auto val = sinked_text_readouts_.insert(text_readout.get());
ASSERT(val.second);
}
return text_readout;
}

Expand All @@ -336,8 +355,7 @@ Counter* AllocatorImpl::makeCounterInternal(StatName name, StatName tag_extracte
return new CounterImpl(name, *this, tag_extracted_name, stat_name_tags);
}

void AllocatorImpl::forEachCounter(std::function<void(std::size_t)> f_size,
std::function<void(Stats::Counter&)> f_stat) const {
void AllocatorImpl::forEachCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
Thread::LockGuard lock(mutex_);
if (f_size != nullptr) {
f_size(counters_.size());
Expand All @@ -347,8 +365,7 @@ void AllocatorImpl::forEachCounter(std::function<void(std::size_t)> f_size,
}
}

void AllocatorImpl::forEachGauge(std::function<void(std::size_t)> f_size,
std::function<void(Stats::Gauge&)> f_stat) const {
void AllocatorImpl::forEachGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
Thread::LockGuard lock(mutex_);
if (f_size != nullptr) {
f_size(gauges_.size());
Expand All @@ -358,8 +375,7 @@ void AllocatorImpl::forEachGauge(std::function<void(std::size_t)> f_size,
}
}

void AllocatorImpl::forEachTextReadout(std::function<void(std::size_t)> f_size,
std::function<void(Stats::TextReadout&)> f_stat) const {
void AllocatorImpl::forEachTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const {
Thread::LockGuard lock(mutex_);
if (f_size != nullptr) {
f_size(text_readouts_.size());
Expand All @@ -369,6 +385,69 @@ void AllocatorImpl::forEachTextReadout(std::function<void(std::size_t)> f_size,
}
}

void AllocatorImpl::forEachSinkedCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
if (sink_predicates_ != nullptr) {
Thread::LockGuard lock(mutex_);
f_size(sinked_counters_.size());
for (auto counter : sinked_counters_) {
f_stat(*counter);
}
} else {
forEachCounter(f_size, f_stat);
}
}

void AllocatorImpl::forEachSinkedGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
if (sink_predicates_ != nullptr) {
Thread::LockGuard lock(mutex_);
f_size(sinked_gauges_.size());
for (auto gauge : sinked_gauges_) {
f_stat(*gauge);
}
} else {
forEachGauge(f_size, f_stat);
}
}

void AllocatorImpl::forEachSinkedTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const {
if (sink_predicates_ != nullptr) {
Thread::LockGuard lock(mutex_);
f_size(sinked_text_readouts_.size());
for (auto text_readout : sinked_text_readouts_) {
f_stat(*text_readout);
}
} else {
forEachTextReadout(f_size, f_stat);
}
}

void AllocatorImpl::setSinkPredicates(std::unique_ptr<SinkPredicates>&& sink_predicates) {
Thread::LockGuard lock(mutex_);
ASSERT(sink_predicates_ == nullptr);
sink_predicates_ = std::move(sink_predicates);
sinked_counters_.clear();
sinked_gauges_.clear();
sinked_text_readouts_.clear();
// Add counters to the set of sinked counters.
for (auto& counter : counters_) {
if (sink_predicates_->includeCounter(*counter)) {
sinked_counters_.emplace(counter);
}
}
// Add gauges to the set of sinked gauges.
for (auto& gauge : gauges_) {
if (sink_predicates_->includeGauge(*gauge)) {
sinked_gauges_.insert(gauge);
}
}
// Add text_readouts to the set of sinked text readouts.
for (auto& text_readout : text_readouts_) {
if (sink_predicates_->includeTextReadout(*text_readout)) {
sinked_text_readouts_.insert(text_readout);
}
}
}

void AllocatorImpl::markCounterForDeletion(const CounterSharedPtr& counter) {
Thread::LockGuard lock(mutex_);
auto iter = counters_.find(counter->statName());
Expand All @@ -380,6 +459,7 @@ void AllocatorImpl::markCounterForDeletion(const CounterSharedPtr& counter) {
// Duplicates are ASSERTed in ~AllocatorImpl.
deleted_counters_.emplace_back(*iter);
counters_.erase(iter);
sinked_counters_.erase(counter.get());
}

void AllocatorImpl::markGaugeForDeletion(const GaugeSharedPtr& gauge) {
Expand All @@ -393,6 +473,7 @@ void AllocatorImpl::markGaugeForDeletion(const GaugeSharedPtr& gauge) {
// Duplicates are ASSERTed in ~AllocatorImpl.
deleted_gauges_.emplace_back(*iter);
gauges_.erase(iter);
sinked_gauges_.erase(gauge.get());
}

void AllocatorImpl::markTextReadoutForDeletion(const TextReadoutSharedPtr& text_readout) {
Expand All @@ -406,6 +487,7 @@ void AllocatorImpl::markTextReadoutForDeletion(const TextReadoutSharedPtr& text_
// Duplicates are ASSERTed in ~AllocatorImpl.
deleted_text_readouts_.emplace_back(*iter);
text_readouts_.erase(iter);
sinked_text_readouts_.erase(text_readout.get());
}

} // namespace Stats
Expand Down
Loading

0 comments on commit 55a97dd

Please sign in to comment.