Skip to content

Commit

Permalink
BucketId Dynamic Metadata for RateLimitQuota filter (envoyproxy#35594)
Browse files Browse the repository at this point in the history
Commit Message: Add dynamic metadata to the rate_limit_quota filter to
export the selected bucket for a given request for logging
Additional Description: Functionality tested using an access log filter
in integration testing
Risk Level: low
Testing: integration testing
Docs Changes:
Release Notes:
Platform Specific Features:

---------

Signed-off-by: Brian Surber <[email protected]>
  • Loading branch information
bsurber authored Aug 7, 2024
1 parent d283802 commit c9fd85f
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 7 deletions.
19 changes: 14 additions & 5 deletions source/extensions/filters/http/rate_limit_quota/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace Extensions {
namespace HttpFilters {
namespace RateLimitQuota {

const char kBucketMetadataNamespace[] = "envoy.extensions.http_filters.rate_limit_quota.bucket";

Http::FilterHeadersStatus RateLimitQuotaFilter::decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) {
ENVOY_LOG(trace, "decodeHeaders: end_stream = {}", end_stream);
Expand All @@ -28,27 +30,34 @@ Http::FilterHeadersStatus RateLimitQuotaFilter::decodeHeaders(Http::RequestHeade
// succeeds.
const RateLimitOnMatchAction& match_action =
match_result.value()->getTyped<RateLimitOnMatchAction>();
auto ret = match_action.generateBucketId(*data_ptr_, factory_context_, visitor_);
absl::StatusOr ret = match_action.generateBucketId(*data_ptr_, factory_context_, visitor_);
if (!ret.ok()) {
// When it failed to generate the bucket id for this specific request, the request is ALLOWED by
// default (i.e., fail-open).
ENVOY_LOG(debug, "Unable to generate the bucket id: {}", ret.status().message());
return Envoy::Http::FilterHeadersStatus::Continue;
}

BucketId bucket_id_proto = ret.value();
const BucketId& bucket_id_proto = *ret;
const size_t bucket_id = MessageUtil::hash(bucket_id_proto);
ENVOY_LOG(trace, "Generated the associated hashed bucket id: {} for bucket id proto:\n {}",
bucket_id, bucket_id_proto.DebugString());

ProtobufWkt::Struct bucket_log;
auto* bucket_log_fields = bucket_log.mutable_fields();
for (const auto& bucket : bucket_id_proto.bucket())
(*bucket_log_fields)[bucket.first] = ValueUtil::stringValue(bucket.second);

callbacks_->streamInfo().setDynamicMetadata(kBucketMetadataNamespace, bucket_log);

if (quota_buckets_.find(bucket_id) == quota_buckets_.end()) {
// For first matched request, create a new bucket in the cache and sent the report to RLQS
// server immediately.
createNewBucket(bucket_id_proto, match_action, bucket_id);
return sendImmediateReport(bucket_id, match_action);
} else {
// Found the cached bucket entry.
return processCachedBucket(bucket_id, match_action);
}

return processCachedBucket(bucket_id, match_action);
}

void RateLimitQuotaFilter::createMatcher() {
Expand Down
61 changes: 59 additions & 2 deletions test/extensions/filters/http/rate_limit_quota/integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,18 @@ class RateLimitQuotaIntegrationTest
}
}

void initializeConfig(ConfigOption config_option = {}) {
config_helper_.addConfigModifier([this, config_option](
void initializeConfig(ConfigOption config_option = {}, const std::string& log_format = "") {
config_helper_.addConfigModifier([this, config_option, log_format](
envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
// Ensure "HTTP2 with no prior knowledge." Necessary for gRPC and for headers
ConfigHelper::setHttp2(
*(bootstrap.mutable_static_resources()->mutable_clusters()->Mutable(0)));

// Enable access logging for testing dynamic metadata.
if (!log_format.empty()) {
HttpIntegrationTest::useAccessLog(log_format);
}

// Clusters for ExtProc gRPC servers, starting by copying an existing cluster
for (size_t i = 0; i < grpc_upstreams_.size(); ++i) {
auto* server_cluster = bootstrap.mutable_static_resources()->add_clusters();
Expand Down Expand Up @@ -298,6 +303,58 @@ TEST_P(RateLimitQuotaIntegrationTest, BasicFlowResponseMatched) {
EXPECT_EQ(response_->headers().getStatusValue(), "200");
}

TEST_P(RateLimitQuotaIntegrationTest, TestBasicMetadataLogging) {
initializeConfig({}, "Whole Bucket "
"ID=%DYNAMIC_METADATA(envoy.extensions.http_filters.rate_"
"limit_quota.bucket)%\n"
"Name=%DYNAMIC_METADATA(envoy.extensions.http_filters.rate_"
"limit_quota.bucket:name)%");
HttpIntegrationTest::initialize();
absl::flat_hash_map<std::string, std::string> custom_headers = {{"environment", "staging"},
{"group", "envoy"}};
// Send downstream client request to upstream.
sendClientRequest(&custom_headers);

// Start the gRPC stream to RLQS server.
ASSERT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, rlqs_connection_));
ASSERT_TRUE(rlqs_connection_->waitForNewStream(*dispatcher_, rlqs_stream_));
envoy::service::rate_limit_quota::v3::RateLimitQuotaUsageReports reports;
ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports));
rlqs_stream_->startGrpcStream();

// Build the response whose bucket ID matches the sent report.
envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response;
custom_headers.insert({"name", "prod"});
auto* bucket_action = rlqs_response.add_bucket_action();
for (const auto& [key, value] : custom_headers) {
(*bucket_action->mutable_bucket_id()->mutable_bucket()).insert({key, value});
}
// Send the response from RLQS server.
rlqs_stream_->sendGrpcMessage(rlqs_response);

// Handle the request received by upstream.
ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));
upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);
upstream_request_->encodeData(100, true);

// Verify the response to downstream.
ASSERT_TRUE(response_->waitForEndStream());
EXPECT_TRUE(response_->complete());
EXPECT_EQ(response_->headers().getStatusValue(), "200");

std::string log_output0 =
HttpIntegrationTest::waitForAccessLog(HttpIntegrationTest::access_log_name_, 0, true);
EXPECT_THAT(log_output0, testing::HasSubstr("Whole Bucket ID"));
EXPECT_THAT(log_output0, testing::HasSubstr("\"name\":\"prod\""));
EXPECT_THAT(log_output0, testing::HasSubstr("\"group\":\"envoy\""));
EXPECT_THAT(log_output0, testing::HasSubstr("\"environment\":\"staging\""));
std::string log_output1 =
HttpIntegrationTest::waitForAccessLog(HttpIntegrationTest::access_log_name_, 1, true);
EXPECT_THAT(log_output1, testing::HasSubstr("Name=prod"));
}

TEST_P(RateLimitQuotaIntegrationTest, BasicFlowMultiSameRequest) {
initializeConfig();
HttpIntegrationTest::initialize();
Expand Down

0 comments on commit c9fd85f

Please sign in to comment.