diff --git a/changelogs/current.yaml b/changelogs/current.yaml
index 615add3d8a65..3a2c5ce37334 100644
--- a/changelogs/current.yaml
+++ b/changelogs/current.yaml
@@ -47,6 +47,13 @@ behavior_changes:
   (UHV) on and off. The default value is off. This option is currently functional only when the ``ENVOY_ENABLE_UHV``
   build flag is enabled. See https://github.com/envoyproxy/envoy/issues/10646 for more information about UHV.
+- area: http
+  change: |
+    Add runtime flag ``http.max_requests_per_io_cycle`` for setting the limit on the number of HTTP requests processed
+    from a single connection in a single I/O cycle. Requests over this limit are processed in subsequent I/O cycles.
+    This mitigates CPU starvation caused by connections that send a high number of requests simultaneously, by allowing
+    requests from other connections to make progress. This runtime value can be set to 1 in the presence of abusive
+    HTTP/2 or HTTP/3 connections. By default this limit is disabled.

 minor_behavior_changes:
 # *Changes that may cause incompatibilities for some users, but should not for most*
@@ -160,6 +167,15 @@ bug_fixes:
 - area: redis
   change: |
     Fixed a bug where redis key with ``%`` in the key is failing with a validation error.
+- area: http
+  change: |
+    Close HTTP/2 and HTTP/3 connections that prematurely reset streams. The runtime key
+    ``overload.premature_reset_min_stream_lifetime_seconds`` determines the interval during which a received stream
+    reset is considered premature (1 second by default). The runtime key ``overload.premature_reset_total_stream_count``,
+    with a default value of 500, determines the number of requests that must be received on a connection before the
+    premature reset check is applied. The connection is disconnected if more than 50% of resets are premature.
+    Setting the runtime key ``envoy.restart_features.send_goaway_for_premature_rst_streams`` to ``false`` completely
+    disables this check.

 removed_config_or_runtime:
 # *Normally occurs at the end of the* :ref:`deprecation period `
diff --git a/source/common/http/conn_manager_config.h b/source/common/http/conn_manager_config.h
index 52f8188c205c..a07eb825f789 100644
--- a/source/common/http/conn_manager_config.h
+++ b/source/common/http/conn_manager_config.h
@@ -66,6 +66,7 @@ namespace Http {
   COUNTER(downstream_rq_rejected_via_ip_detection) \
   COUNTER(downstream_rq_response_before_rq_complete) \
   COUNTER(downstream_rq_rx_reset) \
+  COUNTER(downstream_rq_too_many_premature_resets) \
   COUNTER(downstream_rq_timeout) \
   COUNTER(downstream_rq_header_timeout) \
   COUNTER(downstream_rq_too_large) \
diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc
index 57bcfc33acb0..547817a1411d 100644
--- a/source/common/http/conn_manager_impl.cc
+++ b/source/common/http/conn_manager_impl.cc
@@ -1,5 +1,6 @@
 #include "source/common/http/conn_manager_impl.h"

+#include
 #include
 #include
 #include
@@ -55,6 +56,15 @@
 namespace Envoy {
 namespace Http {

+const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey =
+    "overload.premature_reset_total_stream_count";
+const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey =
+    "overload.premature_reset_min_stream_lifetime_seconds";
+// Runtime key for the maximum number of requests that can be processed from a single connection
+// per I/O cycle. Requests over this limit are deferred until the next I/O cycle.
+const absl::string_view ConnectionManagerImpl::MaxRequestsPerIoCycle =
+    "http.max_requests_per_io_cycle";
+
 bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) {
   if (!headers) {
     return false;
@@ -110,6 +120,8 @@ ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config,
           /*node_id=*/local_info_.node().id(),
           /*server_name=*/config_.serverName(),
           /*proxy_status_config=*/config_.proxyStatusConfig())),
+      max_requests_during_dispatch_(
+          runtime_.snapshot().getInteger(ConnectionManagerImpl::MaxRequestsPerIoCycle, UINT32_MAX)),
       refresh_rtt_after_request_(
           Runtime::runtimeFeatureEnabled("envoy.reloadable_features.refresh_rtt_after_request")) {
   ENVOY_LOG_ONCE_IF(
@@ -127,6 +139,10 @@ const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() {
 void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
   read_callbacks_ = &callbacks;
   dispatcher_ = &callbacks.connection().dispatcher();
+  if (max_requests_during_dispatch_ != UINT32_MAX) {
+    deferred_request_processing_callback_ =
+        dispatcher_->createSchedulableCallback([this]() -> void { onDeferredRequestProcessing(); });
+  }

   stats_.named_.downstream_cx_total_.inc();
   stats_.named_.downstream_cx_active_.inc();
@@ -273,6 +289,12 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_def
 }

 void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {
+  if (!stream.state_.is_internally_destroyed_) {
+    ++closed_non_internally_destroyed_requests_;
+    if (isPrematureRstStream(stream)) {
+      ++number_premature_stream_resets_;
+    }
+  }
   if (stream.max_stream_duration_timer_ != nullptr) {
     stream.max_stream_duration_timer_->disableTimer();
     stream.max_stream_duration_timer_ = nullptr;
@@ -349,6 +371,7 @@ void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {
   if (connection_idle_timer_ && streams_.empty()) {
     connection_idle_timer_->enableTimer(config_.idleTimeout().value());
   }
+  maybeDrainDueToPrematureResets();
 }

 RequestDecoderHandlePtr ConnectionManagerImpl::newStreamHandle(ResponseEncoder& response_encoder,
@@ -453,6 +476,7 @@ void ConnectionManagerImpl::createCodec(Buffer::Instance& data) {
 }

 Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
+  requests_during_dispatch_count_ = 0;
   if (!codec_) {
     // Http3 codec should have been instantiated by now.
     createCodec(data);
@@ -619,6 +643,58 @@ void ConnectionManagerImpl::doConnectionClose(
   }
 }

+bool ConnectionManagerImpl::isPrematureRstStream(const ActiveStream& stream) const {
+  // Check if the request was prematurely reset, by comparing its lifetime to the configured
+  // threshold.
+  ASSERT(!stream.state_.is_internally_destroyed_);
+  absl::optional<std::chrono::nanoseconds> duration =
+      stream.filter_manager_.streamInfo().currentDuration();
+
+  // Check if the request lifetime is longer than the premature reset threshold.
+  if (duration) {
+    const uint64_t lifetime = std::chrono::duration_cast<std::chrono::seconds>(*duration).count();
+    const uint64_t min_lifetime = runtime_.snapshot().getInteger(
+        ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey, 1);
+    if (lifetime > min_lifetime) {
+      return false;
+    }
+  }
+
+  // If the request completed before the configured threshold, also check whether Envoy proxied a
+  // response from the upstream. Requests that never received a response status were reset.
+  // TODO(RyanTheOptimist): Possibly support half_closed_local instead.
+  return !stream.filter_manager_.streamInfo().responseCode();
+}
+
+// Sends a GOAWAY if too many streams have been reset prematurely on this
+// connection.
+void ConnectionManagerImpl::maybeDrainDueToPrematureResets() {
+  if (!Runtime::runtimeFeatureEnabled(
+          "envoy.restart_features.send_goaway_for_premature_rst_streams") ||
+      closed_non_internally_destroyed_requests_ == 0) {
+    return;
+  }
+
+  const uint64_t limit =
+      runtime_.snapshot().getInteger(ConnectionManagerImpl::PrematureResetTotalStreamCountKey, 500);
+
+  if (closed_non_internally_destroyed_requests_ < limit) {
+    return;
+  }
+
+  if (static_cast<double>(number_premature_stream_resets_) /
+          closed_non_internally_destroyed_requests_ <
+      .5) {
+    return;
+  }
+
+  if (drain_state_ == DrainState::NotDraining) {
+    stats_.named_.downstream_rq_too_many_premature_resets_.inc();
+    doConnectionClose(Network::ConnectionCloseType::Abort, absl::nullopt,
+                      "too_many_premature_resets");
+  }
+}
+
 void ConnectionManagerImpl::onGoAway(GoAwayErrorCode) {
   // Currently we do nothing with remote go away frames. In the future we can decide to no longer
   // push resources if applicable.
@@ -1341,7 +1417,12 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
     traceRequest();
   }

-  filter_manager_.decodeHeaders(*request_headers_, end_stream);
+  if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) {
+    filter_manager_.decodeHeaders(*request_headers_, end_stream);
+  } else {
+    state_.deferred_to_next_io_iteration_ = true;
+    state_.deferred_end_stream_ = end_stream;
+  }

   // Reset it here for both global and overridden cases.
   resetIdleTimer();
@@ -1408,8 +1489,15 @@ void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, boo
       connection_manager_.read_callbacks_->connection().dispatcher());
   maybeEndDecode(end_stream);
   filter_manager_.streamInfo().addBytesReceived(data.length());
-
-  filter_manager_.decodeData(data, end_stream);
+  if (!state_.deferred_to_next_io_iteration_) {
+    filter_manager_.decodeData(data, end_stream);
+  } else {
+    if (!deferred_data_) {
+      deferred_data_ = std::make_unique<Buffer::OwnedImpl>();
+    }
+    deferred_data_->move(data);
+    state_.deferred_end_stream_ = end_stream;
+  }
 }

 void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& trailers) {
@@ -1425,7 +1513,9 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&&
     return;
   }
   maybeEndDecode(true);
-  filter_manager_.decodeTrailers(*request_trailers_);
+  if (!state_.deferred_to_next_io_iteration_) {
+    filter_manager_.decodeTrailers(*request_trailers_);
+  }
 }

 void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) {
@@ -2138,5 +2228,61 @@ void ConnectionManagerImpl::ActiveStream::resetStream(Http::StreamResetReason, a
   connection_manager_.doEndStream(*this);
 }

+bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
+  // TODO(yanavlasov): Merge this with the filter manager continueIteration() method
+  if (!state_.deferred_to_next_io_iteration_) {
+    return false;
+  }
+  state_.deferred_to_next_io_iteration_ = false;
+  bool end_stream =
+      state_.deferred_end_stream_ && deferred_data_ == nullptr && request_trailers_ == nullptr;
+  filter_manager_.decodeHeaders(*request_headers_, end_stream);
+  if (end_stream) {
+    return true;
+  }
+  if (deferred_data_ != nullptr) {
+    end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr;
+    filter_manager_.decodeData(*deferred_data_, end_stream);
+  }
+  if (request_trailers_ != nullptr) {
+    filter_manager_.decodeTrailers(*request_trailers_);
+  }
+  return true;
+}
+
+bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() {
+  // Do not defer this stream if stream deferral is disabled.
+  if (deferred_request_processing_callback_ == nullptr) {
+    return false;
+  }
+  // Defer this stream if there are already deferred streams, so they are not
+  // processed out of order.
+  if (deferred_request_processing_callback_->enabled()) {
+    return true;
+  }
+  ++requests_during_dispatch_count_;
+  bool defer = requests_during_dispatch_count_ > max_requests_during_dispatch_;
+  if (defer) {
+    deferred_request_processing_callback_->scheduleCallbackNextIteration();
+  }
+  return defer;
+}
+
+void ConnectionManagerImpl::onDeferredRequestProcessing() {
+  requests_during_dispatch_count_ = 1; // 1 stream is always let through
+  // Streams are inserted at the head of the list. As such, process deferred
+  // streams at the back of the list first.
+  for (auto reverse_iter = streams_.rbegin(); reverse_iter != streams_.rend();) {
+    auto& stream_ptr = *reverse_iter;
+    // Move the iterator to the next item in case the `onDeferredRequestProcessing` call removes
+    // the stream from the list.
+    ++reverse_iter;
+    bool was_deferred = stream_ptr->onDeferredRequestProcessing();
+    if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) {
+      break;
+    }
+  }
+}
+
 } // namespace Http
 } // namespace Envoy
diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h
index 3049bec11f6e..143f425000fe 100644
--- a/source/common/http/conn_manager_impl.h
+++ b/source/common/http/conn_manager_impl.h
@@ -118,6 +118,15 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
   void setClearHopByHopResponseHeaders(bool value) { clear_hop_by_hop_response_headers_ = value; }
   bool clearHopByHopResponseHeaders() const { return clear_hop_by_hop_response_headers_; }

+  // This runtime key configures the number of streams which must be closed on a connection
+  // before Envoy will potentially drain it due to excessive prematurely reset streams.
+  static const absl::string_view PrematureResetTotalStreamCountKey;
+
+  // The minimum lifetime of a stream, in seconds, in order not to be considered
+  // prematurely closed.
+  static const absl::string_view PrematureResetMinStreamLifetimeSecondsKey;
+  static const absl::string_view MaxRequestsPerIoCycle;
+
 private:
   struct ActiveStream;
   class MobileConnectionManagerImpl;
@@ -340,7 +349,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
         : codec_saw_local_complete_(false), codec_encode_complete_(false),
           on_reset_stream_called_(false), is_zombie_stream_(false), successful_upgrade_(false),
           is_internally_destroyed_(false), is_internally_created_(false), is_tunneling_(false),
-          decorated_propagate_(true) {}
+          decorated_propagate_(true), deferred_to_next_io_iteration_(false) {}

       // It's possibly for the codec to see the completed response but not fully
       // encode it.
@@ -365,6 +374,14 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
       bool is_tunneling_ : 1;

       bool decorated_propagate_ : 1;
+
+      // Indicates that sending headers to the filter manager is deferred to the
+      // next I/O cycle. If data or trailers are received while this flag is set,
+      // they are deferred too.
+      // TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate
+      // structure, so it can be atomically created and cleared.
+      bool deferred_to_next_io_iteration_ : 1;
+      bool deferred_end_stream_ : 1;
     };

     bool canDestroyStream() const {
@@ -414,6 +431,11 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

     std::weak_ptr<bool> stillAlive() { return {still_alive_}; }

+    // Dispatch deferred headers, body and trailers to the filter manager.
+    // Return true if this stream was deferred and dispatched pending headers, body and trailers
+    // (if present). Return false if this stream was not deferred.
+    bool onDeferredRequestProcessing();
+
     ConnectionManagerImpl& connection_manager_;
     OptRef<const TracingConnectionManagerConfig> connection_manager_tracing_config_;
     // TODO(snowp): It might make sense to move this to the FilterManager to avoid storing it in
@@ -503,6 +525,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
     bool spawnUpstreamSpan() const override;

     std::shared_ptr<bool> still_alive_ = std::make_shared<bool>(true);
+    std::unique_ptr<Buffer::OwnedImpl> deferred_data_;
   };

   using ActiveStreamPtr = std::unique_ptr<ActiveStream>;
@@ -570,6 +593,18 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
   void doConnectionClose(absl::optional<Network::ConnectionCloseType> close_type,
                          absl::optional<StreamInfo::ResponseFlag> response_flag,
                          absl::string_view details);
+  // Returns true if a RST_STREAM for the given stream is premature. Premature
+  // means that the RST_STREAM arrived before response headers were sent and that
+  // the stream was alive for a short period of time. This period is specified
+  // by the optional runtime value PrematureResetMinStreamLifetimeSecondsKey,
+  // or one second if that is not present.
+  bool isPrematureRstStream(const ActiveStream& stream) const;
+  // Sends a GOAWAY if sufficient streams have been closed on a connection and
+  // at least half of them have been prematurely reset.
+  void maybeDrainDueToPrematureResets();
+
+  bool shouldDeferRequestProxyingToNextIoCycle();
+  void onDeferredRequestProcessing();

   enum class DrainState { NotDraining, Draining, Closing };

@@ -610,7 +645,16 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
   bool clear_hop_by_hop_response_headers_{true};
   // The number of requests accumulated on the current connection.
   uint64_t accumulated_requests_{};
+  // The number of requests closed on the current connection which were
+  // not internally destroyed.
+  uint64_t closed_non_internally_destroyed_requests_{};
+  // The number of requests that received a premature RST_STREAM, according to
+  // the definition given in `isPrematureRstStream()`.
+  uint64_t number_premature_stream_resets_{0};
   const std::string proxy_name_; // for Proxy-Status.
+  uint32_t requests_during_dispatch_count_{0};
+  const uint32_t max_requests_during_dispatch_{UINT32_MAX};
+  Event::SchedulableCallbackPtr deferred_request_processing_callback_;

   const bool refresh_rtt_after_request_{};
 };
diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc
index de2fc4c2a298..ec6b310eacda 100644
--- a/source/common/runtime/runtime_features.cc
+++ b/source/common/runtime/runtime_features.cc
@@ -91,6 +91,7 @@
 RUNTIME_GUARD(envoy_reloadable_features_validate_connect);
 RUNTIME_GUARD(envoy_reloadable_features_validate_detailed_override_host_statuses);
 RUNTIME_GUARD(envoy_reloadable_features_validate_grpc_header_before_log_grpc_status);
 RUNTIME_GUARD(envoy_reloadable_features_validate_upstream_headers);
+RUNTIME_GUARD(envoy_restart_features_send_goaway_for_premature_rst_streams);
 RUNTIME_GUARD(envoy_restart_features_udp_read_normalize_addresses);

 // Begin false flags. These should come with a TODO to flip true.
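Note for reviewers: everything in this change is gated on runtime keys rather than new proto config. As a reading aid, the sketch below shows how the premature-reset mitigation could be pinned in a bootstrap ``layered_runtime`` static layer. This snippet is illustrative and not part of the diff; the layer name is arbitrary, and the values shown are the defaults described in the changelog entry above.

  layered_runtime:
    layers:
    - name: static_layer_0
      static_layer:
        # Master switch for the premature reset check; the runtime guard above
        # defaults to true, so this key only needs to be set to false to opt out.
        envoy.restart_features.send_goaway_for_premature_rst_streams: true
        # A reset stream counts as premature if it lived for less than this many
        # seconds and never received a response status.
        overload.premature_reset_min_stream_lifetime_seconds: 1
        # The check runs once this many streams have closed on the connection; the
        # connection is aborted if more than 50% of them were premature resets.
        overload.premature_reset_total_stream_count: 500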
diff --git a/test/common/http/conn_manager_impl_test_2.cc b/test/common/http/conn_manager_impl_test_2.cc
index 5daf2b1a45e6..078bd1d85ab7 100644
--- a/test/common/http/conn_manager_impl_test_2.cc
+++ b/test/common/http/conn_manager_impl_test_2.cc
@@ -11,6 +11,7 @@
 using testing::InvokeWithoutArgs;
 using testing::Mock;
 using testing::Ref;
 using testing::Return;
+using testing::ReturnArg;
 using testing::ReturnRef;

 namespace Envoy {
@@ -3767,5 +3768,249 @@ TEST_F(HttpConnectionManagerImplTest, NoProxyProtocolAdded) {
   // Clean up.
   filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
 }
+
+// Validate that deferred streams are processed with a variety of
+// headers, data and trailers arriving in the same I/O cycle.
+TEST_F(HttpConnectionManagerImplTest, LimitWorkPerIOCycle) {
+  const int kRequestsSentPerIOCycle = 100;
+  EXPECT_CALL(runtime_.snapshot_, getInteger(_, _)).WillRepeatedly(ReturnArg<1>());
+  // Process 1 request per I/O cycle.
+  auto* deferred_request_callback = enableStreamsPerIoLimit(1);
+  setup(false, "");
+
+  // Create a mock decoder filter for each request; they are installed during filter chain setup.
+  std::vector<std::shared_ptr<MockStreamDecoderFilter>> encoder_filters;
+  int decode_headers_call_count = 0;
+  for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
+    std::shared_ptr<MockStreamDecoderFilter> filter(new NiceMock<MockStreamDecoderFilter>());
+
+    // Requests with i % 4 == 0 are headers only.
+    EXPECT_CALL(*filter, decodeHeaders(_, i % 4 == 0 ? true : false))
+        .WillRepeatedly(Invoke([&](RequestHeaderMap&, bool) -> FilterHeadersStatus {
+          ++decode_headers_call_count;
+          return FilterHeadersStatus::StopIteration;
+        }));
+
+    // Requests with i % 4 == 1 are headers and data only;
+    // requests with i % 4 == 2 are headers, data and trailers.
+    if (i % 4 == 1 || i % 4 == 2) {
+      EXPECT_CALL(*filter, decodeData(_, i % 4 == 1 ? true : false))
+          .WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
+    }
+
+    // Requests with i % 4 == 3 are headers and trailers (no data).
+    if (i % 4 == 2 || i % 4 == 3) {
+      EXPECT_CALL(*filter, decodeTrailers(_)).WillOnce(Return(FilterTrailersStatus::StopIteration));
+    }
+
+    EXPECT_CALL(*filter, setDecoderFilterCallbacks(_));
+    encoder_filters.push_back(std::move(filter));
+  }
+
+  uint64_t random_value = 0;
+  EXPECT_CALL(random_, random()).WillRepeatedly(Invoke([&random_value]() {
+    return random_value++;
+  }));
+
+  EXPECT_CALL(filter_factory_, createFilterChain(_))
+      .Times(kRequestsSentPerIOCycle)
+      .WillRepeatedly(Invoke([&encoder_filters](FilterChainManager& manager) -> bool {
+        static int index = 0;
+        int i = index++;
+        FilterFactoryCb factory([&encoder_filters, i](FilterChainFactoryCallbacks& callbacks) {
+          callbacks.addStreamDecoderFilter(encoder_filters[i]);
+        });
+        manager.applyFilterFactoryCb({}, factory);
+        return true;
+      }));
+
+  EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_))
+      .Times(kRequestsSentPerIOCycle);
+
+  std::vector<NiceMock<MockResponseEncoder>> response_encoders(kRequestsSentPerIOCycle);
+  for (auto& encoder : response_encoders) {
+    EXPECT_CALL(encoder, getStream()).WillRepeatedly(ReturnRef(encoder.stream_));
+  }
+
+  EXPECT_CALL(*codec_, dispatch(_))
+      .WillRepeatedly(Invoke([&](Buffer::Instance& data) -> Http::Status {
+        for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
+          decoder_ = &conn_manager_->newStream(response_encoders[i]);

+          RequestHeaderMapPtr headers{new TestRequestHeaderMapImpl{
+              {":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
+
+          RequestTrailerMapPtr trailers{
+              new TestRequestTrailerMapImpl{{"key1", "value1"}, {"key2", "value2"}}};
+
+          Buffer::OwnedImpl data("data");
+
+          switch (i % 4) {
+          case 0:
+            decoder_->decodeHeaders(std::move(headers), true);
+            break;
+          case 1:
+            decoder_->decodeHeaders(std::move(headers), false);
+            decoder_->decodeData(data, true);
+            break;
+          case 2:
+            decoder_->decodeHeaders(std::move(headers), false);
+            decoder_->decodeData(data, false);
+            decoder_->decodeTrailers(std::move(trailers));
+            break;
+          case 3:
+            decoder_->decodeHeaders(std::move(headers), false);
+            decoder_->decodeTrailers(std::move(trailers));
+            break;
+          }
+        }
+
+        data.drain(4);
+        return Http::okStatus();
+      }));
+
+  // Kick off the incoming data.
+  Buffer::OwnedImpl fake_input("1234");
+  conn_manager_->onData(fake_input, false);
+
+  EXPECT_TRUE(deferred_request_callback->enabled_);
+  // Only one request should go through the filter chain.
+  ASSERT_EQ(decode_headers_call_count, 1);
+
+  // Let the other requests go through the filter chain. Call expectations will fail
+  // if this is not the case.
+  int deferred_request_count = 0;
+  while (deferred_request_callback->enabled_) {
+    deferred_request_callback->invokeCallback();
+    ++deferred_request_count;
+  }
+
+  ASSERT_EQ(deferred_request_count, kRequestsSentPerIOCycle);
+
+  for (auto& filter : encoder_filters) {
+    ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}};
+    filter->callbacks_->streamInfo().setResponseCodeDetails("");
+    filter->callbacks_->encodeHeaders(std::move(response_headers), true, "details");
+  }
+
+  EXPECT_EQ(kRequestsSentPerIOCycle, stats_.named_.downstream_rq_2xx_.value());
+  EXPECT_EQ(kRequestsSentPerIOCycle, listener_stats_.downstream_rq_2xx_.value());
+  EXPECT_EQ(kRequestsSentPerIOCycle, stats_.named_.downstream_rq_completed_.value());
+  EXPECT_EQ(kRequestsSentPerIOCycle, listener_stats_.downstream_rq_completed_.value());
+}
+
+TEST_F(HttpConnectionManagerImplTest, StreamDeferralPreservesOrder) {
+  EXPECT_CALL(runtime_.snapshot_, getInteger(_, _)).WillRepeatedly(ReturnArg<1>());
+  // Process 1 request per I/O cycle.
+  auto* deferred_request_callback = enableStreamsPerIoLimit(1);
+  setup(false, "");
+
+  std::vector<std::shared_ptr<MockStreamDecoderFilter>> encoder_filters;
+  int expected_request_id = 0;
+  const Http::LowerCaseString request_id_header(absl::string_view("request-id"));
+  // Two I/O reads, with two requests processed per read.
+  const int TotalRequests = 2 * 2;
+  for (int i = 0; i < TotalRequests; ++i) {
+    std::shared_ptr<MockStreamDecoderFilter> filter(new NiceMock<MockStreamDecoderFilter>());
+
+    EXPECT_CALL(*filter, decodeHeaders(_, true))
+        .WillRepeatedly(Invoke([&](RequestHeaderMap& headers, bool) -> FilterHeadersStatus {
+          // Check that requests are decoded in the expected order.
+          int request_id = 0;
+          ASSERT(absl::SimpleAtoi(headers.get(request_id_header)[0]->value().getStringView(),
+                                  &request_id));
+          ASSERT(request_id == expected_request_id);
+          ++expected_request_id;
+          return FilterHeadersStatus::StopIteration;
+        }));
+
+    EXPECT_CALL(*filter, setDecoderFilterCallbacks(_));
+    encoder_filters.push_back(std::move(filter));
+  }
+
+  uint64_t random_value = 0;
+  EXPECT_CALL(random_, random()).WillRepeatedly(Invoke([&random_value]() {
+    return random_value++;
+  }));
+
+  EXPECT_CALL(filter_factory_, createFilterChain(_))
+      .Times(TotalRequests)
+      .WillRepeatedly(Invoke([&encoder_filters](FilterChainManager& manager) -> bool {
+        static int index = 0;
+        int i = index++;
+        FilterFactoryCb factory([&encoder_filters, i](FilterChainFactoryCallbacks& callbacks) {
+          callbacks.addStreamDecoderFilter(encoder_filters[i]);
+        });
+        manager.applyFilterFactoryCb({}, factory);
+        return true;
+      }));
+
+  EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)).Times(TotalRequests);
+
+  std::vector<NiceMock<MockResponseEncoder>> response_encoders(TotalRequests);
+  for (auto& encoder : response_encoders) {
+    EXPECT_CALL(encoder, getStream()).WillRepeatedly(ReturnRef(encoder.stream_));
+  }
+  auto response_encoders_iter = response_encoders.begin();
+
+  int request_id = 0;
+  EXPECT_CALL(*codec_, dispatch(_))
+      .WillRepeatedly(Invoke([&](Buffer::Instance& data) -> Http::Status {
+        // The second request should be deferred.
+        for (int i = 0; i < 2; ++i) {
+          decoder_ = &conn_manager_->newStream(*response_encoders_iter);
+          ++response_encoders_iter;
+
+          RequestHeaderMapPtr headers{
+              new TestRequestHeaderMapImpl{{":authority", "host"},
+                                           {":path", "/"},
+                                           {":method", "GET"},
+                                           {"request-id", absl::StrCat(request_id)}}};
+
+          ++request_id;
+          decoder_->decodeHeaders(std::move(headers), true);
+        }
+
+        data.drain(4);
+        return Http::okStatus();
+      }));
+
+  // Kick off the incoming data.
+  Buffer::OwnedImpl fake_input("1234");
+  conn_manager_->onData(fake_input, false);
+
+  EXPECT_TRUE(deferred_request_callback->enabled_);
+  // Only one request should go through the filter chain.
+  ASSERT_EQ(expected_request_id, 1);
+
+  // Test the arrival of another request. The new request is read from the socket before the
+  // deferred callbacks run.
+  Buffer::OwnedImpl fake_input2("1234");
+  conn_manager_->onData(fake_input2, false);
+
+  // No requests from the second read should go through, as there are deferred streams present.
+  ASSERT_EQ(expected_request_id, 1);
+
+  // Let the other requests go through the filter chain. Call expectations will fail
+  // if this is not the case.
+  int deferred_request_count = 0;
+  while (deferred_request_callback->enabled_) {
+    deferred_request_callback->invokeCallback();
+    ++deferred_request_count;
+  }
+
+  ASSERT_EQ(deferred_request_count, TotalRequests);
+
+  for (auto& filter : encoder_filters) {
+    ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}};
+    filter->callbacks_->streamInfo().setResponseCodeDetails("");
+    filter->callbacks_->encodeHeaders(std::move(response_headers), true, "details");
+  }
+
+  EXPECT_EQ(TotalRequests, stats_.named_.downstream_rq_2xx_.value());
+  EXPECT_EQ(TotalRequests, listener_stats_.downstream_rq_2xx_.value());
+  EXPECT_EQ(TotalRequests, stats_.named_.downstream_rq_completed_.value());
+  EXPECT_EQ(TotalRequests, listener_stats_.downstream_rq_completed_.value());
+}
+
 } // namespace Http
 } // namespace Envoy
diff --git a/test/common/http/conn_manager_impl_test_base.cc b/test/common/http/conn_manager_impl_test_base.cc
index ee9cace50012..da6d8d8aa9d7 100644
--- a/test/common/http/conn_manager_impl_test_base.cc
+++ b/test/common/http/conn_manager_impl_test_base.cc
@@ -78,6 +78,7 @@ void HttpConnectionManagerImplMixin::setup(bool ssl, const std::string& server_n
   conn_manager_ = std::make_unique<ConnectionManagerImpl>(
       *this, drain_close_, random_, http_context_, runtime_, local_info_, cluster_manager_,
       overload_manager_, test_time_.timeSystem());
+  conn_manager_->initializeReadFilterCallbacks(filter_callbacks_);

   if (tracing) {
@@ -395,5 +396,23 @@ void HttpConnectionManagerImplMixin::expectUhvTrailerCheck(
       }));
 }

+Event::MockSchedulableCallback*
+HttpConnectionManagerImplMixin::enableStreamsPerIoLimit(uint32_t limit) {
+  EXPECT_CALL(runtime_.snapshot_, getInteger("http.max_requests_per_io_cycle", _))
+      .WillOnce(Return(limit));
+
+  // Expect the HCM to create and set the schedulable callback.
+  auto* deferred_request_callback =
+      new Event::MockSchedulableCallback(&filter_callbacks_.connection_.dispatcher_);
+  EXPECT_CALL(*deferred_request_callback, enabled())
+      .WillRepeatedly(
Invoke([deferred_request_callback]() { return deferred_request_callback->enabled_; })); + EXPECT_CALL(*deferred_request_callback, scheduleCallbackNextIteration()) + .WillRepeatedly( + Invoke([deferred_request_callback]() { deferred_request_callback->enabled_ = true; })); + + return deferred_request_callback; +} + } // namespace Http } // namespace Envoy diff --git a/test/common/http/conn_manager_impl_test_base.h b/test/common/http/conn_manager_impl_test_base.h index 76a051c8641e..ddccd0a77321 100644 --- a/test/common/http/conn_manager_impl_test_base.h +++ b/test/common/http/conn_manager_impl_test_base.h @@ -206,6 +206,8 @@ class HttpConnectionManagerImplMixin : public ConnectionManagerConfig { // validate headers. void expectCheckWithDefaultUhv(); + Event::MockSchedulableCallback* enableStreamsPerIoLimit(uint32_t limit); + Envoy::Event::SimulatedTimeSystem test_time_; NiceMock route_config_provider_; std::shared_ptr route_config_{new NiceMock()}; diff --git a/test/common/http/http2/http2_frame.cc b/test/common/http/http2/http2_frame.cc index c5172938a804..46ba3751ba24 100644 --- a/test/common/http/http2/http2_frame.cc +++ b/test/common/http/http2/http2_frame.cc @@ -341,7 +341,11 @@ Http2Frame Http2Frame::makeRequest(uint32_t stream_index, absl::string_view host makeNetworkOrderStreamId(stream_index)); frame.appendStaticHeader(StaticHeaderIndex::MethodGet); frame.appendStaticHeader(StaticHeaderIndex::SchemeHttps); - frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); + if (path.empty() || path == "/") { + frame.appendStaticHeader(StaticHeaderIndex::Path); + } else { + frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); + } frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Authority, host); frame.adjustPayloadSize(); return frame; @@ -365,7 +369,11 @@ Http2Frame Http2Frame::makePostRequest(uint32_t stream_index, absl::string_view makeNetworkOrderStreamId(stream_index)); frame.appendStaticHeader(StaticHeaderIndex::MethodPost); frame.appendStaticHeader(StaticHeaderIndex::SchemeHttps); - frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); + if (path.empty() || path == "/") { + frame.appendStaticHeader(StaticHeaderIndex::Path); + } else { + frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); + } frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Authority, host); frame.adjustPayloadSize(); return frame; diff --git a/test/common/http/http2/http2_frame.h b/test/common/http/http2/http2_frame.h index c41b3a81808e..ea1613d4635a 100644 --- a/test/common/http/http2/http2_frame.h +++ b/test/common/http/http2/http2_frame.h @@ -253,6 +253,13 @@ class Http2Frame { ConstIterator end() const { return data_.end(); } bool empty() const { return data_.empty(); } + void appendHeaderWithoutIndexing(const Header& header); + // This method updates payload length in the HTTP2 header based on the size of the data_ + void adjustPayloadSize() { + ASSERT(size() >= HeaderSize); + setPayloadSize(size() - HeaderSize); + } + private: void buildHeader(Type type, uint32_t payload_size = 0, uint8_t flags = 0, uint32_t stream_id = 0); void setPayloadSize(uint32_t size); @@ -272,15 +279,8 @@ class Http2Frame { // Headers are directly encoded void appendStaticHeader(StaticHeaderIndex index); void appendHeaderWithoutIndexing(StaticHeaderIndex index, absl::string_view value); - void appendHeaderWithoutIndexing(const Header& header); void appendEmptyHeader(); - // This method updates payload length in the HTTP2 header based on the size of the data_ - void 
adjustPayloadSize() {
-    ASSERT(size() >= HeaderSize);
-    setPayloadSize(size() - HeaderSize);
-  }
-
   DataContainer data_;
 };
diff --git a/test/integration/multiplexed_integration_test.cc b/test/integration/multiplexed_integration_test.cc
index 88d51696a6dc..676ea9f5ab1d 100644
--- a/test/integration/multiplexed_integration_test.cc
+++ b/test/integration/multiplexed_integration_test.cc
@@ -1,4 +1,5 @@
 #include
+#include
 #include
 #include

@@ -25,6 +26,7 @@
 #include "test/mocks/http/mocks.h"
 #include "test/test_common/network_utility.h"
 #include "test/test_common/printers.h"
+#include "test/test_common/simulated_time_system.h"
 #include "test/test_common/utility.h"

 #include "gtest/gtest.h"
@@ -92,6 +94,15 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, MultiplexedIntegrationTest,
                                              {Http::CodecType::HTTP1})),
                          HttpProtocolIntegrationTest::protocolTestParamsToString);

+class MultiplexedIntegrationTestWithSimulatedTime : public Event::TestUsingSimulatedTime,
+                                                    public MultiplexedIntegrationTest {};
+
+INSTANTIATE_TEST_SUITE_P(IpVersions, MultiplexedIntegrationTestWithSimulatedTime,
+                         testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams(
+                             {Http::CodecType::HTTP2, Http::CodecType::HTTP3},
+                             {Http::CodecType::HTTP1})),
+                         HttpProtocolIntegrationTest::protocolTestParamsToString);
+
 TEST_P(MultiplexedIntegrationTest, RouterRequestAndResponseWithBodyNoBuffer) {
   testRouterRequestAndResponseWithBody(1024, 512, false, false);
 }
@@ -1076,6 +1087,67 @@ TEST_P(MultiplexedIntegrationTest, GoAway) {
   EXPECT_EQ("200", response->headers().getStatusValue());
 }

+// TODO(rch): Add a unit test which covers internal redirect handling.
+TEST_P(MultiplexedIntegrationTestWithSimulatedTime, GoAwayAfterTooManyResets) {
+  EXCLUDE_DOWNSTREAM_HTTP3; // Need to wait for the server to reset the stream
+                            // before opening a new one.
+  config_helper_.addRuntimeOverride("envoy.restart_features.send_goaway_for_premature_rst_streams",
+                                    "true");
+  const int total_streams = 100;
+  config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count",
+                                    absl::StrCat(total_streams));
+  initialize();
+
+  Http::TestRequestHeaderMapImpl headers{
+      {":method", "GET"}, {":path", "/healthcheck"}, {":scheme", "http"}, {":authority", "host"}};
+  codec_client_ = makeHttpConnection(lookupPort("http"));
+  for (int i = 0; i < total_streams; ++i) {
+    auto encoder_decoder = codec_client_->startRequest(headers);
+    request_encoder_ = &encoder_decoder.first;
+    auto response = std::move(encoder_decoder.second);
+    codec_client_->sendReset(*request_encoder_);
+    ASSERT_TRUE(response->waitForReset());
+  }
+
+  // Envoy should disconnect the client due to the premature reset check.
+  ASSERT_TRUE(codec_client_->waitForDisconnect());
+  test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset", total_streams);
+  test_server_->waitForCounterEq("http.config_test.downstream_rq_too_many_premature_resets", 1);
+}
+
+TEST_P(MultiplexedIntegrationTestWithSimulatedTime, DontGoAwayAfterTooManyResetsForLongStreams) {
+  EXCLUDE_DOWNSTREAM_HTTP3; // Need to wait for the server to reset the stream
+                            // before opening a new one.
+ config_helper_.addRuntimeOverride("envoy.restart_features.send_goaway_for_premature_rst_streams", + "true"); + const int total_streams = 100; + const int stream_lifetime_seconds = 2; + config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", + absl::StrCat(total_streams)); + + config_helper_.addRuntimeOverride("overload.premature_reset_min_stream_lifetime_seconds", + absl::StrCat(stream_lifetime_seconds)); + + initialize(); + + Http::TestRequestHeaderMapImpl headers{ + {":method", "GET"}, {":path", "/healthcheck"}, {":scheme", "http"}, {":authority", "host"}}; + codec_client_ = makeHttpConnection(lookupPort("http")); + + std::string request_counter = "http.config_test.downstream_rq_total"; + std::string reset_counter = "http.config_test.downstream_rq_rx_reset"; + for (int i = 0; i < total_streams * 2; ++i) { + auto encoder_decoder = codec_client_->startRequest(headers); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + test_server_->waitForCounterEq(request_counter, i + 1); + timeSystem().advanceTimeWait(std::chrono::seconds(2 * stream_lifetime_seconds)); + codec_client_->sendReset(*request_encoder_); + ASSERT_TRUE(response->waitForReset()); + test_server_->waitForCounterEq(reset_counter, i + 1); + } +} + TEST_P(MultiplexedIntegrationTest, Trailers) { testTrailers(1024, 2048, false, false); } TEST_P(MultiplexedIntegrationTest, TrailersGiantBody) { @@ -2117,6 +2189,178 @@ TEST_P(Http2FrameIntegrationTest, HostSameAsAuthority) { tcp_client_->close(); } +TEST_P(Http2FrameIntegrationTest, MultipleHeaderOnlyRequests) { + const int kRequestsSentPerIOCycle = 20; + autonomous_upstream_ = true; + config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); + beginSession(); + + std::string buffer; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto frame = readFrame(); + EXPECT_EQ(Http2Frame::Type::Headers, frame.type()); + EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus()); + } + tcp_client_->close(); +} + +TEST_P(Http2FrameIntegrationTest, MultipleRequests) { + const int kRequestsSentPerIOCycle = 20; + autonomous_upstream_ = true; + config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); + beginSession(); + + std::string buffer; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = + Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a", + Http2Frame::DataFlags::EndStream); + absl::StrAppend(&buffer, std::string(data)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto frame = readFrame(); + EXPECT_EQ(Http2Frame::Type::Headers, frame.type()); + EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus()); + } + tcp_client_->close(); +} + +TEST_P(Http2FrameIntegrationTest, MultipleRequestsWithTrailers) { + const int kRequestsSentPerIOCycle = 20; + autonomous_upstream_ = true; + 
config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); + beginSession(); + + std::string buffer; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = + Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a"); + absl::StrAppend(&buffer, std::string(data)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto trailers = Http2Frame::makeEmptyHeadersFrame( + Http2Frame::makeClientStreamId(i), + static_cast(Http::Http2::orFlags( + Http2Frame::HeadersFlags::EndStream, Http2Frame::HeadersFlags::EndHeaders))); + trailers.appendHeaderWithoutIndexing({"k", "v"}); + trailers.adjustPayloadSize(); + absl::StrAppend(&buffer, std::string(trailers)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto frame = readFrame(); + EXPECT_EQ(Http2Frame::Type::Headers, frame.type()); + EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus()); + } + tcp_client_->close(); +} + +TEST_P(Http2FrameIntegrationTest, MultipleHeaderOnlyRequestsFollowedByReset) { + // This number of requests stays below premature reset detection. + const int kRequestsSentPerIOCycle = 20; + config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); + beginSession(); + + std::string buffer; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto reset = Http2Frame::makeResetStreamFrame(Http2Frame::makeClientStreamId(i), + Http2Frame::ErrorCode::Cancel); + absl::StrAppend(&buffer, std::string(reset)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset", + kRequestsSentPerIOCycle); + // Client should remain connected + ASSERT_TRUE(tcp_client_->connected()); + tcp_client_->close(); +} + +// This test depends on an another patch with premature resets +TEST_P(Http2FrameIntegrationTest, ResettingDeferredRequestsTriggersPrematureResetCheck) { + const int kRequestsSentPerIOCycle = 20; + // Set premature stream count to the number of streams we are about to send + config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", "20"); + config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); + beginSession(); + + std::string buffer; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto reset = Http2Frame::makeResetStreamFrame(Http2Frame::makeClientStreamId(i), + Http2Frame::ErrorCode::Cancel); + absl::StrAppend(&buffer, std::string(reset)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + // Envoy should close the client connection due to too many premature resets + tcp_client_->waitForDisconnect(); + test_server_->waitForCounterEq("http.config_test.downstream_rq_too_many_premature_resets", 1); + 
+  tcp_client_->close();
+}
+
+TEST_P(Http2FrameIntegrationTest, CloseConnectionWithDeferredStreams) {
+  // Use a large number of requests to ensure that the close is detected while there are
+  // still some deferred streams.
+  const int kRequestsSentPerIOCycle = 1000;
+  config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
+  // Ensure that premature reset detection does not get in the way.
+  config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", "1001");
+  beginSession();
+
+  std::string buffer;
+  for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
+    auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/",
+                                           {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
+    absl::StrAppend(&buffer, std::string(request));
+  }
+
+  ASSERT_TRUE(tcp_client_->write(buffer, false, false));
+  ASSERT_TRUE(tcp_client_->connected());
+  // Drop the downstream connection.
+  tcp_client_->close();
+  // Test that Envoy can clean up deferred streams.
+  test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset",
+                                 kRequestsSentPerIOCycle);
+}
+
 INSTANTIATE_TEST_SUITE_P(IpVersions, Http2FrameIntegrationTest,
                          testing::ValuesIn(Http2FrameIntegrationTest::testParams()),
                          frameIntegrationTestParamToString);
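The integration tests above pin ``http.max_requests_per_io_cycle`` through ``config_helper_.addRuntimeOverride``. For completeness, here is an equivalent illustrative runtime sketch for a deployment (not part of the diff; the layer name is arbitrary, and the value 1 is the setting the changelog suggests in the presence of abusive HTTP/2 or HTTP/3 connections):

  layered_runtime:
    layers:
    - name: static_layer_0
      static_layer:
        # Proxy at most one request per downstream connection per I/O cycle and
        # defer the rest to subsequent cycles. When unset, the limit defaults to
        # UINT32_MAX in ConnectionManagerImpl, i.e. disabled.
        http.max_requests_per_io_cycle: 1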