From 2f8877951f0235f34987440eb4f06e2916bd2fb9 Mon Sep 17 00:00:00 2001 From: bsurber <73970703+bsurber@users.noreply.github.com> Date: Wed, 7 Aug 2024 15:26:19 -0700 Subject: [PATCH] BucketId Dynamic Metadata for RateLimitQuota filter (#35594) Commit Message: Add dynamic metadata to the rate_limit_quota filter to export the selected bucket for a given request for logging Additional Description: Functionality tested using an access log filter in integration testing Risk Level: low Testing: integration testing Docs Changes: Release Notes: Platform Specific Features: --------- Signed-off-by: Brian Surber Signed-off-by: Martin Duke --- .../filters/http/rate_limit_quota/filter.cc | 19 ++++-- .../http/rate_limit_quota/integration_test.cc | 61 ++++++++++++++++++- 2 files changed, 73 insertions(+), 7 deletions(-) diff --git a/source/extensions/filters/http/rate_limit_quota/filter.cc b/source/extensions/filters/http/rate_limit_quota/filter.cc index 28600208658c9..60eede6633226 100644 --- a/source/extensions/filters/http/rate_limit_quota/filter.cc +++ b/source/extensions/filters/http/rate_limit_quota/filter.cc @@ -9,6 +9,8 @@ namespace Extensions { namespace HttpFilters { namespace RateLimitQuota { +const char kBucketMetadataNamespace[] = "envoy.extensions.http_filters.rate_limit_quota.bucket"; + Http::FilterHeadersStatus RateLimitQuotaFilter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) { ENVOY_LOG(trace, "decodeHeaders: end_stream = {}", end_stream); @@ -28,7 +30,7 @@ Http::FilterHeadersStatus RateLimitQuotaFilter::decodeHeaders(Http::RequestHeade // succeeds. const RateLimitOnMatchAction& match_action = match_result.value()->getTyped(); - auto ret = match_action.generateBucketId(*data_ptr_, factory_context_, visitor_); + absl::StatusOr ret = match_action.generateBucketId(*data_ptr_, factory_context_, visitor_); if (!ret.ok()) { // When it failed to generate the bucket id for this specific request, the request is ALLOWED by // default (i.e., fail-open). @@ -36,19 +38,26 @@ Http::FilterHeadersStatus RateLimitQuotaFilter::decodeHeaders(Http::RequestHeade return Envoy::Http::FilterHeadersStatus::Continue; } - BucketId bucket_id_proto = ret.value(); + const BucketId& bucket_id_proto = *ret; const size_t bucket_id = MessageUtil::hash(bucket_id_proto); ENVOY_LOG(trace, "Generated the associated hashed bucket id: {} for bucket id proto:\n {}", bucket_id, bucket_id_proto.DebugString()); + + ProtobufWkt::Struct bucket_log; + auto* bucket_log_fields = bucket_log.mutable_fields(); + for (const auto& bucket : bucket_id_proto.bucket()) + (*bucket_log_fields)[bucket.first] = ValueUtil::stringValue(bucket.second); + + callbacks_->streamInfo().setDynamicMetadata(kBucketMetadataNamespace, bucket_log); + if (quota_buckets_.find(bucket_id) == quota_buckets_.end()) { // For first matched request, create a new bucket in the cache and sent the report to RLQS // server immediately. createNewBucket(bucket_id_proto, match_action, bucket_id); return sendImmediateReport(bucket_id, match_action); - } else { - // Found the cached bucket entry. - return processCachedBucket(bucket_id, match_action); } + + return processCachedBucket(bucket_id, match_action); } void RateLimitQuotaFilter::createMatcher() { diff --git a/test/extensions/filters/http/rate_limit_quota/integration_test.cc b/test/extensions/filters/http/rate_limit_quota/integration_test.cc index d342e3aca00cd..5af41c95c49b7 100644 --- a/test/extensions/filters/http/rate_limit_quota/integration_test.cc +++ b/test/extensions/filters/http/rate_limit_quota/integration_test.cc @@ -50,13 +50,18 @@ class RateLimitQuotaIntegrationTest } } - void initializeConfig(ConfigOption config_option = {}) { - config_helper_.addConfigModifier([this, config_option]( + void initializeConfig(ConfigOption config_option = {}, const std::string& log_format = "") { + config_helper_.addConfigModifier([this, config_option, log_format]( envoy::config::bootstrap::v3::Bootstrap& bootstrap) { // Ensure "HTTP2 with no prior knowledge." Necessary for gRPC and for headers ConfigHelper::setHttp2( *(bootstrap.mutable_static_resources()->mutable_clusters()->Mutable(0))); + // Enable access logging for testing dynamic metadata. + if (!log_format.empty()) { + HttpIntegrationTest::useAccessLog(log_format); + } + // Clusters for ExtProc gRPC servers, starting by copying an existing cluster for (size_t i = 0; i < grpc_upstreams_.size(); ++i) { auto* server_cluster = bootstrap.mutable_static_resources()->add_clusters(); @@ -298,6 +303,58 @@ TEST_P(RateLimitQuotaIntegrationTest, BasicFlowResponseMatched) { EXPECT_EQ(response_->headers().getStatusValue(), "200"); } +TEST_P(RateLimitQuotaIntegrationTest, TestBasicMetadataLogging) { + initializeConfig({}, "Whole Bucket " + "ID=%DYNAMIC_METADATA(envoy.extensions.http_filters.rate_" + "limit_quota.bucket)%\n" + "Name=%DYNAMIC_METADATA(envoy.extensions.http_filters.rate_" + "limit_quota.bucket:name)%"); + HttpIntegrationTest::initialize(); + absl::flat_hash_map custom_headers = {{"environment", "staging"}, + {"group", "envoy"}}; + // Send downstream client request to upstream. + sendClientRequest(&custom_headers); + + // Start the gRPC stream to RLQS server. + ASSERT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, rlqs_connection_)); + ASSERT_TRUE(rlqs_connection_->waitForNewStream(*dispatcher_, rlqs_stream_)); + envoy::service::rate_limit_quota::v3::RateLimitQuotaUsageReports reports; + ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); + rlqs_stream_->startGrpcStream(); + + // Build the response whose bucket ID matches the sent report. + envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response; + custom_headers.insert({"name", "prod"}); + auto* bucket_action = rlqs_response.add_bucket_action(); + for (const auto& [key, value] : custom_headers) { + (*bucket_action->mutable_bucket_id()->mutable_bucket()).insert({key, value}); + } + // Send the response from RLQS server. + rlqs_stream_->sendGrpcMessage(rlqs_response); + + // Handle the request received by upstream. + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + + // Verify the response to downstream. + ASSERT_TRUE(response_->waitForEndStream()); + EXPECT_TRUE(response_->complete()); + EXPECT_EQ(response_->headers().getStatusValue(), "200"); + + std::string log_output0 = + HttpIntegrationTest::waitForAccessLog(HttpIntegrationTest::access_log_name_, 0, true); + EXPECT_THAT(log_output0, testing::HasSubstr("Whole Bucket ID")); + EXPECT_THAT(log_output0, testing::HasSubstr("\"name\":\"prod\"")); + EXPECT_THAT(log_output0, testing::HasSubstr("\"group\":\"envoy\"")); + EXPECT_THAT(log_output0, testing::HasSubstr("\"environment\":\"staging\"")); + std::string log_output1 = + HttpIntegrationTest::waitForAccessLog(HttpIntegrationTest::access_log_name_, 1, true); + EXPECT_THAT(log_output1, testing::HasSubstr("Name=prod")); +} + TEST_P(RateLimitQuotaIntegrationTest, BasicFlowMultiSameRequest) { initializeConfig(); HttpIntegrationTest::initialize();