From 857107b72abdf62690b7a1c69f9a3684d57f5f3e Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Thu, 19 Dec 2024 11:48:39 +0900 Subject: [PATCH] http ratelimit: option to reduce budget on stream done (#37548) Commit Message: ratelimit: option to excute action on stream done Additional Description: This adds a new option `apply_on_stream_done` to the rate limit policy corresponding to each descriptor. This basically allows to configure descriptors to be executed in a response content-aware way and do not enforce the rate limit (in other words "fire-and-forget"). Since addend can be currently controlled via metadata per descriptor, another filter can be used to set the value to reflect their intent there, for example, by using Lua or Ext Proc filters. This use case arises from the LLM API services which usually return the usage statistics in the response body. More specifically, they have "streaming" APIs whose response is a line-by-line event stream where the very last line of the response line contains the usage statistics. The lazy nature of this action is perfectly fine as in these use cases, the rate limit happens like "you are forbidden from the next time". Besides the LLM specific, I've also encountered the use case from the data center resource allocation case where the operators want to "block the computation from the next time since you used this much resources in this request". Ref: https://github.com/envoyproxy/gateway/issues/4756 Risk Level: low Testing: done Docs Changes: done Release Notes: TODO Platform Specific Features: n/a --------- Signed-off-by: Takeshi Yoneda --- .../config/route/v3/route_components.proto | 19 +++- envoy/router/router_ratelimit.h | 5 + source/common/router/router_ratelimit.cc | 3 +- source/common/router/router_ratelimit.h | 2 + .../filters/common/ratelimit/ratelimit.h | 2 +- .../common/ratelimit/ratelimit_impl.cc | 25 +++-- .../filters/common/ratelimit/ratelimit_impl.h | 2 +- .../filters/http/ratelimit/ratelimit.cc | 95 +++++++++++++------ .../filters/http/ratelimit/ratelimit.h | 34 ++++++- .../filters/common/ratelimit/mocks.h | 2 +- .../filters/http/ratelimit/ratelimit_test.cc | 30 +++++- test/mocks/router/mocks.h | 1 + 12 files changed, 168 insertions(+), 52 deletions(-) diff --git a/api/envoy/config/route/v3/route_components.proto b/api/envoy/config/route/v3/route_components.proto index 909a7305f123..a3d4d009f0eb 100644 --- a/api/envoy/config/route/v3/route_components.proto +++ b/api/envoy/config/route/v3/route_components.proto @@ -1868,7 +1868,7 @@ message VirtualCluster { // Global rate limiting :ref:`architecture overview `. // Also applies to Local rate limiting :ref:`using descriptors `. -// [#next-free-field: 6] +// [#next-free-field: 7] message RateLimit { option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.route.RateLimit"; @@ -2245,6 +2245,23 @@ message RateLimit { // :ref:`VirtualHost.typed_per_filter_config` or // :ref:`Route.typed_per_filter_config`, etc. HitsAddend hits_addend = 5; + + // If true, the rate limit request will be applied when the stream completes. The default value is false. + // This is useful when the rate limit budget needs to reflect the response context that is not available + // on the request path. + // + // For example, let's say the upstream service calculates the usage statistics and returns them in the response body + // and we want to utilize these numbers to apply the rate limit action for the subsequent requests. + // Combined with another filter that can set the desired addend based on the response (e.g. Lua filter), + // this can be used to subtract the usage statistics from the rate limit budget. + // + // A rate limit applied on the stream completion is "fire-and-forget" by nature, and rate limit is not enforced by this config. + // In other words, the current request won't be blocked when this is true, but the budget will be updated for the subsequent + // requests based on the action with this field set to true. Users should ensure that the rate limit is enforced by the actions + // applied on the request path, i.e. the ones with this field set to false. + // + // Currently, this is only supported by the HTTP global rate filter. + bool apply_on_stream_done = 6; } // .. attention:: diff --git a/envoy/router/router_ratelimit.h b/envoy/router/router_ratelimit.h index 199b3248130e..7b3dcdbedc72 100644 --- a/envoy/router/router_ratelimit.h +++ b/envoy/router/router_ratelimit.h @@ -48,6 +48,11 @@ class RateLimitPolicyEntry { */ virtual const std::string& disableKey() const PURE; + /** + * @return true if this rate limit policy should be applied on stream done. + */ + virtual bool applyOnStreamDone() const PURE; + /** * Potentially populate the descriptor array with new descriptors to query. * @param descriptors supplies the descriptor array to optionally fill. diff --git a/source/common/router/router_ratelimit.cc b/source/common/router/router_ratelimit.cc index 77c285a85a24..386374d39725 100644 --- a/source/common/router/router_ratelimit.cc +++ b/source/common/router/router_ratelimit.cc @@ -267,7 +267,8 @@ RateLimitPolicyEntryImpl::RateLimitPolicyEntryImpl( const envoy::config::route::v3::RateLimit& config, Server::Configuration::CommonFactoryContext& context, absl::Status& creation_status) : disable_key_(config.disable_key()), - stage_(static_cast(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, stage, 0))) { + stage_(static_cast(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, stage, 0))), + apply_on_stream_done_(config.apply_on_stream_done()) { for (const auto& action : config.actions()) { switch (action.action_specifier_case()) { case envoy::config::route::v3::RateLimit::Action::ActionSpecifierCase::kSourceCluster: diff --git a/source/common/router/router_ratelimit.h b/source/common/router/router_ratelimit.h index 3fb5149a4cc2..686f15a6938d 100644 --- a/source/common/router/router_ratelimit.h +++ b/source/common/router/router_ratelimit.h @@ -256,12 +256,14 @@ class RateLimitPolicyEntryImpl : public RateLimitPolicyEntry { const std::string& local_service_cluster, const Http::RequestHeaderMap&, const StreamInfo::StreamInfo& info) const override; + bool applyOnStreamDone() const override { return apply_on_stream_done_; } private: const std::string disable_key_; uint64_t stage_; std::vector actions_; absl::optional limit_override_ = absl::nullopt; + const bool apply_on_stream_done_ = false; }; /** diff --git a/source/extensions/filters/common/ratelimit/ratelimit.h b/source/extensions/filters/common/ratelimit/ratelimit.h index 11267cc7db03..44a29d13da6a 100644 --- a/source/extensions/filters/common/ratelimit/ratelimit.h +++ b/source/extensions/filters/common/ratelimit/ratelimit.h @@ -90,7 +90,7 @@ class Client { */ virtual void limit(RequestCallbacks& callbacks, const std::string& domain, const std::vector& descriptors, - Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info, + Tracing::Span& parent_span, OptRef stream_info, uint32_t hits_addend) PURE; }; diff --git a/source/extensions/filters/common/ratelimit/ratelimit_impl.cc b/source/extensions/filters/common/ratelimit/ratelimit_impl.cc index 3350e132562a..7f1b07e8f6f3 100644 --- a/source/extensions/filters/common/ratelimit/ratelimit_impl.cc +++ b/source/extensions/filters/common/ratelimit/ratelimit_impl.cc @@ -59,18 +59,19 @@ void GrpcClientImpl::createRequest(envoy::service::ratelimit::v3::RateLimitReque void GrpcClientImpl::limit(RequestCallbacks& callbacks, const std::string& domain, const std::vector& descriptors, - Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info, - uint32_t hits_addend) { + Tracing::Span& parent_span, + OptRef stream_info, uint32_t hits_addend) { ASSERT(callbacks_ == nullptr); callbacks_ = &callbacks; envoy::service::ratelimit::v3::RateLimitRequest request; createRequest(request, domain, descriptors, hits_addend); - request_ = - async_client_->send(service_method_, request, *this, parent_span, - Http::AsyncClient::RequestOptions().setTimeout(timeout_).setParentContext( - Http::AsyncClient::ParentContext{&stream_info})); + auto options = Http::AsyncClient::RequestOptions().setTimeout(timeout_); + if (stream_info.has_value()) { + options.setParentContext(Http::AsyncClient::ParentContext{stream_info.ptr()}); + } + request_ = async_client_->send(service_method_, request, *this, parent_span, options); } void GrpcClientImpl::onSuccess( @@ -107,10 +108,13 @@ void GrpcClientImpl::onSuccess( response->has_dynamic_metadata() ? std::make_unique(response->dynamic_metadata()) : nullptr; - callbacks_->complete(status, std::move(descriptor_statuses), std::move(response_headers_to_add), + // The rate limit requests applied on stream-done will destroy the client inside the complete + // callback, so we release the callback here to make the destructor happy. + auto call_backs = callbacks_; + callbacks_ = nullptr; + call_backs->complete(status, std::move(descriptor_statuses), std::move(response_headers_to_add), std::move(request_headers_to_add), response->raw_body(), std::move(dynamic_metadata)); - callbacks_ = nullptr; } void GrpcClientImpl::onFailure(Grpc::Status::GrpcStatus status, const std::string& msg, @@ -118,8 +122,11 @@ void GrpcClientImpl::onFailure(Grpc::Status::GrpcStatus status, const std::strin ASSERT(status != Grpc::Status::WellKnownGrpcStatus::Ok); ENVOY_LOG_TO_LOGGER(Logger::Registry::getLog(Logger::Id::filter), debug, "rate limit fail, status={} msg={}", status, msg); - callbacks_->complete(LimitStatus::Error, nullptr, nullptr, nullptr, EMPTY_STRING, nullptr); + // The rate limit requests applied on stream-done will destroy the client inside the complete + // callback, so we release the callback here to make the destructor happy. + auto call_backs = callbacks_; callbacks_ = nullptr; + call_backs->complete(LimitStatus::Error, nullptr, nullptr, nullptr, EMPTY_STRING, nullptr); } ClientPtr rateLimitClient(Server::Configuration::FactoryContext& context, diff --git a/source/extensions/filters/common/ratelimit/ratelimit_impl.h b/source/extensions/filters/common/ratelimit/ratelimit_impl.h index 79502ec2ef78..61a6c1c5ec88 100644 --- a/source/extensions/filters/common/ratelimit/ratelimit_impl.h +++ b/source/extensions/filters/common/ratelimit/ratelimit_impl.h @@ -57,7 +57,7 @@ class GrpcClientImpl : public Client, void cancel() override; void limit(RequestCallbacks& callbacks, const std::string& domain, const std::vector& descriptors, - Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info, + Tracing::Span& parent_span, OptRef stream_info, uint32_t hits_addend = 0) override; // Grpc::AsyncRequestCallbacks diff --git a/source/extensions/filters/http/ratelimit/ratelimit.cc b/source/extensions/filters/http/ratelimit/ratelimit.cc index 7052f8f793ed..858b1548810a 100644 --- a/source/extensions/filters/http/ratelimit/ratelimit.cc +++ b/source/extensions/filters/http/ratelimit/ratelimit.cc @@ -55,37 +55,58 @@ void Filter::initiateCall(const Http::RequestHeaderMap& headers) { return; } - Router::RouteConstSharedPtr route = callbacks_->route(); - if (!route || !route->routeEntry()) { - return; + std::vector descriptors; + populateRateLimitDescriptors(descriptors, headers, false); + if (!descriptors.empty()) { + state_ = State::Calling; + initiating_call_ = true; + client_->limit(*this, getDomain(), descriptors, callbacks_->activeSpan(), + callbacks_->streamInfo(), getHitAddend()); + initiating_call_ = false; } +} - cluster_ = callbacks_->clusterInfo(); - if (!cluster_) { +void Filter::populateRateLimitDescriptors(std::vector& descriptors, + const Http::RequestHeaderMap& headers, + bool on_stream_done) { + if (!on_stream_done) { + // To use the exact same context for both request and on_stream_done rate limiting descriptors, + // we save the route and per-route configuration here and use them later. + route_ = callbacks_->route(); + cluster_ = callbacks_->clusterInfo(); + } + if (!route_ || !cluster_) { return; } - std::vector descriptors; - - const Router::RouteEntry* route_entry = route->routeEntry(); + const Router::RouteEntry* route_entry = route_->routeEntry(); + if (!route_entry) { + return; + } + if (!on_stream_done) { + initializeVirtualHostRateLimitOption(route_entry); + } // Get all applicable rate limit policy entries for the route. - populateRateLimitDescriptors(route_entry->rateLimitPolicy(), descriptors, headers); + populateRateLimitDescriptorsForPolicy(route_entry->rateLimitPolicy(), descriptors, headers, + on_stream_done); - VhRateLimitOptions vh_rate_limit_option = getVirtualHostRateLimitOption(route); - - switch (vh_rate_limit_option) { + switch (vh_rate_limits_) { case VhRateLimitOptions::Ignore: break; case VhRateLimitOptions::Include: - populateRateLimitDescriptors(route->virtualHost().rateLimitPolicy(), descriptors, headers); + populateRateLimitDescriptorsForPolicy(route_->virtualHost().rateLimitPolicy(), descriptors, + headers, on_stream_done); break; case VhRateLimitOptions::Override: if (route_entry->rateLimitPolicy().empty()) { - populateRateLimitDescriptors(route->virtualHost().rateLimitPolicy(), descriptors, headers); + populateRateLimitDescriptorsForPolicy(route_->virtualHost().rateLimitPolicy(), descriptors, + headers, on_stream_done); } break; } +} +double Filter::getHitAddend() { const StreamInfo::UInt32Accessor* hits_addend_filter_state = callbacks_->streamInfo().filterState()->getDataReadOnly( HitsAddendFilterStateKey); @@ -93,14 +114,7 @@ void Filter::initiateCall(const Http::RequestHeaderMap& headers) { if (hits_addend_filter_state != nullptr) { hits_addend = hits_addend_filter_state->value(); } - - if (!descriptors.empty()) { - state_ = State::Calling; - initiating_call_ = true; - client_->limit(*this, getDomain(), descriptors, callbacks_->activeSpan(), - callbacks_->streamInfo(), hits_addend); - initiating_call_ = false; - } + return hits_addend; } Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool) { @@ -161,6 +175,16 @@ void Filter::onDestroy() { if (state_ == State::Calling) { state_ = State::Complete; client_->cancel(); + } else if (client_ != nullptr) { + std::vector descriptors; + populateRateLimitDescriptors(descriptors, *request_headers_, true); + if (!descriptors.empty()) { + // Since this filter is being destroyed, we need to keep the client alive until the request + // is complete by leaking the client with OnStreamDoneCallBack. + auto callback = new OnStreamDoneCallBack(std::move(client_)); + callback->client().limit(*callback, getDomain(), descriptors, Tracing::NullSpan::instance(), + absl::nullopt, getHitAddend()); + } } } @@ -256,9 +280,10 @@ void Filter::complete(Filters::Common::RateLimit::LimitStatus status, } } -void Filter::populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy, - std::vector& descriptors, - const Http::RequestHeaderMap& headers) const { +void Filter::populateRateLimitDescriptorsForPolicy(const Router::RateLimitPolicy& rate_limit_policy, + std::vector& descriptors, + const Http::RequestHeaderMap& headers, + bool on_stream_done) { for (const Router::RateLimitPolicyEntry& rate_limit : rate_limit_policy.getApplicableRateLimit(config_->stage())) { const std::string& disable_key = rate_limit.disableKey(); @@ -267,8 +292,11 @@ void Filter::populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_li fmt::format("ratelimit.{}.http_filter_enabled", disable_key), 100)) { continue; } - rate_limit.populateDescriptors(descriptors, config_->localInfo().clusterName(), headers, - callbacks_->streamInfo()); + const bool apply_on_stream_done = rate_limit.applyOnStreamDone(); + if (on_stream_done == apply_on_stream_done) { + rate_limit.populateDescriptors(descriptors, config_->localInfo().clusterName(), headers, + callbacks_->streamInfo()); + } } } @@ -296,8 +324,8 @@ void Filter::appendRequestHeaders(Http::HeaderMapPtr& request_headers_to_add) { } } -VhRateLimitOptions Filter::getVirtualHostRateLimitOption(const Router::RouteConstSharedPtr& route) { - if (route->routeEntry()->includeVirtualHostRateLimits()) { +void Filter::initializeVirtualHostRateLimitOption(const Router::RouteEntry* route_entry) { + if (route_entry->includeVirtualHostRateLimits()) { vh_rate_limits_ = VhRateLimitOptions::Include; } else { const auto* specific_per_route_config = @@ -318,7 +346,6 @@ VhRateLimitOptions Filter::getVirtualHostRateLimitOption(const Router::RouteCons vh_rate_limits_ = VhRateLimitOptions::Override; } } - return vh_rate_limits_; } std::string Filter::getDomain() { @@ -330,6 +357,14 @@ std::string Filter::getDomain() { return config_->domain(); } +void OnStreamDoneCallBack::complete(Filters::Common::RateLimit::LimitStatus, + Filters::Common::RateLimit::DescriptorStatusListPtr&&, + Http::ResponseHeaderMapPtr&&, Http::RequestHeaderMapPtr&&, + const std::string&, + Filters::Common::RateLimit::DynamicMetadataPtr&&) { + delete this; +} + } // namespace RateLimitFilter } // namespace HttpFilters } // namespace Extensions diff --git a/source/extensions/filters/http/ratelimit/ratelimit.h b/source/extensions/filters/http/ratelimit/ratelimit.h index 889e79a95957..60b110a50584 100644 --- a/source/extensions/filters/http/ratelimit/ratelimit.h +++ b/source/extensions/filters/http/ratelimit/ratelimit.h @@ -185,12 +185,16 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req private: void initiateCall(const Http::RequestHeaderMap& headers); - void populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy, - std::vector& descriptors, - const Http::RequestHeaderMap& headers) const; + void populateRateLimitDescriptors(std::vector& descriptors, + const Http::RequestHeaderMap& headers, bool on_stream_done); + void populateRateLimitDescriptorsForPolicy(const Router::RateLimitPolicy& rate_limit_policy, + std::vector& descriptors, + const Http::RequestHeaderMap& headers, + bool on_stream_done); void populateResponseHeaders(Http::HeaderMap& response_headers, bool from_local_reply); void appendRequestHeaders(Http::HeaderMapPtr& request_headers_to_add); - VhRateLimitOptions getVirtualHostRateLimitOption(const Router::RouteConstSharedPtr& route); + double getHitAddend(); + void initializeVirtualHostRateLimitOption(const Router::RouteEntry* route_entry); std::string getDomain(); Http::Context& httpContext() { return config_->httpContext(); } @@ -203,11 +207,33 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req State state_{State::NotStarted}; VhRateLimitOptions vh_rate_limits_; Upstream::ClusterInfoConstSharedPtr cluster_; + Router::RouteConstSharedPtr route_ = nullptr; bool initiating_call_{}; Http::ResponseHeaderMapPtr response_headers_to_add_; Http::RequestHeaderMap* request_headers_{}; }; +/** + * This implements the rate limit callback that outlives the filter holding the client. + * On completion, it deletes itself. + */ +class OnStreamDoneCallBack : public Filters::Common::RateLimit::RequestCallbacks { +public: + OnStreamDoneCallBack(Filters::Common::RateLimit::ClientPtr client) : client_(std::move(client)) {} + ~OnStreamDoneCallBack() override = default; + + // RateLimit::RequestCallbacks + void complete(Filters::Common::RateLimit::LimitStatus, + Filters::Common::RateLimit::DescriptorStatusListPtr&&, Http::ResponseHeaderMapPtr&&, + Http::RequestHeaderMapPtr&&, const std::string&, + Filters::Common::RateLimit::DynamicMetadataPtr&&) override; + + Filters::Common::RateLimit::Client& client() { return *client_; } + +private: + Filters::Common::RateLimit::ClientPtr client_; +}; + } // namespace RateLimitFilter } // namespace HttpFilters } // namespace Extensions diff --git a/test/extensions/filters/common/ratelimit/mocks.h b/test/extensions/filters/common/ratelimit/mocks.h index 5155d335f057..259fa3f08881 100644 --- a/test/extensions/filters/common/ratelimit/mocks.h +++ b/test/extensions/filters/common/ratelimit/mocks.h @@ -26,7 +26,7 @@ class MockClient : public Client { MOCK_METHOD(void, limit, (RequestCallbacks & callbacks, const std::string& domain, const std::vector& descriptors, - Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info, + Tracing::Span& parent_span, OptRef stream_info, uint32_t hits_addend)); }; diff --git a/test/extensions/filters/http/ratelimit/ratelimit_test.cc b/test/extensions/filters/http/ratelimit/ratelimit_test.cc index 6caa81e8eb5f..63f14004ef3d 100644 --- a/test/extensions/filters/http/ratelimit/ratelimit_test.cc +++ b/test/extensions/filters/http/ratelimit/ratelimit_test.cc @@ -265,15 +265,13 @@ TEST_F(HttpRateLimitFilterTest, OkResponseWithAdditionalHitsAddend) { filter_callbacks_.stream_info_.filter_state_->setData( "envoy.ratelimit.hits_addend", std::make_unique(5), - StreamInfo::FilterState::StateType::ReadOnly); + StreamInfo::FilterState::StateType::Mutable); EXPECT_CALL(filter_callbacks_.route_->route_entry_.rate_limit_policy_, getApplicableRateLimit(0)); EXPECT_CALL(route_rate_limit_, populateDescriptors(_, _, _, _)) .WillOnce(SetArgReferee<0>(descriptor_)); - EXPECT_CALL(filter_callbacks_.route_->virtual_host_.rate_limit_policy_, - getApplicableRateLimit(0)); - + EXPECT_CALL(vh_rate_limit_, applyOnStreamDone()).WillRepeatedly(Return(true)); EXPECT_CALL(*client_, limit(_, "foo", testing::ContainerEq(std::vector{ {{{"descriptor_key", "descriptor_value"}}}}), @@ -304,6 +302,30 @@ TEST_F(HttpRateLimitFilterTest, OkResponseWithAdditionalHitsAddend) { EXPECT_EQ( 1U, filter_callbacks_.clusterInfo()->statsScope().counterFromStatName(ratelimit_ok_).value()); + + // Test the behavior for the apply_on_stream_done flag. + testing::Mock::VerifyAndClearExpectations(client_); + testing::Mock::VerifyAndClearExpectations(&filter_callbacks_); + testing::Mock::VerifyAndClearExpectations( + &filter_callbacks_.route_->route_entry_.rate_limit_policy_); + testing::Mock::VerifyAndClearExpectations(&route_rate_limit_); + testing::Mock::VerifyAndClearExpectations(&vh_rate_limit_); + filter_callbacks_.stream_info_.filter_state_->setData( + // Ensures that addend can be set differently than the request path. + "envoy.ratelimit.hits_addend", std::make_unique(100), + StreamInfo::FilterState::StateType::Mutable); + EXPECT_CALL(filter_callbacks_.route_->route_entry_.rate_limit_policy_, getApplicableRateLimit(0)); + EXPECT_CALL(vh_rate_limit_, applyOnStreamDone()).WillRepeatedly(Return(true)); + EXPECT_CALL(vh_rate_limit_, populateDescriptors(_, _, _, _)) + .WillOnce(SetArgReferee<0>(descriptor_two_)); + EXPECT_CALL(*client_, limit(_, "foo", testing::ContainerEq(descriptor_two_), _, _, 100)) + .WillOnce( + WithArgs<0>(Invoke([&](Filters::Common::RateLimit::RequestCallbacks& callbacks) -> void { + request_callbacks_ = &callbacks; + }))); + filter_->onDestroy(); + request_callbacks_->complete(Filters::Common::RateLimit::LimitStatus::OK, nullptr, nullptr, + nullptr, "", nullptr); } TEST_F(HttpRateLimitFilterTest, OkResponseWithHeaders) { diff --git a/test/mocks/router/mocks.h b/test/mocks/router/mocks.h index 9c21895dcad9..e4e3ac409357 100644 --- a/test/mocks/router/mocks.h +++ b/test/mocks/router/mocks.h @@ -250,6 +250,7 @@ class MockRateLimitPolicyEntry : public RateLimitPolicyEntry { const std::string& local_service_cluster, const Http::RequestHeaderMap& headers, const StreamInfo::StreamInfo& info), (const)); + MOCK_METHOD(bool, applyOnStreamDone, (), (const)); uint64_t stage_{}; std::string disable_key_;