Skip to content

Commit

Permalink
Implementing drop_overload stats reporting in load report service API (
Browse files Browse the repository at this point in the history
…envoyproxy#31384)


---------

Signed-off-by: Yanjun Xiang <[email protected]>
  • Loading branch information
yanjunxiang-google authored Dec 16, 2023
1 parent 2208327 commit 163a626
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 1 deletion.
9 changes: 9 additions & 0 deletions source/common/upstream/load_stats_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ void LoadStatsReporter::sendLoadStatsRequest() {
}
cluster_stats->set_total_dropped_requests(
cluster.info()->loadReportStats().upstream_rq_dropped_.latch());
const uint64_t drop_overload_count =
cluster.info()->loadReportStats().upstream_rq_drop_overload_.latch();
if (drop_overload_count > 0) {
auto* dropped_request = cluster_stats->add_dropped_requests();
dropped_request->set_category("drop_overload");
dropped_request->set_dropped_count(drop_overload_count);
}

const auto now = time_source_.monotonicTime().time_since_epoch();
const auto measured_interval = now - cluster_name_and_timestamp.second;
cluster_stats->mutable_load_report_interval()->MergeFrom(
Expand Down Expand Up @@ -214,6 +222,7 @@ void LoadStatsReporter::startLoadReportPeriod() {
}
}
cluster.info()->loadReportStats().upstream_rq_dropped_.latch();
cluster.info()->loadReportStats().upstream_rq_drop_overload_.latch();
};
if (message_->send_all_clusters()) {
for (const auto& p : all_clusters.active_clusters_) {
Expand Down
20 changes: 20 additions & 0 deletions test/common/upstream/load_stats_reporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ class LoadStatsReporterTest : public testing::Test {
load_stats_reporter_->onReceiveMessage(std::move(response));
}

void setDropOverload(envoy::config::endpoint::v3::ClusterStats& cluster_stats, uint64_t count) {
auto* dropped_request = cluster_stats.add_dropped_requests();
dropped_request->set_category("drop_overload");
dropped_request->set_dropped_count(count);
}

Event::SimulatedTimeSystem time_system_;
NiceMock<Upstream::MockClusterManager> cm_;
Event::MockDispatcher dispatcher_;
Expand Down Expand Up @@ -132,12 +138,14 @@ TEST_F(LoadStatsReporterTest, ExistingClusters) {
deliverLoadStatsResponse({"foo"});
// Initial stats report for foo on timer tick.
foo_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(5);
foo_cluster.info_->load_report_stats_.upstream_rq_drop_overload_.add(7);
time_system_.setMonotonicTime(std::chrono::microseconds(4));
{
envoy::config::endpoint::v3::ClusterStats foo_cluster_stats;
foo_cluster_stats.set_cluster_name("foo");
foo_cluster_stats.set_cluster_service_name("bar");
foo_cluster_stats.set_total_dropped_requests(5);
setDropOverload(foo_cluster_stats, 7);
foo_cluster_stats.mutable_load_report_interval()->MergeFrom(
Protobuf::util::TimeUtil::MicrosecondsToDuration(1));
expectSendMessage({foo_cluster_stats});
Expand All @@ -148,13 +156,15 @@ TEST_F(LoadStatsReporterTest, ExistingClusters) {
// Some traffic on foo/bar in between previous request and next response.
foo_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(1);
bar_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(1);
bar_cluster.info_->load_report_stats_.upstream_rq_drop_overload_.add(5);

// Start reporting on bar.
time_system_.setMonotonicTime(std::chrono::microseconds(6));
deliverLoadStatsResponse({"foo", "bar"});
// Stats report foo/bar on timer tick.
foo_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(1);
bar_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(1);
bar_cluster.info_->load_report_stats_.upstream_rq_drop_overload_.add(3);
time_system_.setMonotonicTime(std::chrono::microseconds(28));
{
envoy::config::endpoint::v3::ClusterStats foo_cluster_stats;
Expand All @@ -166,6 +176,7 @@ TEST_F(LoadStatsReporterTest, ExistingClusters) {
envoy::config::endpoint::v3::ClusterStats bar_cluster_stats;
bar_cluster_stats.set_cluster_name("bar");
bar_cluster_stats.set_total_dropped_requests(1);
setDropOverload(bar_cluster_stats, 3);
bar_cluster_stats.mutable_load_report_interval()->MergeFrom(
Protobuf::util::TimeUtil::MicrosecondsToDuration(22));
expectSendMessage({bar_cluster_stats, foo_cluster_stats});
Expand All @@ -176,17 +187,20 @@ TEST_F(LoadStatsReporterTest, ExistingClusters) {
// Some traffic on foo/bar in between previous request and next response.
foo_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(1);
bar_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(1);
bar_cluster.info_->load_report_stats_.upstream_rq_drop_overload_.add(1);

// Stop reporting on foo.
deliverLoadStatsResponse({"bar"});
// Stats report for bar on timer tick.
foo_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(5);
bar_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(5);
bar_cluster.info_->load_report_stats_.upstream_rq_drop_overload_.add(7);
time_system_.setMonotonicTime(std::chrono::microseconds(33));
{
envoy::config::endpoint::v3::ClusterStats bar_cluster_stats;
bar_cluster_stats.set_cluster_name("bar");
bar_cluster_stats.set_total_dropped_requests(6);
setDropOverload(bar_cluster_stats, 8);
bar_cluster_stats.mutable_load_report_interval()->MergeFrom(
Protobuf::util::TimeUtil::MicrosecondsToDuration(5));
expectSendMessage({bar_cluster_stats});
Expand All @@ -196,25 +210,31 @@ TEST_F(LoadStatsReporterTest, ExistingClusters) {

// Some traffic on foo/bar in between previous request and next response.
foo_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(1);
foo_cluster.info_->load_report_stats_.upstream_rq_drop_overload_.add(8);
bar_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(1);
bar_cluster.info_->load_report_stats_.upstream_rq_drop_overload_.add(3);

// Start tracking foo again, we should forget earlier history for foo.
time_system_.setMonotonicTime(std::chrono::microseconds(43));
deliverLoadStatsResponse({"foo", "bar"});
// Stats report foo/bar on timer tick.
foo_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(1);
foo_cluster.info_->load_report_stats_.upstream_rq_drop_overload_.add(9);
bar_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(1);
bar_cluster.info_->load_report_stats_.upstream_rq_drop_overload_.add(4);
time_system_.setMonotonicTime(std::chrono::microseconds(47));
{
envoy::config::endpoint::v3::ClusterStats foo_cluster_stats;
foo_cluster_stats.set_cluster_name("foo");
foo_cluster_stats.set_cluster_service_name("bar");
foo_cluster_stats.set_total_dropped_requests(1);
setDropOverload(foo_cluster_stats, 9);
foo_cluster_stats.mutable_load_report_interval()->MergeFrom(
Protobuf::util::TimeUtil::MicrosecondsToDuration(4));
envoy::config::endpoint::v3::ClusterStats bar_cluster_stats;
bar_cluster_stats.set_cluster_name("bar");
bar_cluster_stats.set_total_dropped_requests(2);
setDropOverload(bar_cluster_stats, 7);
bar_cluster_stats.mutable_load_report_interval()->MergeFrom(
Protobuf::util::TimeUtil::MicrosecondsToDuration(14));
expectSendMessage({bar_cluster_stats, foo_cluster_stats});
Expand Down
59 changes: 58 additions & 1 deletion test/integration/load_stats_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,20 @@ class LoadStatsIntegrationTest : public Grpc::GrpcClientIntegrationParamTest,

cluster_stats->set_total_dropped_requests(cluster_stats->total_dropped_requests() +
local_cluster_stats.total_dropped_requests());
if (local_cluster_stats.dropped_requests().size() > 0) {
const uint64_t local_drop_count = local_cluster_stats.dropped_requests(0).dropped_count();
if (local_drop_count > 0) {
envoy::config::endpoint::v3::ClusterStats::DroppedRequests* drop_request;
if (cluster_stats->dropped_requests().size() > 0) {
drop_request = cluster_stats->mutable_dropped_requests(0);
drop_request->set_dropped_count(drop_request->dropped_count() + local_drop_count);
} else {
drop_request = cluster_stats->add_dropped_requests();
drop_request->set_dropped_count(local_drop_count);
}
drop_request->set_category("drop_overload");
}
}

for (int i = 0; i < local_cluster_stats.upstream_locality_stats_size(); ++i) {
const auto& local_upstream_locality_stats = local_cluster_stats.upstream_locality_stats(i);
Expand Down Expand Up @@ -243,7 +257,7 @@ class LoadStatsIntegrationTest : public Grpc::GrpcClientIntegrationParamTest,
ABSL_MUST_USE_RESULT AssertionResult
waitForLoadStatsRequest(const std::vector<envoy::config::endpoint::v3::UpstreamLocalityStats>&
expected_locality_stats,
uint64_t dropped = 0) {
uint64_t dropped = 0, bool drop_overload_test = false) {
Event::TestTimeSystem::RealTimeBound bound(TestUtility::DefaultTimeout);
Protobuf::RepeatedPtrField<envoy::config::endpoint::v3::ClusterStats> expected_cluster_stats;
if (!expected_locality_stats.empty() || dropped != 0) {
Expand All @@ -253,6 +267,11 @@ class LoadStatsIntegrationTest : public Grpc::GrpcClientIntegrationParamTest,
cluster_stats->set_cluster_service_name("service_name_0");
if (dropped > 0) {
cluster_stats->set_total_dropped_requests(dropped);
if (drop_overload_test) {
auto* drop_request = cluster_stats->add_dropped_requests();
drop_request->set_category("drop_overload");
drop_request->set_dropped_count(dropped);
}
}
std::copy(
expected_locality_stats.begin(), expected_locality_stats.end(),
Expand Down Expand Up @@ -368,6 +387,19 @@ class LoadStatsIntegrationTest : public Grpc::GrpcClientIntegrationParamTest,
cleanupUpstreamAndDownstream();
}

void updateDropOverloadConfig() {
envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment;
cluster_load_assignment.set_cluster_name("service_name_0");
// Config drop_overload to drop 100% requests.
auto* policy = cluster_load_assignment.mutable_policy();
auto* drop_overload = policy->add_drop_overloads();
drop_overload->set_category("drop_overload");
auto* drop_percentage = drop_overload->mutable_drop_percentage();
drop_percentage->set_numerator(100);
drop_percentage->set_denominator(envoy::type::v3::FractionalPercent::HUNDRED);
eds_helper_.setEdsAndWait({cluster_load_assignment}, *test_server_);
}

static constexpr uint32_t upstream_endpoints_ = 5;

IntegrationStreamDecoderPtr response_;
Expand Down Expand Up @@ -646,5 +678,30 @@ TEST_P(LoadStatsIntegrationTest, Dropped) {
cleanupLoadStatsConnection();
}

// Validate the load reports for dropped requests due to drop_overload make sense.
TEST_P(LoadStatsIntegrationTest, DropOverloadDropped) {
initialize();
waitForLoadStatsStream();
ASSERT_TRUE(waitForLoadStatsRequest({}));
loadstats_stream_->startGrpcStream();
updateClusterLoadAssignment({{0}}, {}, {}, {});
updateDropOverloadConfig();

requestLoadStatsResponse({"cluster_0"});
initiateClientConnection();
ASSERT_TRUE(response_->waitForEndStream());
ASSERT_TRUE(response_->complete());
EXPECT_EQ("503", response_->headers().getStatusValue());
cleanupUpstreamAndDownstream();

ASSERT_TRUE(waitForLoadStatsRequest({}, 1, true));

EXPECT_EQ(1, test_server_->counter("load_reporter.requests")->value());
EXPECT_LE(2, test_server_->counter("load_reporter.responses")->value());
EXPECT_EQ(0, test_server_->counter("load_reporter.errors")->value());

cleanupLoadStatsConnection();
}

} // namespace
} // namespace Envoy

0 comments on commit 163a626

Please sign in to comment.