diff --git a/envoy/upstream/BUILD b/envoy/upstream/BUILD index b2253b11bbfe..0ea30e9fb349 100644 --- a/envoy/upstream/BUILD +++ b/envoy/upstream/BUILD @@ -86,6 +86,7 @@ envoy_cc_library( ":upstream_interface", "//envoy/router:router_interface", "//envoy/upstream:types_interface", + "@com_github_cncf_xds//xds/data/orca/v3:pkg_cc_proto", ], ) diff --git a/envoy/upstream/load_balancer.h b/envoy/upstream/load_balancer.h index cc58b35e5ea3..196102790bd3 100644 --- a/envoy/upstream/load_balancer.h +++ b/envoy/upstream/load_balancer.h @@ -10,6 +10,8 @@ #include "envoy/upstream/types.h" #include "envoy/upstream/upstream.h" +#include "xds/data/orca/v3/orca_load_report.pb.h" + namespace Envoy { namespace Http { namespace ConnectionPool { @@ -103,6 +105,21 @@ class LoadBalancerContext { * and return the corresponding host directly. */ virtual absl::optional overrideHostToSelect() const PURE; + + // Interface for callbacks when ORCA load reports are received from upstream. + class OrcaLoadReportCallbacks { + public: + virtual ~OrcaLoadReportCallbacks() = default; + // Invoked when a new orca report is received for this LB context. + virtual absl::Status + onOrcaLoadReport(const xds::data::orca::v3::OrcaLoadReport& orca_load_report) PURE; + }; + + /** + * Install a callback to be invoked when ORCA Load report is received for this + * LB context. + */ + virtual void setOrcaLoadReportCallbacks(OrcaLoadReportCallbacks& callbacks) PURE; }; /** diff --git a/source/common/router/router.cc b/source/common/router/router.cc index 05ad0e6430c4..8d73983e85ba 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -2118,7 +2118,7 @@ void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or auto host = upstream_request.upstreamHost(); const bool need_to_send_load_report = (host != nullptr) && cluster_->lrsReportMetricNames().has_value(); - if (!need_to_send_load_report) { + if (!need_to_send_load_report && !orca_load_report_callbacks_.has_value()) { return; } @@ -2132,10 +2132,19 @@ void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or orca_load_report_received_ = true; - ENVOY_STREAM_LOG(trace, "Adding ORCA load report {} to load metrics", *callbacks_, - orca_load_report->DebugString()); - Envoy::Orca::addOrcaLoadReportToLoadMetricStats( - cluster_->lrsReportMetricNames().value(), orca_load_report.value(), host->loadMetricStats()); + if (need_to_send_load_report) { + ENVOY_STREAM_LOG(trace, "Adding ORCA load report {} to load metrics", *callbacks_, + orca_load_report->DebugString()); + Envoy::Orca::addOrcaLoadReportToLoadMetricStats(cluster_->lrsReportMetricNames().value(), + orca_load_report.value(), + host->loadMetricStats()); + } + if (orca_load_report_callbacks_.has_value()) { + const absl::Status status = orca_load_report_callbacks_->onOrcaLoadReport(*orca_load_report); + if (!status.ok()) { + ENVOY_LOG_MISC(error, "Failed to invoke OrcaLoadReportCallbacks: {}", status.message()); + } + } } RetryStatePtr diff --git a/source/common/router/router.h b/source/common/router/router.h index 6e92950fbe1d..6f30c64646d1 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -422,6 +422,10 @@ class Filter : Logger::Loggable, return callbacks_->upstreamOverrideHost(); } + void setOrcaLoadReportCallbacks(OrcaLoadReportCallbacks& callbacks) override { + orca_load_report_callbacks_ = callbacks; + } + /** * Set a computed cookie to be sent with the downstream headers. * @param key supplies the size of the cookie @@ -604,6 +608,7 @@ class Filter : Logger::Loggable, Http::Code timeout_response_code_ = Http::Code::GatewayTimeout; FilterUtility::HedgingParams hedging_params_; Http::StreamFilterSidestreamWatermarkCallbacks watermark_callbacks_; + OptRef orca_load_report_callbacks_; bool grpc_request_ : 1; bool exclude_http_code_stats_ : 1; bool downstream_response_started_ : 1; diff --git a/source/common/upstream/load_balancer_context_base.h b/source/common/upstream/load_balancer_context_base.h index ded22edbdb01..8157897a48f4 100644 --- a/source/common/upstream/load_balancer_context_base.h +++ b/source/common/upstream/load_balancer_context_base.h @@ -34,6 +34,8 @@ class LoadBalancerContextBase : public LoadBalancerContext { } absl::optional overrideHostToSelect() const override { return {}; } + + void setOrcaLoadReportCallbacks(OrcaLoadReportCallbacks&) override {} }; } // namespace Upstream diff --git a/source/extensions/load_balancing_policies/subset/subset_lb.h b/source/extensions/load_balancing_policies/subset/subset_lb.h index 78854be3eb9b..d03296a9b3f2 100644 --- a/source/extensions/load_balancing_policies/subset/subset_lb.h +++ b/source/extensions/load_balancing_policies/subset/subset_lb.h @@ -200,6 +200,10 @@ class SubsetLoadBalancer : public LoadBalancer, Logger::LoggableoverrideHostToSelect(); } + void setOrcaLoadReportCallbacks(OrcaLoadReportCallbacks& callbacks) override { + wrapped_->setOrcaLoadReportCallbacks(callbacks); + } + private: LoadBalancerContext* wrapped_; Router::MetadataMatchCriteriaConstPtr metadata_match_; diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index ecc13f1b1e24..418e3fc1e1ba 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -6745,5 +6745,127 @@ TEST_F(RouterTest, OrcaLoadReport_NoConfiguredMetricNames) { ASSERT_EQ(load_metric_stats_map, nullptr); } +class TestOrcaLoadReportCallbacks : public Filter::OrcaLoadReportCallbacks { +public: + MOCK_METHOD(absl::Status, onOrcaLoadReport, + (const xds::data::orca::v3::OrcaLoadReport& orca_load_report), (override)); +}; + +TEST_F(RouterTest, OrcaLoadReportCallbacks) { + EXPECT_CALL(callbacks_.route_->route_entry_, timeout()) + .WillOnce(Return(std::chrono::milliseconds(0))); + EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0); + + NiceMock encoder; + Http::ResponseDecoder* response_decoder = nullptr; + expectNewStreamWithImmediateEncoder(encoder, &response_decoder, Http::Protocol::Http10); + + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + router_->decodeHeaders(headers, true); + + // Configure ORCA callbacks to receive the report. + TestOrcaLoadReportCallbacks callbacks; + xds::data::orca::v3::OrcaLoadReport received_orca_load_report; + EXPECT_CALL(callbacks, onOrcaLoadReport(_)) + .WillOnce(Invoke([&](const xds::data::orca::v3::OrcaLoadReport& orca_load_report) { + received_orca_load_report = orca_load_report; + return absl::OkStatus(); + })); + router_->setOrcaLoadReportCallbacks(callbacks); + + // Send ORCA report in the headers. + xds::data::orca::v3::OrcaLoadReport headers_orca_load_report; + headers_orca_load_report.set_cpu_utilization(0.5); + headers_orca_load_report.mutable_named_metrics()->insert({"good", 0.7}); + std::string headers_proto_string = + TestUtility::getProtobufBinaryStringFromMessage(headers_orca_load_report); + std::string headers_orca_load_report_header_bin = + Envoy::Base64::encode(headers_proto_string.c_str(), headers_proto_string.length()); + Http::ResponseHeaderMapPtr response_headers(new Http::TestResponseHeaderMapImpl{ + {":status", "200"}, {"endpoint-load-metrics-bin", headers_orca_load_report_header_bin}}); + response_decoder->decodeHeaders(std::move(response_headers), false); + + // Send different ORCA report in the trailers. Expect it to be ignored. + xds::data::orca::v3::OrcaLoadReport trailers_orca_load_report; + trailers_orca_load_report.set_cpu_utilization(1.0); + trailers_orca_load_report.mutable_named_metrics()->insert({"good", 0.1}); + std::string trailers_proto_string = + TestUtility::getProtobufBinaryStringFromMessage(trailers_orca_load_report); + std::string trailers_orca_load_report_header_bin = + Envoy::Base64::encode(trailers_proto_string.c_str(), trailers_proto_string.length()); + Http::ResponseTrailerMapPtr response_trailers(new Http::TestResponseTrailerMapImpl{ + {":status", "200"}, {"endpoint-load-metrics-bin", trailers_orca_load_report_header_bin}}); + response_decoder->decodeTrailers(std::move(response_trailers)); + // Verify that received load report is set in headers. + EXPECT_EQ(received_orca_load_report.cpu_utilization(), + headers_orca_load_report.cpu_utilization()); +} + +TEST_F(RouterTest, OrcaLoadReportCallbackReturnsError) { + EXPECT_CALL(callbacks_.route_->route_entry_, timeout()) + .WillOnce(Return(std::chrono::milliseconds(0))); + EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0); + + NiceMock encoder; + Http::ResponseDecoder* response_decoder = nullptr; + expectNewStreamWithImmediateEncoder(encoder, &response_decoder, Http::Protocol::Http10); + + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + router_->decodeHeaders(headers, true); + + // Configure ORCA callbacks to receive the report. + TestOrcaLoadReportCallbacks callbacks; + xds::data::orca::v3::OrcaLoadReport received_orca_load_report; + EXPECT_CALL(callbacks, onOrcaLoadReport(_)) + .WillOnce(Invoke([&](const xds::data::orca::v3::OrcaLoadReport& orca_load_report) { + received_orca_load_report = orca_load_report; + // Return an error that gets logged by router filter. + return absl::InvalidArgumentError("Unexpected ORCA load Report"); + })); + router_->setOrcaLoadReportCallbacks(callbacks); + + // Send metrics in the trailers. + xds::data::orca::v3::OrcaLoadReport orca_load_report; + orca_load_report.set_cpu_utilization(0.5); + orca_load_report.mutable_named_metrics()->insert({"good", 0.7}); + std::string proto_string = TestUtility::getProtobufBinaryStringFromMessage(orca_load_report); + std::string orca_load_report_header_bin = + Envoy::Base64::encode(proto_string.c_str(), proto_string.length()); + Http::ResponseTrailerMapPtr response_trailers(new Http::TestResponseTrailerMapImpl{ + {":status", "200"}, {"endpoint-load-metrics-bin", orca_load_report_header_bin}}); + response_decoder->decodeTrailers(std::move(response_trailers)); + EXPECT_EQ(received_orca_load_report.named_metrics().at("good"), 0.7); +} + +TEST_F(RouterTest, OrcaLoadReportInvalidHeaderValue) { + EXPECT_CALL(callbacks_.route_->route_entry_, timeout()) + .WillOnce(Return(std::chrono::milliseconds(0))); + EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0); + + NiceMock encoder; + Http::ResponseDecoder* response_decoder = nullptr; + expectNewStreamWithImmediateEncoder(encoder, &response_decoder, Http::Protocol::Http10); + + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + router_->decodeHeaders(headers, true); + + // Configure ORCA callbacks to receive the report, but don't expect it to be + // called for invalid orca header. + TestOrcaLoadReportCallbacks callbacks; + EXPECT_CALL(callbacks, onOrcaLoadReport(_)).Times(0); + router_->setOrcaLoadReportCallbacks(callbacks); + + // Send report with invalid ORCA proto. + std::string proto_string = "Invalid ORCA proto value"; + std::string orca_load_report_header_bin = + Envoy::Base64::encode(proto_string.c_str(), proto_string.length()); + Http::ResponseHeaderMapPtr response_headers(new Http::TestResponseHeaderMapImpl{ + {":status", "200"}, {"endpoint-load-metrics-bin", orca_load_report_header_bin}}); + response_decoder->decodeHeaders(std::move(response_headers), true); +} + } // namespace Router } // namespace Envoy diff --git a/test/mocks/upstream/load_balancer_context.h b/test/mocks/upstream/load_balancer_context.h index ef0fccbe90be..35ec2ffbf26e 100644 --- a/test/mocks/upstream/load_balancer_context.h +++ b/test/mocks/upstream/load_balancer_context.h @@ -26,6 +26,7 @@ class MockLoadBalancerContext : public LoadBalancerContext { MOCK_METHOD(Network::TransportSocketOptionsConstSharedPtr, upstreamTransportSocketOptions, (), (const)); MOCK_METHOD(absl::optional, overrideHostToSelect, (), (const)); + MOCK_METHOD(void, setOrcaLoadReportCallbacks, (OrcaLoadReportCallbacks&)); private: HealthyAndDegradedLoad priority_load_;