From 5474132e5ecdde6facf6b550e7280024ead222da Mon Sep 17 00:00:00 2001 From: Yan Avlasov Date: Sat, 30 Sep 2023 13:58:46 +0000 Subject: [PATCH] Limit on the number of HTTP requests processed from a connection in an I/O cycle Signed-off-by: Yan Avlasov --- changelogs/current.yaml | 7 + source/common/http/conn_manager_impl.cc | 93 ++++++- source/common/http/conn_manager_impl.h | 24 +- test/common/http/conn_manager_impl_test_2.cc | 244 ++++++++++++++++++ .../http/conn_manager_impl_test_base.cc | 19 ++ .../common/http/conn_manager_impl_test_base.h | 2 + test/common/http/http2/http2_frame.cc | 12 +- test/common/http/http2/http2_frame.h | 14 +- .../multiplexed_integration_test.cc | 170 ++++++++++++ 9 files changed, 570 insertions(+), 15 deletions(-) diff --git a/changelogs/current.yaml b/changelogs/current.yaml index d458798d1e88..49dcb27ddd6a 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -11,6 +11,13 @@ behavior_changes: resets is applied. The connection is disconnected if more than 50% of resets are premature. Setting the runtime key ``envoy.restart_features.send_goaway_for_premature_rst_streams`` to ``false`` completely disables this check. +- area: http + change: | + Add runtime flag ``http.max_requests_per_io_cycle`` for setting the limit on the number of HTTP requests processed + from a single connection in a single I/O cycle. Requests over this limit are processed in subsequent I/O cycles. This + mitigates CPU starvation by connections that simultaneously send high number of requests by allowing requests from other + connections to make progress. This runtime value can be set to 1 in the presence of abusive HTTP/2 or HTTP/3 connections. + By default this limit is disabled. minor_behavior_changes: # *Changes that may cause incompatibilities for some users, but should not for most* diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index f6d2f6bcea26..e8afab89e7a4 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -60,6 +60,10 @@ const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey "overload.premature_reset_total_stream_count"; const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey = "overload.premature_reset_min_stream_lifetime_seconds"; +// Runtime key for maximum number of requests that can be processed from a single connection per +// I/O cycle. Requests over this limit are deferred until the next I/O cycle. +const absl::string_view ConnectionManagerImpl::MaxRequestsPerIoCycle = + "http.max_requests_per_io_cycle"; bool requestWasConnect(const RequestHeaderMapPtr& headers, Protocol protocol) { if (!headers) { @@ -113,7 +117,9 @@ ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config, time_source_(time_source), proxy_name_(StreamInfo::ProxyStatusUtils::makeProxyName( /*node_id=*/local_info_.node().id(), /*server_name=*/config_.serverName(), - /*proxy_status_config=*/config_.proxyStatusConfig())) {} + /*proxy_status_config=*/config_.proxyStatusConfig())), + max_requests_during_dispatch_(runtime_.snapshot().getInteger( + ConnectionManagerImpl::MaxRequestsPerIoCycle, UINT32_MAX)) {} const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() { static const auto headers = createHeaderMap( @@ -123,6 +129,12 @@ const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() { void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { read_callbacks_ = &callbacks; + if (max_requests_during_dispatch_ != UINT32_MAX) { + deferred_request_processing_callback_ = + callbacks.connection().dispatcher().createSchedulableCallback( + [this]() -> void { onDeferredRequestProcessing(); }); + } + stats_.named_.downstream_cx_total_.inc(); stats_.named_.downstream_cx_active_.inc(); if (read_callbacks_->connection().ssl()) { @@ -391,6 +403,7 @@ void ConnectionManagerImpl::createCodec(Buffer::Instance& data) { } Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) { + requests_during_dispatch_count_ = 0; if (!codec_) { // Http3 codec should have been instantiated by now. createCodec(data); @@ -1250,7 +1263,12 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& he traceRequest(); } - filter_manager_.decodeHeaders(*request_headers_, end_stream); + if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) { + filter_manager_.decodeHeaders(*request_headers_, end_stream); + } else { + state_.deferred_to_next_io_iteration_ = true; + state_.deferred_end_stream_ = end_stream; + } // Reset it here for both global and overridden cases. resetIdleTimer(); @@ -1317,8 +1335,15 @@ void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, boo connection_manager_.read_callbacks_->connection().dispatcher()); maybeEndDecode(end_stream); filter_manager_.streamInfo().addBytesReceived(data.length()); - - filter_manager_.decodeData(data, end_stream); + if (!state_.deferred_to_next_io_iteration_) { + filter_manager_.decodeData(data, end_stream); + } else { + if (!deferred_data_) { + deferred_data_ = std::make_unique(); + } + deferred_data_->move(data); + state_.deferred_end_stream_ = end_stream; + } } void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& trailers) { @@ -1334,7 +1359,9 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& return; } maybeEndDecode(true); - filter_manager_.decodeTrailers(*request_trailers_); + if (!state_.deferred_to_next_io_iteration_) { + filter_manager_.decodeTrailers(*request_trailers_); + } } void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) { @@ -1936,5 +1963,61 @@ void ConnectionManagerImpl::ActiveStream::resetStream(Http::StreamResetReason, a connection_manager_.doEndStream(*this); } +bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() { + // TODO(yanavlasov): Merge this with the filter manager continueIteration() method + if (!state_.deferred_to_next_io_iteration_) { + return false; + } + state_.deferred_to_next_io_iteration_ = false; + bool end_stream = + state_.deferred_end_stream_ && deferred_data_ == nullptr && request_trailers_ == nullptr; + filter_manager_.decodeHeaders(*request_headers_, end_stream); + if (end_stream) { + return true; + } + if (deferred_data_ != nullptr) { + end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr; + filter_manager_.decodeData(*deferred_data_, end_stream); + } + if (request_trailers_ != nullptr) { + filter_manager_.decodeTrailers(*request_trailers_); + } + return true; +} + +bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() { + // Do not defer this stream if stream deferral is disabled + if (deferred_request_processing_callback_ == nullptr) { + return false; + } + // Defer this stream if there are already deferred streams, so they are not + // processed out of order + if (deferred_request_processing_callback_->enabled()) { + return true; + } + ++requests_during_dispatch_count_; + bool defer = requests_during_dispatch_count_ > max_requests_during_dispatch_; + if (defer) { + deferred_request_processing_callback_->scheduleCallbackNextIteration(); + } + return defer; +} + +void ConnectionManagerImpl::onDeferredRequestProcessing() { + requests_during_dispatch_count_ = 1; // 1 stream is always let through + // Streams are inserted at the head of the list. As such process deferred + // streams at the back of the list first. + for (auto reverse_iter = streams_.rbegin(); reverse_iter != streams_.rend();) { + auto& stream_ptr = *reverse_iter; + // Move the iterator to the next item in case the `onDeferredRequestProcessing` call removes the + // stream from the list. + ++reverse_iter; + bool was_deferred = stream_ptr->onDeferredRequestProcessing(); + if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) { + break; + } + } +} + } // namespace Http } // namespace Envoy diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 0fe5a9297e6e..6022b91c4e08 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -122,6 +122,7 @@ class ConnectionManagerImpl : Logger::Loggable, // The minimum lifetime of a stream, in seconds, in order not to be considered // prematurely closed. static const absl::string_view PrematureResetMinStreamLifetimeSecondsKey; + static const absl::string_view MaxRequestsPerIoCycle; private: struct ActiveStream; @@ -315,7 +316,7 @@ class ConnectionManagerImpl : Logger::Loggable, State() : codec_saw_local_complete_(false), saw_connection_close_(false), successful_upgrade_(false), is_internally_created_(false), is_tunneling_(false), - decorated_propagate_(true) {} + decorated_propagate_(true), deferred_to_next_io_iteration_(false) {} bool codec_saw_local_complete_ : 1; // This indicates that local is complete as written all // the way through to the codec. @@ -331,6 +332,14 @@ class ConnectionManagerImpl : Logger::Loggable, bool is_tunneling_ : 1; bool decorated_propagate_ : 1; + + // Indicates that sending headers to the filter manager is deferred to the + // next I/O cycle. If data or trailers are received when this flag is set + // they are deferred too. + // TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate + // structure, so it can be atomically created and cleared. + bool deferred_to_next_io_iteration_ : 1; + bool deferred_end_stream_ : 1; }; // Per-stream idle timeout callback. @@ -373,6 +382,11 @@ class ConnectionManagerImpl : Logger::Loggable, // HTTP connection manager configuration, then the entire connection is closed. bool validateTrailers(); + // Dispatch deferred headers, body and trailers to the filter manager. + // Return true if this stream was deferred and dispatched pending headers, body and trailers (if + // present). Return false if this stream was not deferred. + bool onDeferredRequestProcessing(); + ConnectionManagerImpl& connection_manager_; OptRef connection_manager_tracing_config_; // TODO(snowp): It might make sense to move this to the FilterManager to avoid storing it in @@ -427,6 +441,8 @@ class ConnectionManagerImpl : Logger::Loggable, const Tracing::CustomTagMap* customTags() const override; bool verbose() const override; uint32_t maxPathTagLength() const override; + + std::unique_ptr deferred_data_; }; using ActiveStreamPtr = std::unique_ptr; @@ -482,6 +498,9 @@ class ConnectionManagerImpl : Logger::Loggable, // and at least half have been prematurely reset? void maybeDrainDueToPrematureResets(); + bool shouldDeferRequestProxyingToNextIoCycle(); + void onDeferredRequestProcessing(); + enum class DrainState { NotDraining, Draining, Closing }; ConnectionManagerConfig& config_; @@ -526,6 +545,9 @@ class ConnectionManagerImpl : Logger::Loggable, // the definition given in `isPrematureRstStream()`. uint64_t number_premature_stream_resets_{0}; const std::string proxy_name_; // for Proxy-Status. + uint32_t requests_during_dispatch_count_{0}; + const uint32_t max_requests_during_dispatch_{UINT32_MAX}; + Event::SchedulableCallbackPtr deferred_request_processing_callback_; }; } // namespace Http diff --git a/test/common/http/conn_manager_impl_test_2.cc b/test/common/http/conn_manager_impl_test_2.cc index de8f1e0782fa..27059ac8e239 100644 --- a/test/common/http/conn_manager_impl_test_2.cc +++ b/test/common/http/conn_manager_impl_test_2.cc @@ -13,6 +13,7 @@ using testing::Mock; using testing::Property; using testing::Ref; using testing::Return; +using testing::ReturnArg; using testing::ReturnRef; namespace Envoy { @@ -3503,5 +3504,248 @@ TEST_F(HttpConnectionManagerImplTest, HeaderValidatorAccept) { EXPECT_EQ(1U, listener_stats_.downstream_rq_completed_.value()); } +// Validate that deferred streams are processed with a variety of +// headers, data and trailer arriving in the same I/O cycle +TEST_F(HttpConnectionManagerImplTest, LimitWorkPerIOCycle) { + const int kRequestsSentPerIOCycle = 100; + EXPECT_CALL(runtime_.snapshot_, getInteger(_, _)).WillRepeatedly(ReturnArg<1>()); + // Process 1 request per I/O cycle + auto* deferred_request_callback = enableStreamsPerIoLimit(1); + setup(false, ""); + + // Store the basic request encoder during filter chain setup. + std::vector> encoder_filters; + int decode_headers_call_count = 0; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + std::shared_ptr filter(new NiceMock()); + + // Each 4th request is headers only + EXPECT_CALL(*filter, decodeHeaders(_, i % 4 == 0 ? true : false)) + .WillRepeatedly(Invoke([&](RequestHeaderMap&, bool) -> FilterHeadersStatus { + ++decode_headers_call_count; + return FilterHeadersStatus::StopIteration; + })); + + // Each 1st request is headers and data only + // Each 2nd request is headers, data and trailers + if (i % 4 == 1 || i % 4 == 2) { + EXPECT_CALL(*filter, decodeData(_, i % 4 == 1 ? true : false)) + .WillOnce(Return(FilterDataStatus::StopIterationNoBuffer)); + } + + // Each 3rd request is headers and trailers (no data) + if (i % 4 == 2 || i % 4 == 3) { + EXPECT_CALL(*filter, decodeTrailers(_)).WillOnce(Return(FilterTrailersStatus::StopIteration)); + } + + EXPECT_CALL(*filter, setDecoderFilterCallbacks(_)); + encoder_filters.push_back(std::move(filter)); + } + + uint64_t random_value = 0; + EXPECT_CALL(random_, random()).WillRepeatedly(Invoke([&random_value]() { + return random_value++; + })); + + EXPECT_CALL(filter_factory_, createFilterChain(_)) + .Times(kRequestsSentPerIOCycle) + .WillRepeatedly(Invoke([&encoder_filters](FilterChainManager& manager) -> bool { + static int index = 0; + int i = index++; + FilterFactoryCb factory([&encoder_filters, i](FilterChainFactoryCallbacks& callbacks) { + callbacks.addStreamDecoderFilter(encoder_filters[i]); + }); + manager.applyFilterFactoryCb({}, factory); + return true; + })); + + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)) + .Times(kRequestsSentPerIOCycle); + + std::vector> response_encoders(kRequestsSentPerIOCycle); + for (auto& encoder : response_encoders) { + EXPECT_CALL(encoder, getStream()).WillRepeatedly(ReturnRef(encoder.stream_)); + } + + EXPECT_CALL(*codec_, dispatch(_)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data) -> Http::Status { + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + decoder_ = &conn_manager_->newStream(response_encoders[i]); + + RequestHeaderMapPtr headers{new TestRequestHeaderMapImpl{ + {":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; + + RequestTrailerMapPtr trailers{ + new TestRequestTrailerMapImpl{{"key1", "value1"}, {"key2", "value2"}}}; + + Buffer::OwnedImpl data("data"); + + switch (i % 4) { + case 0: + decoder_->decodeHeaders(std::move(headers), true); + break; + case 1: + decoder_->decodeHeaders(std::move(headers), false); + decoder_->decodeData(data, true); + break; + case 2: + decoder_->decodeHeaders(std::move(headers), false); + decoder_->decodeData(data, false); + decoder_->decodeTrailers(std::move(trailers)); + break; + case 3: + decoder_->decodeHeaders(std::move(headers), false); + decoder_->decodeTrailers(std::move(trailers)); + break; + } + } + + data.drain(4); + return Http::okStatus(); + })); + + // Kick off the incoming data. + Buffer::OwnedImpl fake_input("1234"); + conn_manager_->onData(fake_input, false); + + EXPECT_TRUE(deferred_request_callback->enabled_); + // Only one request should go through the filter chain + ASSERT_EQ(decode_headers_call_count, 1); + + // Let other requests to go through the filter chain. Call expectations will fail + // if this is not the case. + int deferred_request_count = 0; + while (deferred_request_callback->enabled_) { + deferred_request_callback->invokeCallback(); + ++deferred_request_count; + } + + ASSERT_EQ(deferred_request_count, kRequestsSentPerIOCycle); + + for (auto& filter : encoder_filters) { + ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}}; + filter->callbacks_->streamInfo().setResponseCodeDetails(""); + filter->callbacks_->encodeHeaders(std::move(response_headers), true, "details"); + } + + EXPECT_EQ(kRequestsSentPerIOCycle, stats_.named_.downstream_rq_2xx_.value()); + EXPECT_EQ(kRequestsSentPerIOCycle, listener_stats_.downstream_rq_2xx_.value()); + EXPECT_EQ(kRequestsSentPerIOCycle, stats_.named_.downstream_rq_completed_.value()); + EXPECT_EQ(kRequestsSentPerIOCycle, listener_stats_.downstream_rq_completed_.value()); +} + +TEST_F(HttpConnectionManagerImplTest, StreamDeferralPreservesOrder) { + EXPECT_CALL(runtime_.snapshot_, getInteger(_, _)).WillRepeatedly(ReturnArg<1>()); + // Process 1 request per I/O cycle + auto* deferred_request_callback = enableStreamsPerIoLimit(1); + setup(false, ""); + + std::vector> encoder_filters; + int expected_request_id = 0; + const Http::LowerCaseString request_id_header(absl::string_view("request-id")); + // Two requests are processed in 2 I/O reads + const int TotalRequests = 2 * 2; + for (int i = 0; i < TotalRequests; ++i) { + std::shared_ptr filter(new NiceMock()); + + EXPECT_CALL(*filter, decodeHeaders(_, true)) + .WillRepeatedly(Invoke([&](RequestHeaderMap& headers, bool) -> FilterHeadersStatus { + // Check that requests are decoded in expected order + int request_id = 0; + ASSERT(absl::SimpleAtoi(headers.get(request_id_header)[0]->value().getStringView(), + &request_id)); + ASSERT(request_id == expected_request_id); + ++expected_request_id; + return FilterHeadersStatus::StopIteration; + })); + + EXPECT_CALL(*filter, setDecoderFilterCallbacks(_)); + encoder_filters.push_back(std::move(filter)); + } + + uint64_t random_value = 0; + EXPECT_CALL(random_, random()).WillRepeatedly(Invoke([&random_value]() { + return random_value++; + })); + + EXPECT_CALL(filter_factory_, createFilterChain(_)) + .Times(TotalRequests) + .WillRepeatedly(Invoke([&encoder_filters](FilterChainManager& manager) -> bool { + static int index = 0; + int i = index++; + FilterFactoryCb factory([&encoder_filters, i](FilterChainFactoryCallbacks& callbacks) { + callbacks.addStreamDecoderFilter(encoder_filters[i]); + }); + manager.applyFilterFactoryCb({}, factory); + return true; + })); + + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)).Times(TotalRequests); + + std::vector> response_encoders(TotalRequests); + for (auto& encoder : response_encoders) { + EXPECT_CALL(encoder, getStream()).WillRepeatedly(ReturnRef(encoder.stream_)); + } + auto response_encoders_iter = response_encoders.begin(); + + int request_id = 0; + EXPECT_CALL(*codec_, dispatch(_)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data) -> Http::Status { + // The second request should be deferred + for (int i = 0; i < 2; ++i) { + decoder_ = &conn_manager_->newStream(*response_encoders_iter); + ++response_encoders_iter; + + RequestHeaderMapPtr headers{ + new TestRequestHeaderMapImpl{{":authority", "host"}, + {":path", "/"}, + {":method", "GET"}, + {"request-id", absl::StrCat(request_id)}}}; + + ++request_id; + decoder_->decodeHeaders(std::move(headers), true); + } + + data.drain(4); + return Http::okStatus(); + })); + + // Kick off the incoming data. + Buffer::OwnedImpl fake_input("1234"); + conn_manager_->onData(fake_input, false); + + EXPECT_TRUE(deferred_request_callback->enabled_); + // Only one request should go through the filter chain + ASSERT_EQ(expected_request_id, 1); + + // Test arrival of another request. New request is read from the socket before deferred callbacks. + Buffer::OwnedImpl fake_input2("1234"); + conn_manager_->onData(fake_input2, false); + + // No requests from the second read should go through as there are deferred stream present + ASSERT_EQ(expected_request_id, 1); + + // Let other requests to go through the filter chain. Call expectations will fail + // if this is not the case. + int deferred_request_count = 0; + while (deferred_request_callback->enabled_) { + deferred_request_callback->invokeCallback(); + ++deferred_request_count; + } + + ASSERT_EQ(deferred_request_count, TotalRequests); + + for (auto& filter : encoder_filters) { + ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}}; + filter->callbacks_->streamInfo().setResponseCodeDetails(""); + filter->callbacks_->encodeHeaders(std::move(response_headers), true, "details"); + } + + EXPECT_EQ(TotalRequests, stats_.named_.downstream_rq_2xx_.value()); + EXPECT_EQ(TotalRequests, listener_stats_.downstream_rq_2xx_.value()); + EXPECT_EQ(TotalRequests, stats_.named_.downstream_rq_completed_.value()); + EXPECT_EQ(TotalRequests, listener_stats_.downstream_rq_completed_.value()); +} + } // namespace Http } // namespace Envoy diff --git a/test/common/http/conn_manager_impl_test_base.cc b/test/common/http/conn_manager_impl_test_base.cc index 973e98681ba3..a0cfe8332be4 100644 --- a/test/common/http/conn_manager_impl_test_base.cc +++ b/test/common/http/conn_manager_impl_test_base.cc @@ -78,6 +78,7 @@ void HttpConnectionManagerImplTest::setup(bool ssl, const std::string& server_na conn_manager_ = std::make_unique( *this, drain_close_, random_, http_context_, runtime_, local_info_, cluster_manager_, overload_manager_, test_time_.timeSystem()); + conn_manager_->initializeReadFilterCallbacks(filter_callbacks_); if (tracing) { @@ -327,5 +328,23 @@ void HttpConnectionManagerImplTest::expectUhvTrailerCheckFail() { })); } +Event::MockSchedulableCallback* +HttpConnectionManagerImplTest::enableStreamsPerIoLimit(uint32_t limit) { + EXPECT_CALL(runtime_.snapshot_, getInteger("http.max_requests_per_io_cycle", _)) + .WillOnce(Return(limit)); + + // Expect HCM to create and set schedulable callback + auto* deferred_request_callback = + new Event::MockSchedulableCallback(&filter_callbacks_.connection_.dispatcher_); + EXPECT_CALL(*deferred_request_callback, enabled()) + .WillRepeatedly( + Invoke([deferred_request_callback]() { return deferred_request_callback->enabled_; })); + EXPECT_CALL(*deferred_request_callback, scheduleCallbackNextIteration()) + .WillRepeatedly( + Invoke([deferred_request_callback]() { deferred_request_callback->enabled_ = true; })); + + return deferred_request_callback; +} + } // namespace Http } // namespace Envoy diff --git a/test/common/http/conn_manager_impl_test_base.h b/test/common/http/conn_manager_impl_test_base.h index 8b1e9b0e2d0f..04b6115bb7ad 100644 --- a/test/common/http/conn_manager_impl_test_base.h +++ b/test/common/http/conn_manager_impl_test_base.h @@ -178,6 +178,8 @@ class HttpConnectionManagerImplTest : public testing::Test, public ConnectionMan } void expectUhvTrailerCheckFail(); + Event::MockSchedulableCallback* enableStreamsPerIoLimit(uint32_t limit); + Envoy::Event::SimulatedTimeSystem test_time_; NiceMock route_config_provider_; std::shared_ptr route_config_{new NiceMock()}; diff --git a/test/common/http/http2/http2_frame.cc b/test/common/http/http2/http2_frame.cc index 68569c0daf2e..f7f2ad117d76 100644 --- a/test/common/http/http2/http2_frame.cc +++ b/test/common/http/http2/http2_frame.cc @@ -339,7 +339,11 @@ Http2Frame Http2Frame::makeRequest(uint32_t stream_index, absl::string_view host makeNetworkOrderStreamId(stream_index)); frame.appendStaticHeader(StaticHeaderIndex::MethodGet); frame.appendStaticHeader(StaticHeaderIndex::SchemeHttps); - frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); + if (path.empty() || path == "/") { + frame.appendStaticHeader(StaticHeaderIndex::Path); + } else { + frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); + } frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Authority, host); frame.adjustPayloadSize(); return frame; @@ -363,7 +367,11 @@ Http2Frame Http2Frame::makePostRequest(uint32_t stream_index, absl::string_view makeNetworkOrderStreamId(stream_index)); frame.appendStaticHeader(StaticHeaderIndex::MethodPost); frame.appendStaticHeader(StaticHeaderIndex::SchemeHttps); - frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); + if (path.empty() || path == "/") { + frame.appendStaticHeader(StaticHeaderIndex::Path); + } else { + frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); + } frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Authority, host); frame.adjustPayloadSize(); return frame; diff --git a/test/common/http/http2/http2_frame.h b/test/common/http/http2/http2_frame.h index dca17c480434..1b2ba0fea5d5 100644 --- a/test/common/http/http2/http2_frame.h +++ b/test/common/http/http2/http2_frame.h @@ -209,6 +209,13 @@ class Http2Frame { ConstIterator end() const { return data_.end(); } bool empty() const { return data_.empty(); } + void appendHeaderWithoutIndexing(const Header& header); + // This method updates payload length in the HTTP2 header based on the size of the data_ + void adjustPayloadSize() { + ASSERT(size() >= HeaderSize); + setPayloadSize(size() - HeaderSize); + } + private: void buildHeader(Type type, uint32_t payload_size = 0, uint8_t flags = 0, uint32_t stream_id = 0); void setPayloadSize(uint32_t size); @@ -228,15 +235,8 @@ class Http2Frame { // Headers are directly encoded void appendStaticHeader(StaticHeaderIndex index); void appendHeaderWithoutIndexing(StaticHeaderIndex index, absl::string_view value); - void appendHeaderWithoutIndexing(const Header& header); void appendEmptyHeader(); - // This method updates payload length in the HTTP2 header based on the size of the data_ - void adjustPayloadSize() { - ASSERT(size() >= HeaderSize); - setPayloadSize(size() - HeaderSize); - } - DataContainer data_; }; diff --git a/test/integration/multiplexed_integration_test.cc b/test/integration/multiplexed_integration_test.cc index 6256539a01e5..6db47ef9d8ef 100644 --- a/test/integration/multiplexed_integration_test.cc +++ b/test/integration/multiplexed_integration_test.cc @@ -2008,6 +2008,176 @@ TEST_P(Http2FrameIntegrationTest, UpstreamWindowUpdateAfterGoAway) { tcp_client_->close(); } +TEST_P(Http2FrameIntegrationTest, MultipleHeaderOnlyRequests) { + const int kRequestsSentPerIOCycle = 20; + autonomous_upstream_ = true; + config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); + beginSession(); + + std::string buffer; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto frame = readFrame(); + EXPECT_EQ(Http2Frame::Type::Headers, frame.type()); + EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus()); + } + tcp_client_->close(); +} + +TEST_P(Http2FrameIntegrationTest, MultipleRequests) { + const int kRequestsSentPerIOCycle = 20; + autonomous_upstream_ = true; + config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); + beginSession(); + + std::string buffer; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = + Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a", + Http2Frame::DataFlags::EndStream); + absl::StrAppend(&buffer, std::string(data)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto frame = readFrame(); + EXPECT_EQ(Http2Frame::Type::Headers, frame.type()); + EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus()); + } + tcp_client_->close(); +} + +TEST_P(Http2FrameIntegrationTest, MultipleRequestsWithTrailers) { + const int kRequestsSentPerIOCycle = 20; + autonomous_upstream_ = true; + config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); + beginSession(); + + std::string buffer; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = + Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a"); + absl::StrAppend(&buffer, std::string(data)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto trailers = Http2Frame::makeEmptyHeadersFrame( + Http2Frame::makeClientStreamId(i), + static_cast(Http::Http2::orFlags( + Http2Frame::HeadersFlags::EndStream, Http2Frame::HeadersFlags::EndHeaders))); + trailers.appendHeaderWithoutIndexing({"k", "v"}); + trailers.adjustPayloadSize(); + absl::StrAppend(&buffer, std::string(trailers)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto frame = readFrame(); + EXPECT_EQ(Http2Frame::Type::Headers, frame.type()); + EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus()); + } + tcp_client_->close(); +} + +TEST_P(Http2FrameIntegrationTest, MultipleHeaderOnlyRequestsFollowedByReset) { + // This number of requests stays below premature reset detection. + const int kRequestsSentPerIOCycle = 20; + config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); + beginSession(); + + std::string buffer; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto reset = Http2Frame::makeResetStreamFrame(Http2Frame::makeClientStreamId(i), + Http2Frame::ErrorCode::Cancel); + absl::StrAppend(&buffer, std::string(reset)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset", + kRequestsSentPerIOCycle); + // Client should remain connected + ASSERT_TRUE(tcp_client_->connected()); + tcp_client_->close(); +} + +TEST_P(Http2FrameIntegrationTest, ResettingDeferredRequestsTriggersPrematureResetCheck) { + const int kRequestsSentPerIOCycle = 20; + // Set premature stream count to the number of streams we are about to send + config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", "20"); + config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); + beginSession(); + + std::string buffer; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto reset = Http2Frame::makeResetStreamFrame(Http2Frame::makeClientStreamId(i), + Http2Frame::ErrorCode::Cancel); + absl::StrAppend(&buffer, std::string(reset)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + // Envoy should close the client connection due to too many premature resets + tcp_client_->waitForDisconnect(); + test_server_->waitForCounterEq("http.config_test.downstream_rq_too_many_premature_resets", 1); +} + +TEST_P(Http2FrameIntegrationTest, CloseConnectionWithDeferredStreams) { + // Use large number of requests to ensure close is detected while there are + // still some deferred streams. + const int kRequestsSentPerIOCycle = 1000; + config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); + // Ensure premature reset detection does not get in the way + config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", "1001"); + beginSession(); + + std::string buffer; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + ASSERT_TRUE(tcp_client_->connected()); + // Drop the downstream connection + tcp_client_->close(); + // Test that Envoy can clean-up deferred streams + test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset", + kRequestsSentPerIOCycle); +} + INSTANTIATE_TEST_SUITE_P(IpVersions, Http2FrameIntegrationTest, testing::ValuesIn(Http2FrameIntegrationTest::testParams()), frameIntegrationTestParamToString);