Skip to content

Commit

Permalink
Change rate limit to use new AsyncClientManagerAPI (#29705)
Browse files Browse the repository at this point in the history
Signed-off-by: AlanDiaz <[email protected]>
  • Loading branch information
DiazAlan authored Sep 21, 2023
1 parent 287a42f commit 7850edd
Show file tree
Hide file tree
Showing 16 changed files with 62 additions and 38 deletions.
6 changes: 3 additions & 3 deletions source/extensions/filters/common/ratelimit/ratelimit_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ void GrpcClientImpl::onFailure(Grpc::Status::GrpcStatus status, const std::strin
}

ClientPtr rateLimitClient(Server::Configuration::FactoryContext& context,
const envoy::config::core::v3::GrpcService& grpc_service,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const std::chrono::milliseconds timeout) {
// TODO(ramaraochavali): register client to singleton when GrpcClientImpl supports concurrent
// requests.
return std::make_unique<Filters::Common::RateLimit::GrpcClientImpl>(
context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClient(
grpc_service, context.scope(), true),
context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClientWithHashKey(
config_with_hash_key, context.scope(), true),
timeout);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class GrpcClientImpl : public Client,
* Builds the rate limit client.
*/
ClientPtr rateLimitClient(Server::Configuration::FactoryContext& context,
const envoy::config::core::v3::GrpcService& grpc_service,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const std::chrono::milliseconds timeout);

} // namespace RateLimit
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/rate_limit_quota/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ envoy_cc_library(
":client_lib",
":matcher_lib",
":quota_bucket_cache",
"//envoy/grpc:async_client_manager_interface",
"//envoy/registry",
"//source/common/http:headers_lib",
"//source/common/http:message_lib",
Expand All @@ -39,6 +40,7 @@ envoy_cc_extension(
deps = [
":client_interface",
":rate_limit_quota",
"//envoy/grpc:async_client_manager_interface",
"//envoy/registry",
"//source/extensions/filters/http/common:factory_base_lib",
"@envoy_api//envoy/extensions/filters/http/rate_limit_quota/v3:pkg_cc_proto",
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/rate_limit_quota/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "envoy/common/pure.h"
#include "envoy/extensions/filters/http/rate_limit_quota/v3/rate_limit_quota.pb.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/service/rate_limit_quota/v3/rlqs.pb.h"
#include "envoy/stream_info/stream_info.h"

Expand Down
15 changes: 8 additions & 7 deletions source/extensions/filters/http/rate_limit_quota/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ class RateLimitClientImpl : public RateLimitClient,
envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse>,
public Logger::Loggable<Logger::Id::rate_limit_quota> {
public:
RateLimitClientImpl(const envoy::config::core::v3::GrpcService& grpc_service,
RateLimitClientImpl(const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
Server::Configuration::FactoryContext& context, absl::string_view domain_name,
RateLimitQuotaCallbacks* callbacks, BucketsCache& quota_buckets)
: domain_name_(domain_name),
aync_client_(context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClient(
grpc_service, context.scope(), true)),
aync_client_(
context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClientWithHashKey(
config_with_hash_key, context.scope(), true)),
rlqs_callback_(callbacks), quota_buckets_(quota_buckets),
time_source_(context.mainThreadDispatcher().timeSource()) {}

Expand Down Expand Up @@ -83,11 +84,11 @@ using RateLimitClientPtr = std::unique_ptr<RateLimitClientImpl>;
*/
inline RateLimitClientPtr
createRateLimitClient(Server::Configuration::FactoryContext& context,
const envoy::config::core::v3::GrpcService& grpc_service,
RateLimitQuotaCallbacks* callbacks, BucketsCache& quota_buckets,
absl::string_view domain_name) {
return std::make_unique<RateLimitClientImpl>(grpc_service, context, domain_name, callbacks,
quota_buckets);
absl::string_view domain_name,
Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key) {
return std::make_unique<RateLimitClientImpl>(config_with_hash_key, context, domain_name,
callbacks, quota_buckets);
}

} // namespace RateLimitQuota
Expand Down
9 changes: 5 additions & 4 deletions source/extensions/filters/http/rate_limit_quota/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ Http::FilterFactoryCb RateLimitQuotaFilterFactory::createFilterFactoryFromProtoT

// Quota bucket TLS object is created on the main thread and shared between worker threads.
std::shared_ptr<QuotaBucket> bucket_cache = std::make_shared<QuotaBucket>(context);

return [config = std::move(config), &context, bucket_cache = std::move(bucket_cache)](
Http::FilterChainFactoryCallbacks& callbacks) -> void {
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key =
Grpc::GrpcServiceConfigWithHashKey(config->rlqs_server());
return [config = std::move(config), &context, bucket_cache = std::move(bucket_cache),
config_with_hash_key](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<RateLimitQuotaFilter>(
config, context, bucket_cache->tls.get()->quotaBuckets(),
bucket_cache->tls.get()->rateLimitClient()));
bucket_cache->tls.get()->rateLimitClient(), config_with_hash_key));
};
}

Expand Down
4 changes: 2 additions & 2 deletions source/extensions/filters/http/rate_limit_quota/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ RateLimitQuotaFilter::sendImmediateReport(const size_t bucket_id,

// Create the gRPC client if it has not been created.
if (client_.rate_limit_client == nullptr) {
client_.rate_limit_client = createRateLimitClient(factory_context_, config_->rlqs_server(),
this, quota_buckets_, config_->domain());
client_.rate_limit_client = createRateLimitClient(factory_context_, this, quota_buckets_,
config_->domain(), config_with_hash_key_);
} else {
// Callback has been reset to nullptr when filter was destroyed last time.
// Reset it here when new filter has been created.
Expand Down
9 changes: 6 additions & 3 deletions source/extensions/filters/http/rate_limit_quota/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "envoy/extensions/filters/http/rate_limit_quota/v3/rate_limit_quota.pb.h"
#include "envoy/extensions/filters/http/rate_limit_quota/v3/rate_limit_quota.pb.validate.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/registry/registry.h"
#include "envoy/service/rate_limit_quota/v3/rlqs.pb.h"
#include "envoy/service/rate_limit_quota/v3/rlqs.pb.validate.h"
Expand Down Expand Up @@ -48,9 +49,10 @@ class RateLimitQuotaFilter : public Http::PassThroughFilter,
public:
RateLimitQuotaFilter(FilterConfigConstSharedPtr config,
Server::Configuration::FactoryContext& factory_context,
BucketsCache& quota_buckets, ThreadLocalClient& client)
: config_(std::move(config)), factory_context_(factory_context),
quota_buckets_(quota_buckets), client_(client),
BucketsCache& quota_buckets, ThreadLocalClient& client,
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key)
: config_(std::move(config)), config_with_hash_key_(config_with_hash_key),
factory_context_(factory_context), quota_buckets_(quota_buckets), client_(client),
time_source_(factory_context.mainThreadDispatcher().timeSource()) {
createMatcher();
}
Expand Down Expand Up @@ -91,6 +93,7 @@ class RateLimitQuotaFilter : public Http::PassThroughFilter,
const RateLimitOnMatchAction& match_action);

FilterConfigConstSharedPtr config_;
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_;
Server::Configuration::FactoryContext& factory_context_;
Http::StreamDecoderFilterCallbacks* callbacks_ = nullptr;
RateLimitQuotaValidationVisitor visitor_ = {};
Expand Down
8 changes: 5 additions & 3 deletions source/extensions/filters/http/ratelimit/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ Http::FilterFactoryCb RateLimitFilterConfig::createFilterFactoryFromProtoTyped(
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(proto_config, timeout, 20));

THROW_IF_NOT_OK(Config::Utility::checkTransportVersion(proto_config.rate_limit_service()));
return [proto_config, &context, timeout,
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key =
Grpc::GrpcServiceConfigWithHashKey(proto_config.rate_limit_service().grpc_service());
return [config_with_hash_key, &context, timeout,
filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<Filter>(
filter_config, Filters::Common::RateLimit::rateLimitClient(
context, proto_config.rate_limit_service().grpc_service(), timeout)));
filter_config,
Filters::Common::RateLimit::rateLimitClient(context, config_with_hash_key, timeout)));
};
}

Expand Down
9 changes: 6 additions & 3 deletions source/extensions/filters/network/ratelimit/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ Network::FilterFactoryCb RateLimitConfigFactory::createFilterFactoryFromProtoTyp
ConfigSharedPtr filter_config(new Config(proto_config, context.scope(), context.runtime()));
const std::chrono::milliseconds timeout =
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(proto_config, timeout, 20));

THROW_IF_NOT_OK(Envoy::Config::Utility::checkTransportVersion(proto_config.rate_limit_service()));
return [proto_config, &context, timeout,
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key =
Grpc::GrpcServiceConfigWithHashKey(proto_config.rate_limit_service().grpc_service());
return [config_with_hash_key, &context, timeout,
filter_config](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(std::make_shared<Filter>(
filter_config, Filters::Common::RateLimit::rateLimitClient(
context, proto_config.rate_limit_service().grpc_service(), timeout)));
filter_config,
Filters::Common::RateLimit::rateLimitClient(context, config_with_hash_key, timeout)));
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ RateLimitFilterConfig::createFilterFactoryFromProtoTyped(
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(proto_config, timeout, 20));

THROW_IF_NOT_OK(Envoy::Config::Utility::checkTransportVersion(proto_config.rate_limit_service()));
return [proto_config, &context, timeout,
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key =
Grpc::GrpcServiceConfigWithHashKey(proto_config.rate_limit_service().grpc_service());
return [config_with_hash_key, &context, timeout,
config](ThriftProxy::ThriftFilters::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addDecoderFilter(std::make_shared<Filter>(
config, Filters::Common::RateLimit::rateLimitClient(
context, proto_config.rate_limit_service().grpc_service(), timeout)));
config,
Filters::Common::RateLimit::rateLimitClient(context, config_with_hash_key, timeout)));
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ class RateLimitTestClient {
.WillRepeatedly(Invoke(this, &RateLimitTestClient::mockCreateAsyncClient));
} else {
EXPECT_CALL(context.cluster_manager_.async_client_manager_,
getOrCreateRawAsyncClient(_, _, _))
getOrCreateRawAsyncClientWithHashKey(_, _, _))
.WillOnce(Invoke(this, &RateLimitTestClient::mockCreateAsyncClient));
}

client_ = createRateLimitClient(context, grpc_service, &callbacks_, bucket_cache_, domain_);
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key =
Grpc::GrpcServiceConfigWithHashKey(grpc_service);

client_ =
createRateLimitClient(context, &callbacks_, bucket_cache_, domain_, config_with_hash_key);
}

Grpc::RawAsyncClientSharedPtr mockCreateAsyncClient(Unused, Unused, Unused) {
Expand Down
4 changes: 3 additions & 1 deletion test/extensions/filters/http/rate_limit_quota/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ class FilterTest : public testing::Test {

void createFilter(bool set_callback = true) {
filter_config_ = std::make_shared<FilterConfig>(config_);
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key =
Grpc::GrpcServiceConfigWithHashKey(filter_config_->rlqs_server());
filter_ = std::make_unique<RateLimitQuotaFilter>(filter_config_, context_, bucket_cache_,
thread_local_client_);
thread_local_client_, config_with_hash_key);
if (set_callback) {
filter_->setDecoderFilterCallbacks(decoder_callbacks_);
}
Expand Down
5 changes: 3 additions & 2 deletions test/extensions/filters/http/ratelimit/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ TEST(RateLimitFilterConfigTest, RatelimitCorrectProto) {

NiceMock<Server::Configuration::MockFactoryContext> context;

EXPECT_CALL(context.cluster_manager_.async_client_manager_, getOrCreateRawAsyncClient(_, _, _))
.WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
EXPECT_CALL(context.cluster_manager_.async_client_manager_,
getOrCreateRawAsyncClientWithHashKey(_, _, _))
.WillOnce(Invoke([](const Grpc::GrpcServiceConfigWithHashKey&, Stats::Scope&, bool) {
return std::make_unique<NiceMock<Grpc::MockAsyncClient>>();
}));

Expand Down
5 changes: 3 additions & 2 deletions test/extensions/filters/network/ratelimit/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ TEST(RateLimitFilterConfigTest, CorrectProto) {

NiceMock<Server::Configuration::MockFactoryContext> context;

EXPECT_CALL(context.cluster_manager_.async_client_manager_, getOrCreateRawAsyncClient(_, _, _))
.WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
EXPECT_CALL(context.cluster_manager_.async_client_manager_,
getOrCreateRawAsyncClientWithHashKey(_, _, _))
.WillOnce(Invoke([](const Grpc::GrpcServiceConfigWithHashKey&, Stats::Scope&, bool) {
return std::make_unique<NiceMock<Grpc::MockAsyncClient>>();
}));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ timeout: "1.337s"

NiceMock<Server::Configuration::MockFactoryContext> context;

EXPECT_CALL(context.cluster_manager_.async_client_manager_, getOrCreateRawAsyncClient(_, _, _))
.WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
EXPECT_CALL(context.cluster_manager_.async_client_manager_,
getOrCreateRawAsyncClientWithHashKey(_, _, _))
.WillOnce(Invoke([](const Grpc::GrpcServiceConfigWithHashKey&, Stats::Scope&, bool) {
return std::make_unique<NiceMock<Grpc::MockAsyncClient>>();
}));

Expand Down

0 comments on commit 7850edd

Please sign in to comment.