diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 9ecf0d6e48ce..f8919d9ee8ef 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -8,6 +8,10 @@ minor_behavior_changes: bug_fixes: # *Changes expected to improve the state of the world and are unlikely to have negative effects* +- area: http + change: | + Fixed a bug where processing of deferred streams with the value of ``http.max_requests_per_io_cycle`` more than 1, + can cause a crash. removed_config_or_runtime: # *Normally occurs at the end of the* :ref:`deprecation period ` diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 02faaa932d43..1a14cc891b40 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -1934,6 +1935,8 @@ bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() { if (end_stream) { return true; } + // Filter manager will return early from decodeData and decodeTrailers if + // request has completed. if (deferred_data_ != nullptr) { end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr; filter_manager_.decodeData(*deferred_data_, end_stream); @@ -1963,19 +1966,26 @@ bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() { } void ConnectionManagerImpl::onDeferredRequestProcessing() { + if (streams_.empty()) { + return; + } requests_during_dispatch_count_ = 1; // 1 stream is always let through // Streams are inserted at the head of the list. As such process deferred - // streams at the back of the list first. - for (auto reverse_iter = streams_.rbegin(); reverse_iter != streams_.rend();) { - auto& stream_ptr = *reverse_iter; - // Move the iterator to the next item in case the `onDeferredRequestProcessing` call removes the - // stream from the list. - ++reverse_iter; - bool was_deferred = stream_ptr->onDeferredRequestProcessing(); + // streams in the reverse order. + auto reverse_iter = std::prev(streams_.end()); + bool at_first_element = false; + do { + at_first_element = reverse_iter == streams_.begin(); + // Move the iterator to the previous item in case the `onDeferredRequestProcessing` call removes + // the stream from the list. + auto previous_element = std::prev(reverse_iter); + bool was_deferred = (*reverse_iter)->onDeferredRequestProcessing(); if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) { break; } - } + reverse_iter = previous_element; + // TODO(yanavlasov): see if `rend` can be used. + } while (!at_first_element); } } // namespace Http diff --git a/test/common/http/http2/http2_frame.cc b/test/common/http/http2/http2_frame.cc index f7f2ad117d76..e093285b7729 100644 --- a/test/common/http/http2/http2_frame.cc +++ b/test/common/http/http2/http2_frame.cc @@ -51,12 +51,22 @@ Http2Frame::ResponseStatus Http2Frame::responseStatus() const { return ResponseStatus::Ok; case StaticHeaderIndex::Status404: return ResponseStatus::NotFound; + case StaticHeaderIndex::Status500: + return ResponseStatus::InternalServerError; default: break; } return ResponseStatus::Unknown; } +uint32_t Http2Frame::streamId() const { + if (empty() || size() <= HeaderSize) { + return 0; + } + return (uint32_t(data_[5]) << 24) + (uint32_t(data_[6]) << 16) + (uint32_t(data_[7]) << 8) + + uint32_t(data_[8]); +} + void Http2Frame::buildHeader(Type type, uint32_t payload_size, uint8_t flags, uint32_t stream_id) { data_.assign(payload_size + HeaderSize, 0); setPayloadSize(payload_size); diff --git a/test/common/http/http2/http2_frame.h b/test/common/http/http2/http2_frame.h index 1b2ba0fea5d5..44340d4e9bb6 100644 --- a/test/common/http/http2/http2_frame.h +++ b/test/common/http/http2/http2_frame.h @@ -109,7 +109,7 @@ class Http2Frame { Http11Required }; - enum class ResponseStatus { Unknown, Ok, NotFound }; + enum class ResponseStatus { Unknown, Ok, NotFound, InternalServerError }; struct Header { Header(absl::string_view key, absl::string_view value) : key_(key), value_(value) {} @@ -182,6 +182,7 @@ class Http2Frame { Type type() const { return static_cast(data_[3]); } ResponseStatus responseStatus() const; + uint32_t streamId() const; // Copy HTTP2 header. The `header` parameter must at least be HeaderSize long. // Allocates payload size based on the value in the header. diff --git a/test/integration/BUILD b/test/integration/BUILD index 0dfc805e6bb2..c2c7d0ba5d39 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -412,6 +412,7 @@ envoy_cc_test( "//source/common/buffer:buffer_lib", "//source/common/http:header_map_lib", "//source/extensions/filters/http/buffer:config", + "//test/integration/filters:local_reply_during_decoding_filter_lib", "//test/integration/filters:metadata_stop_all_filter_config_lib", "//test/integration/filters:on_local_reply_filter_config_lib", "//test/integration/filters:request_metadata_filter_config_lib", diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index f85648e1aa3e..2031f9a764a4 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -82,6 +82,21 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "local_reply_during_decoding_filter_lib", + srcs = [ + "local_reply_during_decoding_filter.cc", + ], + deps = [ + ":common_lib", + "//envoy/http:filter_interface", + "//envoy/registry", + "//envoy/server:filter_config_interface", + "//source/extensions/filters/http/common:pass_through_filter_lib", + "//test/extensions/filters/http/common:empty_http_filter_config_lib", + ], +) + envoy_cc_test_library( name = "tee_filter_lib", srcs = [ diff --git a/test/integration/filters/local_reply_during_decoding_filter.cc b/test/integration/filters/local_reply_during_decoding_filter.cc new file mode 100644 index 000000000000..f29beb572365 --- /dev/null +++ b/test/integration/filters/local_reply_during_decoding_filter.cc @@ -0,0 +1,37 @@ +#include + +#include "envoy/http/filter.h" +#include "envoy/registry/registry.h" +#include "envoy/server/filter_config.h" + +#include "source/extensions/filters/http/common/pass_through_filter.h" + +#include "test/extensions/filters/http/common/empty_http_filter_config.h" +#include "test/integration/filters/common.h" + +namespace Envoy { + +class LocalReplyDuringDecode : public Http::PassThroughFilter { +public: + constexpr static char name[] = "local-reply-during-decode"; + + Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers, bool) override { + auto result = request_headers.get(Http::LowerCaseString("skip-local-reply")); + if (!result.empty() && result[0]->value() == "true") { + return Http::FilterHeadersStatus::Continue; + } + decoder_callbacks_->sendLocalReply(Http::Code::InternalServerError, "", nullptr, absl::nullopt, + ""); + return Http::FilterHeadersStatus::StopIteration; + } +}; + +constexpr char LocalReplyDuringDecode::name[]; +static Registry::RegisterFactory, + Server::Configuration::NamedHttpFilterConfigFactory> + register_; +static Registry::RegisterFactory, + Server::Configuration::UpstreamHttpFilterConfigFactory> + register_upstream_; + +} // namespace Envoy diff --git a/test/integration/multiplexed_integration_test.cc b/test/integration/multiplexed_integration_test.cc index 69a75e3dfd18..a03c5533143a 100644 --- a/test/integration/multiplexed_integration_test.cc +++ b/test/integration/multiplexed_integration_test.cc @@ -2040,6 +2040,43 @@ TEST_P(Http2FrameIntegrationTest, MultipleRequests) { tcp_client_->close(); } +// Validate the request completion during processing of deferred list works. +TEST_P(Http2FrameIntegrationTest, MultipleRequestsDecodeHeadersEndsRequest) { + const int kRequestsSentPerIOCycle = 20; + // The local-reply-during-decode will call sendLocalReply, completing them + // when processing headers. This will cause the ConnectionManagerImpl::ActiveRequest + // object to be removed from the streams_ list during the onDeferredRequestProcessing call. + config_helper_.addFilter("{ name: local-reply-during-decode }"); + // Process more than 1 deferred request at a time to validate the removal of elements from + // the list does not break reverse iteration. + config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "3"); + beginSession(); + + std::string buffer; + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = + Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a", + Http2Frame::DataFlags::EndStream); + absl::StrAppend(&buffer, std::string(data)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + + // The local-reply-during-decode filter sends 500 status to the client + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto frame = readFrame(); + EXPECT_EQ(Http2Frame::Type::Headers, frame.type()); + EXPECT_EQ(Http2Frame::ResponseStatus::InternalServerError, frame.responseStatus()); + } + tcp_client_->close(); +} + TEST_P(Http2FrameIntegrationTest, MultipleRequestsWithTrailers) { const int kRequestsSentPerIOCycle = 20; autonomous_upstream_ = true; @@ -2079,6 +2116,59 @@ TEST_P(Http2FrameIntegrationTest, MultipleRequestsWithTrailers) { tcp_client_->close(); } +// Validate the request completion during processing of headers in the deferred requests, +// is ok, when deferred data and trailers are also present. +TEST_P(Http2FrameIntegrationTest, MultipleRequestsWithTrailersDecodeHeadersEndsRequest) { + const int kRequestsSentPerIOCycle = 20; + autonomous_upstream_ = true; + config_helper_.addFilter("{ name: local-reply-during-decode }"); + config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "6"); + beginSession(); + + std::string buffer; + // Make every 4th request to be reset by the local-reply-during-decode filter, this will give a + // good distribution of removed requests from the deferred sequence. + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto request = Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/", + {{"response_data_blocks", "0"}, + {"no_trailers", "1"}, + {"skip-local-reply", i % 4 ? "true" : "false"}}); + absl::StrAppend(&buffer, std::string(request)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a"); + absl::StrAppend(&buffer, std::string(data)); + } + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto trailers = Http2Frame::makeEmptyHeadersFrame( + Http2Frame::makeClientStreamId(i), + static_cast(Http::Http2::orFlags( + Http2Frame::HeadersFlags::EndStream, Http2Frame::HeadersFlags::EndHeaders))); + trailers.appendHeaderWithoutIndexing({"k", "v"}); + trailers.adjustPayloadSize(); + absl::StrAppend(&buffer, std::string(trailers)); + } + + ASSERT_TRUE(tcp_client_->write(buffer, false, false)); + + for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { + auto frame = readFrame(); + EXPECT_EQ(Http2Frame::Type::Headers, frame.type()); + uint32_t stream_id = frame.streamId(); + // Client stream indexes are multiples of 2 starting at 1 + if ((stream_id / 2) % 4) { + EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus()) + << " for stream=" << stream_id; + } else { + EXPECT_EQ(Http2Frame::ResponseStatus::InternalServerError, frame.responseStatus()) + << " for stream=" << stream_id; + } + } + tcp_client_->close(); +} + TEST_P(Http2FrameIntegrationTest, MultipleHeaderOnlyRequestsFollowedByReset) { // This number of requests stays below premature reset detection. const int kRequestsSentPerIOCycle = 20;