adaptive concurrency: Disregard samples from previous minRTT epoch (envoyproxy#11579)

Signed-off-by: Tony Allen <[email protected]>
Signed-off-by: yashwant121 <[email protected]>
tonya11en authored and yashwant121 committed Jun 24, 2020
1 parent 615a0c3 commit 73eae45
Showing 9 changed files with 185 additions and 72 deletions.
3 changes: 3 additions & 0 deletions docs/root/version_history/current.rst
@@ -28,6 +28,9 @@ Bug Fixes
---------
*Changes expected to improve the state of the world and are unlikely to have negative effects*

+ * adaptive concurrency: fixed a minRTT calculation bug where requests started before the concurrency
+   limit was pinned to the minimum would skew the new minRTT value if the replies arrived after the
+   start of the new minRTT window.
* grpc-json: fix a bug when in trailers only gRPC response (e.g. error) HTTP status code is not being re-written.
* http: fixed a bug in the grpc_http1_reverse_bridge filter where header-only requests were forwarded with a non-zero content length.
* http: fixed a bug where in some cases slash was moved from path to query string when :ref:`merging of adjacent slashes<envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.merge_slashes>` is enabled.
@@ -43,16 +43,11 @@ Http::FilterHeadersStatus AdaptiveConcurrencyFilter::decodeHeaders(Http::Request
return Http::FilterHeadersStatus::StopIteration;
}

- // When the deferred_sample_task_ object is destroyed, the time difference between its destruction
- // and the request start time is measured as the request latency. This value is sampled by the
- // concurrency controller either when encoding is complete or during destruction of this filter
- // object.
+ // When the deferred_sample_task_ object is destroyed, the request start time is sampled. This
+ // occurs either when encoding is complete or during destruction of this filter object.
+ const auto now = config_->timeSource().monotonicTime();
deferred_sample_task_ =
-     std::make_unique<Cleanup>([this, rq_start_time = config_->timeSource().monotonicTime()]() {
-       const auto now = config_->timeSource().monotonicTime();
-       const std::chrono::nanoseconds rq_latency = now - rq_start_time;
-       controller_->recordLatencySample(rq_latency);
-     });
+     std::make_unique<Cleanup>([this, now]() { controller_->recordLatencySample(now); });

return Http::FilterHeadersStatus::Continue;
}
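
The hunk above replaces a captured latency duration with a captured start timestamp. Below is a minimal, self-contained sketch of that deferred-sampling pattern, under simplifying assumptions: ToyController, the small Cleanup helper, and the direct use of std::chrono::steady_clock are illustrative stand-ins for Envoy's ConcurrencyController, Cleanup, and injected TimeSource, not the actual Envoy APIs.

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <utility>

using MonotonicTime = std::chrono::steady_clock::time_point;

// Stand-in controller: it receives only the request's send time and measures
// the latency itself against its own clock when (and if) it keeps the sample.
struct ToyController {
  void recordLatencySample(MonotonicTime rq_send_time) {
    const auto latency = std::chrono::steady_clock::now() - rq_send_time;
    std::cout << "sampled latency: "
              << std::chrono::duration_cast<std::chrono::microseconds>(latency).count()
              << " us\n";
  }
};

// Minimal RAII helper that runs a callback on destruction, mirroring the role
// the deferred_sample_task_ object plays in the filter.
class Cleanup {
public:
  explicit Cleanup(std::function<void()> cb) : cb_(std::move(cb)) {}
  ~Cleanup() { cb_(); }

private:
  std::function<void()> cb_;
};

int main() {
  ToyController controller;

  // Decode path: capture the request start time once and hand only that
  // timestamp to the deferred task.
  const MonotonicTime now = std::chrono::steady_clock::now();
  auto deferred_sample_task =
      std::make_unique<Cleanup>([&controller, now]() { controller.recordLatencySample(now); });

  // ... the request is proxied; when encoding completes (or the filter is
  // destroyed) the task is released and the sample is recorded.
  deferred_sample_task.reset();
  return 0;
}

Passing the start time instead of a pre-computed latency is what lets the controller decide, at completion time, whether the sample still belongs to the current minRTT window before measuring it.
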
@@ -26,7 +26,8 @@ Http::FilterFactoryCb AdaptiveConcurrencyFilterFactory::createFilterFactoryFromP
Controller::GradientControllerConfig(config.gradient_controller_config(), context.runtime());
controller = std::make_shared<Controller::GradientController>(
std::move(gradient_controller_config), context.dispatcher(), context.runtime(),
- acc_stats_prefix + "gradient_controller.", context.scope(), context.random());
+ acc_stats_prefix + "gradient_controller.", context.scope(), context.random(),
+ context.timeSource());

AdaptiveConcurrencyFilterConfigSharedPtr filter_config(
new AdaptiveConcurrencyFilterConfig(config, context.runtime(), std::move(acc_stats_prefix),
@@ -23,6 +23,7 @@ envoy_cc_library(
"libcircllhist",
],
deps = [
"//include/envoy/common:time_interface",
"//source/common/event:dispatcher_lib",
"//source/common/protobuf",
"//source/common/runtime:runtime_lib",
@@ -3,6 +3,7 @@
#include <chrono>

#include "envoy/common/pure.h"
#include "envoy/common/time.h"

namespace Envoy {
namespace Extensions {
@@ -41,9 +42,9 @@ class ConcurrencyController {
* request latency to update the internal state of the controller for
* concurrency limit calculations.
*
- * @param rq_latency is the clocked round-trip time for the request.
+ * @param rq_send_time the time point which the sampled request was sent
*/
- virtual void recordLatencySample(std::chrono::nanoseconds rq_latency) PURE;
+ virtual void recordLatencySample(MonotonicTime rq_send_time) PURE;

/**
* Omit sampling an outstanding request and update the internal state of the controller to reflect
@@ -46,10 +46,11 @@ GradientControllerConfig::GradientControllerConfig(
GradientController::GradientController(GradientControllerConfig config,
Event::Dispatcher& dispatcher, Runtime::Loader&,
const std::string& stats_prefix, Stats::Scope& scope,
- Runtime::RandomGenerator& random)
+ Runtime::RandomGenerator& random, TimeSource& time_source)
: config_(std::move(config)), dispatcher_(dispatcher), scope_(scope),
- stats_(generateStats(scope_, stats_prefix)), random_(random), deferred_limit_value_(0),
- num_rq_outstanding_(0), concurrency_limit_(config_.minConcurrency()),
+ stats_(generateStats(scope_, stats_prefix)), random_(random), time_source_(time_source),
+ deferred_limit_value_(0), num_rq_outstanding_(0),
+ concurrency_limit_(config_.minConcurrency()),
latency_sample_hist_(hist_fast_alloc(), hist_free) {
min_rtt_calc_timer_ = dispatcher_.createTimer([this]() -> void { enterMinRTTSamplingWindow(); });

@@ -102,6 +103,8 @@ void GradientController::enterMinRTTSamplingWindow() {
// Throw away any latency samples from before the recalculation window as it may not represent
// the minRTT.
hist_clear(latency_sample_hist_.get());
+
+ min_rtt_epoch_ = time_source_.monotonicTime();
}

void GradientController::updateMinRTT() {
@@ -192,16 +195,22 @@ RequestForwardingAction GradientController::forwardingDecision() {
return RequestForwardingAction::Block;
}

- void GradientController::recordLatencySample(std::chrono::nanoseconds rq_latency) {
- const uint32_t latency_usec =
- std::chrono::duration_cast<std::chrono::microseconds>(rq_latency).count();
+ void GradientController::recordLatencySample(MonotonicTime rq_send_time) {
ASSERT(num_rq_outstanding_.load() > 0);
--num_rq_outstanding_;

+ if (rq_send_time < min_rtt_epoch_) {
+ // Disregard samples from requests started in the previous minRTT window.
+ return;
+ }
+
+ const std::chrono::microseconds rq_latency =
+ std::chrono::duration_cast<std::chrono::microseconds>(time_source_.monotonicTime() -
+ rq_send_time);
uint32_t sample_count;
{
absl::MutexLock ml(&sample_mutation_mtx_);
- hist_insert(latency_sample_hist_.get(), latency_usec, 1);
+ hist_insert(latency_sample_hist_.get(), rq_latency.count(), 1);
sample_count = hist_sample_count(latency_sample_hist_.get());
}
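
For reference, here is a compact sketch of the epoch check added above, under simplifying assumptions: ToySampler, the plain mutex-guarded vector, and the direct use of std::chrono::steady_clock stand in for the GradientController, its circllhist latency histogram, and the injected TimeSource, and the short sleeps exist only to make the example deterministic.

#include <chrono>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

using MonotonicTime = std::chrono::steady_clock::time_point;

// Toy stand-in for the controller's sampling state.
class ToySampler {
public:
  // A new minRTT measurement window begins: old samples are discarded and the
  // epoch is stamped so in-flight requests from the previous window can be
  // recognized later and ignored.
  void enterMinRTTSamplingWindow() {
    std::lock_guard<std::mutex> lock(mtx_);
    samples_usec_.clear();
    min_rtt_epoch_ = std::chrono::steady_clock::now();
  }

  // A request completed: drop it if it was sent before the current epoch,
  // otherwise measure its latency now and record it.
  void recordLatencySample(MonotonicTime rq_send_time) {
    std::lock_guard<std::mutex> lock(mtx_);
    if (rq_send_time < min_rtt_epoch_) {
      return; // Stale sample from the previous minRTT window.
    }
    const auto rq_latency = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::steady_clock::now() - rq_send_time);
    samples_usec_.push_back(rq_latency.count());
  }

  std::size_t sampleCount() {
    std::lock_guard<std::mutex> lock(mtx_);
    return samples_usec_.size();
  }

private:
  std::mutex mtx_;
  std::vector<long long> samples_usec_;
  MonotonicTime min_rtt_epoch_{};
};

int main() {
  ToySampler sampler;

  // A request admitted before the window change...
  const MonotonicTime early_rq = std::chrono::steady_clock::now();
  std::this_thread::sleep_for(std::chrono::milliseconds(1));

  // ...then a new minRTT sampling window begins...
  sampler.enterMinRTTSamplingWindow();
  std::this_thread::sleep_for(std::chrono::milliseconds(1));

  // ...then a request admitted inside the new window.
  const MonotonicTime fresh_rq = std::chrono::steady_clock::now();

  sampler.recordLatencySample(early_rq); // dropped: predates the epoch
  sampler.recordLatencySample(fresh_rq); // kept and measured
  std::cout << "samples kept: " << sampler.sampleCount() << "\n"; // prints 1
  return 0;
}

Rejecting the stale sample before it ever reaches the histogram is what keeps a long-lived request carried over from the previous window from skewing the fresh minRTT estimate.
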

@@ -3,6 +3,7 @@
#include <chrono>
#include <vector>

#include "envoy/common/time.h"
#include "envoy/event/dispatcher.h"
#include "envoy/extensions/filters/http/adaptive_concurrency/v3/adaptive_concurrency.pb.h"
#include "envoy/runtime/runtime.h"
@@ -210,11 +211,11 @@ class GradientController : public ConcurrencyController {
public:
GradientController(GradientControllerConfig config, Event::Dispatcher& dispatcher,
Runtime::Loader& runtime, const std::string& stats_prefix, Stats::Scope& scope,
- Runtime::RandomGenerator& random);
+ Runtime::RandomGenerator& random, TimeSource& time_source);

// ConcurrencyController.
RequestForwardingAction forwardingDecision() override;
- void recordLatencySample(std::chrono::nanoseconds rq_latency) override;
+ void recordLatencySample(MonotonicTime rq_send_time) override;
void cancelLatencySample() override;
uint32_t concurrencyLimit() const override { return concurrency_limit_.load(); }

@@ -238,6 +239,7 @@ class GradientController : public ConcurrencyController {
Stats::Scope& scope_;
GradientControllerStats stats_;
Runtime::RandomGenerator& random_;
+ TimeSource& time_source_;

// Protects data related to latency sampling and RTT values. In addition to protecting the latency
// sample histogram, the mutex ensures that the minRTT calculation window and the sample window
@@ -274,6 +276,10 @@ class GradientController : public ConcurrencyController {
// after remaining at the minimum limit for too long.
uint32_t consecutive_min_concurrency_set_ ABSL_GUARDED_BY(sample_mutation_mtx_);

+ // We will disregard sampling any requests admitted before this timestamp to prevent sampling
+ // requests admitted before the start of a minRTT window and potentially skewing the minRTT.
+ MonotonicTime min_rtt_epoch_;
+
Event::TimerPtr min_rtt_calc_timer_;
Event::TimerPtr sample_reset_timer_;
};
@@ -29,7 +29,7 @@ class MockConcurrencyController : public Controller::ConcurrencyController {
public:
MOCK_METHOD(RequestForwardingAction, forwardingDecision, ());
MOCK_METHOD(void, cancelLatencySample, ());
- MOCK_METHOD(void, recordLatencySample, (std::chrono::nanoseconds));
+ MOCK_METHOD(void, recordLatencySample, (MonotonicTime));

uint32_t concurrencyLimit() const override { return 0; }
};
@@ -223,12 +223,12 @@ TEST_F(AdaptiveConcurrencyFilterTest, OnDestroyCleanupTest) {
.WillOnce(Return(RequestForwardingAction::Forward));
EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers, true));

- const auto advance_time = std::chrono::nanoseconds(42);
- time_system_.advanceTimeWait(advance_time);
+ const auto rq_rcv_time = time_system_.monotonicTime();
+ time_system_.advanceTimeWait(std::chrono::nanoseconds(42));

Http::TestResponseHeaderMapImpl response_headers;
EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers, true));
- EXPECT_CALL(*controller_, recordLatencySample(advance_time));
+ EXPECT_CALL(*controller_, recordLatencySample(rq_rcv_time));
filter_->encodeComplete();

filter_->onDestroy();
@@ -248,16 +248,16 @@ TEST_F(AdaptiveConcurrencyFilterTest, EncodeHeadersValidTestWithBody) {
Http::TestRequestTrailerMapImpl request_trailers;
EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers));

- const auto advance_time = std::chrono::nanoseconds(42);
+ const auto rq_rcv_time = time_system_.monotonicTime();
mt = time_system_.monotonicTime();
- time_system_.setMonotonicTime(mt + advance_time);
+ time_system_.setMonotonicTime(mt + std::chrono::nanoseconds(42));

Http::TestResponseHeaderMapImpl response_headers;
EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers, false));
EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->encodeData(data, false));
Http::TestResponseTrailerMapImpl response_trailers;
EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers));
- EXPECT_CALL(*controller_, recordLatencySample(advance_time));
+ EXPECT_CALL(*controller_, recordLatencySample(rq_rcv_time));
filter_->encodeComplete();
}

@@ -271,13 +271,13 @@ TEST_F(AdaptiveConcurrencyFilterTest, EncodeHeadersValidTest) {
.WillOnce(Return(RequestForwardingAction::Forward));
EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers, true));

- const auto advance_time = std::chrono::nanoseconds(42);
+ const auto rq_rcv_time = time_system_.monotonicTime();
mt = time_system_.monotonicTime();
- time_system_.setMonotonicTime(mt + advance_time);
+ time_system_.setMonotonicTime(mt + std::chrono::nanoseconds(42));

Http::TestResponseHeaderMapImpl response_headers;
EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers, true));
- EXPECT_CALL(*controller_, recordLatencySample(advance_time));
+ EXPECT_CALL(*controller_, recordLatencySample(rq_rcv_time));
filter_->encodeComplete();
}
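
The updated tests capture the request receive time from the simulated clock before advancing it, then expect the mock controller to see that exact time point. Below is a hedged, standalone sketch of the same pattern; FakeClock, Controller, MockController, and the test name are invented for illustration and are not Envoy's simulated time system, controller interface, or filter test fixture.

#include <chrono>

#include "gmock/gmock.h"
#include "gtest/gtest.h"

using MonotonicTime = std::chrono::steady_clock::time_point;

// Hypothetical controller interface mirroring the signature change above.
class Controller {
public:
  virtual ~Controller() = default;
  virtual void recordLatencySample(MonotonicTime rq_send_time) = 0;
};

class MockController : public Controller {
public:
  MOCK_METHOD(void, recordLatencySample, (MonotonicTime), (override));
};

// A trivially controllable clock standing in for a simulated time system.
class FakeClock {
public:
  MonotonicTime monotonicTime() const { return now_; }
  void advance(std::chrono::nanoseconds d) { now_ += d; }

private:
  MonotonicTime now_{};
};

TEST(DeferredSampleSketch, ControllerSeesRequestStartTime) {
  FakeClock clock;
  MockController controller;

  // The "filter" captures the request start time when headers are decoded...
  const MonotonicTime rq_rcv_time = clock.monotonicTime();
  const auto deferred_sample = [&controller, rq_rcv_time]() {
    controller.recordLatencySample(rq_rcv_time);
  };

  // ...time passes while the request is in flight...
  clock.advance(std::chrono::nanoseconds(42));

  // ...and on completion the controller receives the original start time,
  // not a pre-computed 42ns latency value.
  EXPECT_CALL(controller, recordLatencySample(rq_rcv_time));
  deferred_sample();
}

Because the argument is now a time point rather than a duration, advancing the fake clock between capture and completion does not change what the expectation must match.
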
