rlqs: Handle remote close (envoyproxy#35677)
Commit Message: 
- Avoid periodical reports if the stream has been remotely closed.
- Clean up/refactor the code: remove the redundant `stream_closed_` flag and
  use `stream_ == nullptr` as the indicator that the stream is closed.

---------

Signed-off-by: tyxia <[email protected]>
tyxia authored Aug 13, 2024
1 parent 495acf6 commit 3283bed
Showing 3 changed files with 100 additions and 11 deletions.
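
The refactor described in the commit message follows a common pattern: instead of tracking a separate boolean next to the stream pointer, the pointer itself is the single source of truth, and every send site guards on it. Below is a minimal, self-contained sketch of that pattern; `FakeStream`, `SketchClient`, and their members are illustrative stand-ins, not Envoy's `RateLimitClientImpl` or its gRPC stream types.

#include <iostream>
#include <memory>
#include <string>

// Stand-in for a bidirectional gRPC stream (illustrative only).
struct FakeStream {
  void sendMessage(const std::string& report, bool end_stream) {
    std::cout << "sent: " << report << " (end_stream=" << end_stream << ")\n";
  }
  void closeStream() {}
  void resetStream() {}
};

// Sketch of a client that uses `stream_ == nullptr` as the sole
// "stream is closed" indicator; there is no separate closed flag.
class SketchClient {
public:
  SketchClient() : stream_(std::make_unique<FakeStream>()) {}

  void sendUsageReport(const std::string& report) {
    if (stream_ != nullptr) {
      // `end_stream=false`: the stream is never closed from the sending side here.
      stream_->sendMessage(report, /*end_stream=*/false);
    } else {
      // Stream already closed (locally or remotely): skip the periodic report.
      std::cout << "stream closed; dropping report\n";
    }
  }

  void closeStream() {
    if (stream_ != nullptr) {
      stream_->closeStream();
      stream_->resetStream();
      stream_ = nullptr;  // From now on, sends become no-ops.
    }
  }

  void onRemoteClose() {
    // Remote side closed the stream: forget it so later reports are skipped.
    stream_ = nullptr;
  }

private:
  std::unique_ptr<FakeStream> stream_;
};

int main() {
  SketchClient client;
  client.sendUsageReport("usage report 1");  // Sent.
  client.onRemoteClose();
  client.sendUsageReport("usage report 2");  // Skipped, no crash.
  return 0;
}
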
20 changes: 11 additions & 9 deletions source/extensions/filters/http/rate_limit_quota/client_impl.cc
@@ -64,10 +64,14 @@ RateLimitQuotaUsageReports RateLimitClientImpl::buildReport(absl::optional<size_
// This function covers both the periodical report and the immediate report case, with the
// difference that the bucket id in the periodical report case is empty.
void RateLimitClientImpl::sendUsageReport(absl::optional<size_t> bucket_id) {
  ASSERT(stream_ != nullptr);
  // Build the report and then send the report to RLQS server.
  // `end_stream` should always be set to false as we don't want to close the stream locally.
  stream_->sendMessage(buildReport(bucket_id), /*end_stream=*/false);
  if (stream_ != nullptr) {
    // Build the report and then send the report to RLQS server.
    // `end_stream` should always be set to false as we don't want to close the stream locally.
    stream_->sendMessage(buildReport(bucket_id), /*end_stream=*/false);
  } else {
    // Don't send any reports if stream has already been closed.
    ENVOY_LOG(debug, "The stream has already been closed; no reports will be sent.");
  }
}

void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response) {
@@ -143,20 +147,18 @@ void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response)

void RateLimitClientImpl::closeStream() {
  // Close the stream if it is in open state.
  if (stream_ != nullptr && !stream_closed_) {
  if (stream_ != nullptr) {
    ENVOY_LOG(debug, "Closing gRPC stream");
    stream_->closeStream();
    stream_closed_ = true;
    stream_->resetStream();
    stream_ = nullptr;
  }
}

void RateLimitClientImpl::onRemoteClose(Grpc::Status::GrpcStatus status,
                                        const std::string& message) {
  // TODO(tyxia) Revisit later, maybe add some logging.
  stream_closed_ = true;
  ENVOY_LOG(debug, "gRPC stream closed remotely with status {}: {}", status, message);
  closeStream();
  stream_ = nullptr;
}

absl::Status RateLimitClientImpl::startStream(const StreamInfo::StreamInfo& stream_info) {
2 changes: 0 additions & 2 deletions source/extensions/filters/http/rate_limit_quota/client_impl.h
@@ -58,8 +58,6 @@ class RateLimitClientImpl : public RateLimitClient,
  // Build the usage report (i.e., the request sent to RLQS server) from the buckets in quota
  // bucket cache.
  RateLimitQuotaUsageReports buildReport(absl::optional<size_t> bucket_id);

  bool stream_closed_ = false;
  // Domain from filter configuration. The same domain name throughout the whole lifetime of client.
  std::string domain_name_;
  // Client is stored as the bare object since there is no ownership transfer involved.
89 changes: 89 additions & 0 deletions test/extensions/filters/http/rate_limit_quota/integration_test.cc
@@ -709,6 +709,95 @@ TEST_P(RateLimitQuotaIntegrationTest, BasicFlowPeriodicalReport) {
}
}

TEST_P(RateLimitQuotaIntegrationTest, BasicFlowPeriodicalReportWithStreamClosed) {
  initializeConfig();
  HttpIntegrationTest::initialize();
  absl::flat_hash_map<std::string, std::string> custom_headers = {{"environment", "staging"},
                                                                  {"group", "envoy"}};
  // Send downstream client request to upstream.
  sendClientRequest(&custom_headers);

  ASSERT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, rlqs_connection_));
  ASSERT_TRUE(rlqs_connection_->waitForNewStream(*dispatcher_, rlqs_stream_));
  // Reports should be built in filter.cc.
  envoy::service::rate_limit_quota::v3::RateLimitQuotaUsageReports reports;
  ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports));

  // Verify the usage report content.
  ASSERT_THAT(reports.bucket_quota_usages_size(), 1);
  const auto& usage = reports.bucket_quota_usages(0);
  // We only send a single downstream client request and it is allowed.
  EXPECT_EQ(usage.num_requests_allowed(), 1);
  EXPECT_EQ(usage.num_requests_denied(), 0);
  // It is the first report, so the time_elapsed is 0.
  EXPECT_EQ(Protobuf::util::TimeUtil::DurationToSeconds(usage.time_elapsed()), 0);

  rlqs_stream_->startGrpcStream();

  // Build the response.
  envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response;
  absl::flat_hash_map<std::string, std::string> custom_headers_cpy = custom_headers;
  custom_headers_cpy.insert({"name", "prod"});
  auto* bucket_action = rlqs_response.add_bucket_action();

  for (const auto& [key, value] : custom_headers_cpy) {
    (*bucket_action->mutable_bucket_id()->mutable_bucket()).insert({key, value});
  }
  rlqs_stream_->sendGrpcMessage(rlqs_response);

  // Handle the request received by upstream.
  ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
  ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
  ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));
  upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);
  upstream_request_->encodeData(100, true);

  // Verify the response to downstream.
  ASSERT_TRUE(response_->waitForEndStream());
  EXPECT_TRUE(response_->complete());
  EXPECT_EQ(response_->headers().getStatusValue(), "200");

  // Reporting interval (in seconds) used by the periodical reports; see ValidMatcherConfig.
  int report_interval_sec = 60;
  // Trigger the report periodically.
  for (int i = 0; i < 6; ++i) {
    if (i == 2) {
      // Close the stream.
      rlqs_stream_->finishGrpcStream(Grpc::Status::Ok);
    }

    // Advance the time by report_interval.
    simTime().advanceTimeWait(std::chrono::milliseconds(report_interval_sec * 1000));

    // Only perform the rlqs server check and response before the stream is remotely closed.
    if (i < 2) {
      // Checks that the rate limit server has received the periodical reports.
      ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports));

      // Verify the usage report content.
      ASSERT_THAT(reports.bucket_quota_usages_size(), 1);
      const auto& usage = reports.bucket_quota_usages(0);
      // The report only represents the usage since the last report.
      // In the periodical report case here, the number of requests allowed and denied is 0
      // since no new requests come in.
      EXPECT_EQ(usage.num_requests_allowed(), 0);
      EXPECT_EQ(usage.num_requests_denied(), 0);
      // time_elapsed equals the periodical reporting interval.
      EXPECT_EQ(Protobuf::util::TimeUtil::DurationToSeconds(usage.time_elapsed()),
                report_interval_sec);

      // Build the rlqs server response.
      envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response2;
      auto* bucket_action2 = rlqs_response2.add_bucket_action();

      for (const auto& [key, value] : custom_headers_cpy) {
        (*bucket_action2->mutable_bucket_id()->mutable_bucket()).insert({key, value});
      }
      rlqs_stream_->sendGrpcMessage(rlqs_response2);
    }
  }
}

// The RLQS filter is operating in non-blocking mode now, so this test could be flaky until stats
// are added to make the test behavior deterministic (e.g., wait for stats in the test).
// Disable the test for now.