From e52ec8eb05c95b46d314aeace900aa63f74cfbff Mon Sep 17 00:00:00 2001 From: Misha Efimov Date: Mon, 5 Aug 2024 16:16:43 -0400 Subject: [PATCH 1/2] Invoke ORCA Load Report callbacks from Router::Filter. - Add `LoadBalancerContext::setOrcaLoadReportCb(OrcaLoadReportCb callback)` method. Signed-off-by: Misha Efimov --- envoy/upstream/BUILD | 1 + envoy/upstream/load_balancer.h | 12 ++++++++++++ source/common/router/router.cc | 19 ++++++++++++++----- source/common/router/router.h | 5 +++++ .../upstream/load_balancer_context_base.h | 2 ++ .../subset/subset_lb.h | 4 ++++ test/mocks/upstream/load_balancer_context.h | 1 + 7 files changed, 39 insertions(+), 5 deletions(-) diff --git a/envoy/upstream/BUILD b/envoy/upstream/BUILD index b2253b11bbfe..0ea30e9fb349 100644 --- a/envoy/upstream/BUILD +++ b/envoy/upstream/BUILD @@ -86,6 +86,7 @@ envoy_cc_library( ":upstream_interface", "//envoy/router:router_interface", "//envoy/upstream:types_interface", + "@com_github_cncf_xds//xds/data/orca/v3:pkg_cc_proto", ], ) diff --git a/envoy/upstream/load_balancer.h b/envoy/upstream/load_balancer.h index cc58b35e5ea3..0a8b0502e177 100644 --- a/envoy/upstream/load_balancer.h +++ b/envoy/upstream/load_balancer.h @@ -10,6 +10,8 @@ #include "envoy/upstream/types.h" #include "envoy/upstream/upstream.h" +#include "xds/data/orca/v3/orca_load_report.pb.h" + namespace Envoy { namespace Http { namespace ConnectionPool { @@ -103,6 +105,16 @@ class LoadBalancerContext { * and return the corresponding host directly. */ virtual absl::optional overrideHostToSelect() const PURE; + + // Invoked when a new orca report is received for this LB context. + using OrcaLoadReportCb = + std::function; + + /** + * Install a callback to be invoked when ORCA Load report is received for this + * LB context. + */ + virtual void setOrcaLoadReportCb(OrcaLoadReportCb callback) PURE; }; /** diff --git a/source/common/router/router.cc b/source/common/router/router.cc index 05ad0e6430c4..5570717b62c8 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -2118,7 +2118,7 @@ void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or auto host = upstream_request.upstreamHost(); const bool need_to_send_load_report = (host != nullptr) && cluster_->lrsReportMetricNames().has_value(); - if (!need_to_send_load_report) { + if (!need_to_send_load_report && !orca_load_report_cb_) { return; } @@ -2132,10 +2132,19 @@ void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or orca_load_report_received_ = true; - ENVOY_STREAM_LOG(trace, "Adding ORCA load report {} to load metrics", *callbacks_, - orca_load_report->DebugString()); - Envoy::Orca::addOrcaLoadReportToLoadMetricStats( - cluster_->lrsReportMetricNames().value(), orca_load_report.value(), host->loadMetricStats()); + if (need_to_send_load_report) { + ENVOY_STREAM_LOG(trace, "Adding ORCA load report {} to load metrics", *callbacks_, + orca_load_report->DebugString()); + Envoy::Orca::addOrcaLoadReportToLoadMetricStats(cluster_->lrsReportMetricNames().value(), + orca_load_report.value(), + host->loadMetricStats()); + } + if (orca_load_report_cb_) { + auto status = orca_load_report_cb_(*orca_load_report); + if (!status.ok()) { + ENVOY_LOG_MISC(error, "Failed to invoke OrcaLoadReportCb: {}", status.message()); + } + } } RetryStatePtr diff --git a/source/common/router/router.h b/source/common/router/router.h index 6e92950fbe1d..34ae59d136d4 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -422,6 +422,10 @@ class Filter : Logger::Loggable, return callbacks_->upstreamOverrideHost(); } + void setOrcaLoadReportCb(OrcaLoadReportCb callback) override { + orca_load_report_cb_ = std::move(callback); + } + /** * Set a computed cookie to be sent with the downstream headers. * @param key supplies the size of the cookie @@ -604,6 +608,7 @@ class Filter : Logger::Loggable, Http::Code timeout_response_code_ = Http::Code::GatewayTimeout; FilterUtility::HedgingParams hedging_params_; Http::StreamFilterSidestreamWatermarkCallbacks watermark_callbacks_; + OrcaLoadReportCb orca_load_report_cb_; bool grpc_request_ : 1; bool exclude_http_code_stats_ : 1; bool downstream_response_started_ : 1; diff --git a/source/common/upstream/load_balancer_context_base.h b/source/common/upstream/load_balancer_context_base.h index ded22edbdb01..e9616793c145 100644 --- a/source/common/upstream/load_balancer_context_base.h +++ b/source/common/upstream/load_balancer_context_base.h @@ -34,6 +34,8 @@ class LoadBalancerContextBase : public LoadBalancerContext { } absl::optional overrideHostToSelect() const override { return {}; } + + void setOrcaLoadReportCb(OrcaLoadReportCb) override {} }; } // namespace Upstream diff --git a/source/extensions/load_balancing_policies/subset/subset_lb.h b/source/extensions/load_balancing_policies/subset/subset_lb.h index 78854be3eb9b..6de0508cf8f3 100644 --- a/source/extensions/load_balancing_policies/subset/subset_lb.h +++ b/source/extensions/load_balancing_policies/subset/subset_lb.h @@ -200,6 +200,10 @@ class SubsetLoadBalancer : public LoadBalancer, Logger::LoggableoverrideHostToSelect(); } + void setOrcaLoadReportCb(OrcaLoadReportCb callback) override { + return wrapped_->setOrcaLoadReportCb(std::move(callback)); + } + private: LoadBalancerContext* wrapped_; Router::MetadataMatchCriteriaConstPtr metadata_match_; diff --git a/test/mocks/upstream/load_balancer_context.h b/test/mocks/upstream/load_balancer_context.h index ef0fccbe90be..234d5ac161b4 100644 --- a/test/mocks/upstream/load_balancer_context.h +++ b/test/mocks/upstream/load_balancer_context.h @@ -26,6 +26,7 @@ class MockLoadBalancerContext : public LoadBalancerContext { MOCK_METHOD(Network::TransportSocketOptionsConstSharedPtr, upstreamTransportSocketOptions, (), (const)); MOCK_METHOD(absl::optional, overrideHostToSelect, (), (const)); + MOCK_METHOD(void, setOrcaLoadReportCb, (OrcaLoadReportCb)); private: HealthyAndDegradedLoad priority_load_; From a8aab1de84a60ae2f2858460fa025e325e0c90cd Mon Sep 17 00:00:00 2001 From: Misha Efimov Date: Mon, 26 Aug 2024 16:41:30 -0400 Subject: [PATCH 2/2] Address code review comments, use an interface instead of a function. Signed-off-by: Misha Efimov --- envoy/upstream/load_balancer.h | 13 +- source/common/router/router.cc | 8 +- source/common/router/router.h | 6 +- .../upstream/load_balancer_context_base.h | 2 +- .../subset/subset_lb.h | 4 +- test/common/router/router_test.cc | 122 ++++++++++++++++++ test/mocks/upstream/load_balancer_context.h | 2 +- 7 files changed, 142 insertions(+), 15 deletions(-) diff --git a/envoy/upstream/load_balancer.h b/envoy/upstream/load_balancer.h index 0a8b0502e177..196102790bd3 100644 --- a/envoy/upstream/load_balancer.h +++ b/envoy/upstream/load_balancer.h @@ -106,15 +106,20 @@ class LoadBalancerContext { */ virtual absl::optional overrideHostToSelect() const PURE; - // Invoked when a new orca report is received for this LB context. - using OrcaLoadReportCb = - std::function; + // Interface for callbacks when ORCA load reports are received from upstream. + class OrcaLoadReportCallbacks { + public: + virtual ~OrcaLoadReportCallbacks() = default; + // Invoked when a new orca report is received for this LB context. + virtual absl::Status + onOrcaLoadReport(const xds::data::orca::v3::OrcaLoadReport& orca_load_report) PURE; + }; /** * Install a callback to be invoked when ORCA Load report is received for this * LB context. */ - virtual void setOrcaLoadReportCb(OrcaLoadReportCb callback) PURE; + virtual void setOrcaLoadReportCallbacks(OrcaLoadReportCallbacks& callbacks) PURE; }; /** diff --git a/source/common/router/router.cc b/source/common/router/router.cc index 5570717b62c8..8d73983e85ba 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -2118,7 +2118,7 @@ void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or auto host = upstream_request.upstreamHost(); const bool need_to_send_load_report = (host != nullptr) && cluster_->lrsReportMetricNames().has_value(); - if (!need_to_send_load_report && !orca_load_report_cb_) { + if (!need_to_send_load_report && !orca_load_report_callbacks_.has_value()) { return; } @@ -2139,10 +2139,10 @@ void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or orca_load_report.value(), host->loadMetricStats()); } - if (orca_load_report_cb_) { - auto status = orca_load_report_cb_(*orca_load_report); + if (orca_load_report_callbacks_.has_value()) { + const absl::Status status = orca_load_report_callbacks_->onOrcaLoadReport(*orca_load_report); if (!status.ok()) { - ENVOY_LOG_MISC(error, "Failed to invoke OrcaLoadReportCb: {}", status.message()); + ENVOY_LOG_MISC(error, "Failed to invoke OrcaLoadReportCallbacks: {}", status.message()); } } } diff --git a/source/common/router/router.h b/source/common/router/router.h index 34ae59d136d4..6f30c64646d1 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -422,8 +422,8 @@ class Filter : Logger::Loggable, return callbacks_->upstreamOverrideHost(); } - void setOrcaLoadReportCb(OrcaLoadReportCb callback) override { - orca_load_report_cb_ = std::move(callback); + void setOrcaLoadReportCallbacks(OrcaLoadReportCallbacks& callbacks) override { + orca_load_report_callbacks_ = callbacks; } /** @@ -608,7 +608,7 @@ class Filter : Logger::Loggable, Http::Code timeout_response_code_ = Http::Code::GatewayTimeout; FilterUtility::HedgingParams hedging_params_; Http::StreamFilterSidestreamWatermarkCallbacks watermark_callbacks_; - OrcaLoadReportCb orca_load_report_cb_; + OptRef orca_load_report_callbacks_; bool grpc_request_ : 1; bool exclude_http_code_stats_ : 1; bool downstream_response_started_ : 1; diff --git a/source/common/upstream/load_balancer_context_base.h b/source/common/upstream/load_balancer_context_base.h index e9616793c145..8157897a48f4 100644 --- a/source/common/upstream/load_balancer_context_base.h +++ b/source/common/upstream/load_balancer_context_base.h @@ -35,7 +35,7 @@ class LoadBalancerContextBase : public LoadBalancerContext { absl::optional overrideHostToSelect() const override { return {}; } - void setOrcaLoadReportCb(OrcaLoadReportCb) override {} + void setOrcaLoadReportCallbacks(OrcaLoadReportCallbacks&) override {} }; } // namespace Upstream diff --git a/source/extensions/load_balancing_policies/subset/subset_lb.h b/source/extensions/load_balancing_policies/subset/subset_lb.h index 6de0508cf8f3..d03296a9b3f2 100644 --- a/source/extensions/load_balancing_policies/subset/subset_lb.h +++ b/source/extensions/load_balancing_policies/subset/subset_lb.h @@ -200,8 +200,8 @@ class SubsetLoadBalancer : public LoadBalancer, Logger::LoggableoverrideHostToSelect(); } - void setOrcaLoadReportCb(OrcaLoadReportCb callback) override { - return wrapped_->setOrcaLoadReportCb(std::move(callback)); + void setOrcaLoadReportCallbacks(OrcaLoadReportCallbacks& callbacks) override { + wrapped_->setOrcaLoadReportCallbacks(callbacks); } private: diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index ecc13f1b1e24..418e3fc1e1ba 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -6745,5 +6745,127 @@ TEST_F(RouterTest, OrcaLoadReport_NoConfiguredMetricNames) { ASSERT_EQ(load_metric_stats_map, nullptr); } +class TestOrcaLoadReportCallbacks : public Filter::OrcaLoadReportCallbacks { +public: + MOCK_METHOD(absl::Status, onOrcaLoadReport, + (const xds::data::orca::v3::OrcaLoadReport& orca_load_report), (override)); +}; + +TEST_F(RouterTest, OrcaLoadReportCallbacks) { + EXPECT_CALL(callbacks_.route_->route_entry_, timeout()) + .WillOnce(Return(std::chrono::milliseconds(0))); + EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0); + + NiceMock encoder; + Http::ResponseDecoder* response_decoder = nullptr; + expectNewStreamWithImmediateEncoder(encoder, &response_decoder, Http::Protocol::Http10); + + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + router_->decodeHeaders(headers, true); + + // Configure ORCA callbacks to receive the report. + TestOrcaLoadReportCallbacks callbacks; + xds::data::orca::v3::OrcaLoadReport received_orca_load_report; + EXPECT_CALL(callbacks, onOrcaLoadReport(_)) + .WillOnce(Invoke([&](const xds::data::orca::v3::OrcaLoadReport& orca_load_report) { + received_orca_load_report = orca_load_report; + return absl::OkStatus(); + })); + router_->setOrcaLoadReportCallbacks(callbacks); + + // Send ORCA report in the headers. + xds::data::orca::v3::OrcaLoadReport headers_orca_load_report; + headers_orca_load_report.set_cpu_utilization(0.5); + headers_orca_load_report.mutable_named_metrics()->insert({"good", 0.7}); + std::string headers_proto_string = + TestUtility::getProtobufBinaryStringFromMessage(headers_orca_load_report); + std::string headers_orca_load_report_header_bin = + Envoy::Base64::encode(headers_proto_string.c_str(), headers_proto_string.length()); + Http::ResponseHeaderMapPtr response_headers(new Http::TestResponseHeaderMapImpl{ + {":status", "200"}, {"endpoint-load-metrics-bin", headers_orca_load_report_header_bin}}); + response_decoder->decodeHeaders(std::move(response_headers), false); + + // Send different ORCA report in the trailers. Expect it to be ignored. + xds::data::orca::v3::OrcaLoadReport trailers_orca_load_report; + trailers_orca_load_report.set_cpu_utilization(1.0); + trailers_orca_load_report.mutable_named_metrics()->insert({"good", 0.1}); + std::string trailers_proto_string = + TestUtility::getProtobufBinaryStringFromMessage(trailers_orca_load_report); + std::string trailers_orca_load_report_header_bin = + Envoy::Base64::encode(trailers_proto_string.c_str(), trailers_proto_string.length()); + Http::ResponseTrailerMapPtr response_trailers(new Http::TestResponseTrailerMapImpl{ + {":status", "200"}, {"endpoint-load-metrics-bin", trailers_orca_load_report_header_bin}}); + response_decoder->decodeTrailers(std::move(response_trailers)); + // Verify that received load report is set in headers. + EXPECT_EQ(received_orca_load_report.cpu_utilization(), + headers_orca_load_report.cpu_utilization()); +} + +TEST_F(RouterTest, OrcaLoadReportCallbackReturnsError) { + EXPECT_CALL(callbacks_.route_->route_entry_, timeout()) + .WillOnce(Return(std::chrono::milliseconds(0))); + EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0); + + NiceMock encoder; + Http::ResponseDecoder* response_decoder = nullptr; + expectNewStreamWithImmediateEncoder(encoder, &response_decoder, Http::Protocol::Http10); + + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + router_->decodeHeaders(headers, true); + + // Configure ORCA callbacks to receive the report. + TestOrcaLoadReportCallbacks callbacks; + xds::data::orca::v3::OrcaLoadReport received_orca_load_report; + EXPECT_CALL(callbacks, onOrcaLoadReport(_)) + .WillOnce(Invoke([&](const xds::data::orca::v3::OrcaLoadReport& orca_load_report) { + received_orca_load_report = orca_load_report; + // Return an error that gets logged by router filter. + return absl::InvalidArgumentError("Unexpected ORCA load Report"); + })); + router_->setOrcaLoadReportCallbacks(callbacks); + + // Send metrics in the trailers. + xds::data::orca::v3::OrcaLoadReport orca_load_report; + orca_load_report.set_cpu_utilization(0.5); + orca_load_report.mutable_named_metrics()->insert({"good", 0.7}); + std::string proto_string = TestUtility::getProtobufBinaryStringFromMessage(orca_load_report); + std::string orca_load_report_header_bin = + Envoy::Base64::encode(proto_string.c_str(), proto_string.length()); + Http::ResponseTrailerMapPtr response_trailers(new Http::TestResponseTrailerMapImpl{ + {":status", "200"}, {"endpoint-load-metrics-bin", orca_load_report_header_bin}}); + response_decoder->decodeTrailers(std::move(response_trailers)); + EXPECT_EQ(received_orca_load_report.named_metrics().at("good"), 0.7); +} + +TEST_F(RouterTest, OrcaLoadReportInvalidHeaderValue) { + EXPECT_CALL(callbacks_.route_->route_entry_, timeout()) + .WillOnce(Return(std::chrono::milliseconds(0))); + EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0); + + NiceMock encoder; + Http::ResponseDecoder* response_decoder = nullptr; + expectNewStreamWithImmediateEncoder(encoder, &response_decoder, Http::Protocol::Http10); + + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + router_->decodeHeaders(headers, true); + + // Configure ORCA callbacks to receive the report, but don't expect it to be + // called for invalid orca header. + TestOrcaLoadReportCallbacks callbacks; + EXPECT_CALL(callbacks, onOrcaLoadReport(_)).Times(0); + router_->setOrcaLoadReportCallbacks(callbacks); + + // Send report with invalid ORCA proto. + std::string proto_string = "Invalid ORCA proto value"; + std::string orca_load_report_header_bin = + Envoy::Base64::encode(proto_string.c_str(), proto_string.length()); + Http::ResponseHeaderMapPtr response_headers(new Http::TestResponseHeaderMapImpl{ + {":status", "200"}, {"endpoint-load-metrics-bin", orca_load_report_header_bin}}); + response_decoder->decodeHeaders(std::move(response_headers), true); +} + } // namespace Router } // namespace Envoy diff --git a/test/mocks/upstream/load_balancer_context.h b/test/mocks/upstream/load_balancer_context.h index 234d5ac161b4..35ec2ffbf26e 100644 --- a/test/mocks/upstream/load_balancer_context.h +++ b/test/mocks/upstream/load_balancer_context.h @@ -26,7 +26,7 @@ class MockLoadBalancerContext : public LoadBalancerContext { MOCK_METHOD(Network::TransportSocketOptionsConstSharedPtr, upstreamTransportSocketOptions, (), (const)); MOCK_METHOD(absl::optional, overrideHostToSelect, (), (const)); - MOCK_METHOD(void, setOrcaLoadReportCb, (OrcaLoadReportCb)); + MOCK_METHOD(void, setOrcaLoadReportCallbacks, (OrcaLoadReportCallbacks&)); private: HealthyAndDegradedLoad priority_load_;