Skip to content

Commit

Permalink
HCM: Make reverse iteration resilient to element deletion (#30158)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Yan Avlasov <[email protected]>
  • Loading branch information
yanavlasov authored and phlax committed Oct 16, 2023
1 parent 8acfab7 commit a470c6c
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 9 deletions.
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ minor_behavior_changes:

bug_fixes:
# *Changes expected to improve the state of the world and are unlikely to have negative effects*
- area: http
change: |
Fixed a bug where processing of deferred streams with the value of ``http.max_requests_per_io_cycle`` more than 1,
can cause a crash.
removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
26 changes: 18 additions & 8 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <chrono>
#include <cstdint>
#include <functional>
#include <iterator>
#include <list>
#include <memory>
#include <string>
Expand Down Expand Up @@ -1934,6 +1935,8 @@ bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
if (end_stream) {
return true;
}
// Filter manager will return early from decodeData and decodeTrailers if
// request has completed.
if (deferred_data_ != nullptr) {
end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr;
filter_manager_.decodeData(*deferred_data_, end_stream);
Expand Down Expand Up @@ -1963,19 +1966,26 @@ bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() {
}

void ConnectionManagerImpl::onDeferredRequestProcessing() {
if (streams_.empty()) {
return;
}
requests_during_dispatch_count_ = 1; // 1 stream is always let through
// Streams are inserted at the head of the list. As such process deferred
// streams at the back of the list first.
for (auto reverse_iter = streams_.rbegin(); reverse_iter != streams_.rend();) {
auto& stream_ptr = *reverse_iter;
// Move the iterator to the next item in case the `onDeferredRequestProcessing` call removes the
// stream from the list.
++reverse_iter;
bool was_deferred = stream_ptr->onDeferredRequestProcessing();
// streams in the reverse order.
auto reverse_iter = std::prev(streams_.end());
bool at_first_element = false;
do {
at_first_element = reverse_iter == streams_.begin();
// Move the iterator to the previous item in case the `onDeferredRequestProcessing` call removes
// the stream from the list.
auto previous_element = std::prev(reverse_iter);
bool was_deferred = (*reverse_iter)->onDeferredRequestProcessing();
if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) {
break;
}
}
reverse_iter = previous_element;
// TODO(yanavlasov): see if `rend` can be used.
} while (!at_first_element);
}

} // namespace Http
Expand Down
10 changes: 10 additions & 0 deletions test/common/http/http2/http2_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,22 @@ Http2Frame::ResponseStatus Http2Frame::responseStatus() const {
return ResponseStatus::Ok;
case StaticHeaderIndex::Status404:
return ResponseStatus::NotFound;
case StaticHeaderIndex::Status500:
return ResponseStatus::InternalServerError;
default:
break;
}
return ResponseStatus::Unknown;
}

uint32_t Http2Frame::streamId() const {
if (empty() || size() <= HeaderSize) {
return 0;
}
return (uint32_t(data_[5]) << 24) + (uint32_t(data_[6]) << 16) + (uint32_t(data_[7]) << 8) +
uint32_t(data_[8]);
}

void Http2Frame::buildHeader(Type type, uint32_t payload_size, uint8_t flags, uint32_t stream_id) {
data_.assign(payload_size + HeaderSize, 0);
setPayloadSize(payload_size);
Expand Down
3 changes: 2 additions & 1 deletion test/common/http/http2/http2_frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class Http2Frame {
Http11Required
};

enum class ResponseStatus { Unknown, Ok, NotFound };
enum class ResponseStatus { Unknown, Ok, NotFound, InternalServerError };

struct Header {
Header(absl::string_view key, absl::string_view value) : key_(key), value_(value) {}
Expand Down Expand Up @@ -182,6 +182,7 @@ class Http2Frame {

Type type() const { return static_cast<Type>(data_[3]); }
ResponseStatus responseStatus() const;
uint32_t streamId() const;

// Copy HTTP2 header. The `header` parameter must at least be HeaderSize long.
// Allocates payload size based on the value in the header.
Expand Down
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ envoy_cc_test(
"//source/common/buffer:buffer_lib",
"//source/common/http:header_map_lib",
"//source/extensions/filters/http/buffer:config",
"//test/integration/filters:local_reply_during_decoding_filter_lib",
"//test/integration/filters:metadata_stop_all_filter_config_lib",
"//test/integration/filters:on_local_reply_filter_config_lib",
"//test/integration/filters:request_metadata_filter_config_lib",
Expand Down
15 changes: 15 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,21 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "local_reply_during_decoding_filter_lib",
srcs = [
"local_reply_during_decoding_filter.cc",
],
deps = [
":common_lib",
"//envoy/http:filter_interface",
"//envoy/registry",
"//envoy/server:filter_config_interface",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//test/extensions/filters/http/common:empty_http_filter_config_lib",
],
)

envoy_cc_test_library(
name = "tee_filter_lib",
srcs = [
Expand Down
37 changes: 37 additions & 0 deletions test/integration/filters/local_reply_during_decoding_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#include <string>

#include "envoy/http/filter.h"
#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"

#include "source/extensions/filters/http/common/pass_through_filter.h"

#include "test/extensions/filters/http/common/empty_http_filter_config.h"
#include "test/integration/filters/common.h"

namespace Envoy {

class LocalReplyDuringDecode : public Http::PassThroughFilter {
public:
constexpr static char name[] = "local-reply-during-decode";

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers, bool) override {
auto result = request_headers.get(Http::LowerCaseString("skip-local-reply"));
if (!result.empty() && result[0]->value() == "true") {
return Http::FilterHeadersStatus::Continue;
}
decoder_callbacks_->sendLocalReply(Http::Code::InternalServerError, "", nullptr, absl::nullopt,
"");
return Http::FilterHeadersStatus::StopIteration;
}
};

constexpr char LocalReplyDuringDecode::name[];
static Registry::RegisterFactory<SimpleFilterConfig<LocalReplyDuringDecode>,
Server::Configuration::NamedHttpFilterConfigFactory>
register_;
static Registry::RegisterFactory<SimpleFilterConfig<LocalReplyDuringDecode>,
Server::Configuration::UpstreamHttpFilterConfigFactory>
register_upstream_;

} // namespace Envoy
90 changes: 90 additions & 0 deletions test/integration/multiplexed_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,43 @@ TEST_P(Http2FrameIntegrationTest, MultipleRequests) {
tcp_client_->close();
}

// Validate the request completion during processing of deferred list works.
TEST_P(Http2FrameIntegrationTest, MultipleRequestsDecodeHeadersEndsRequest) {
const int kRequestsSentPerIOCycle = 20;
// The local-reply-during-decode will call sendLocalReply, completing them
// when processing headers. This will cause the ConnectionManagerImpl::ActiveRequest
// object to be removed from the streams_ list during the onDeferredRequestProcessing call.
config_helper_.addFilter("{ name: local-reply-during-decode }");
// Process more than 1 deferred request at a time to validate the removal of elements from
// the list does not break reverse iteration.
config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "3");
beginSession();

std::string buffer;
for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
auto request =
Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/",
{{"response_data_blocks", "0"}, {"no_trailers", "1"}});
absl::StrAppend(&buffer, std::string(request));
}

for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a",
Http2Frame::DataFlags::EndStream);
absl::StrAppend(&buffer, std::string(data));
}

ASSERT_TRUE(tcp_client_->write(buffer, false, false));

// The local-reply-during-decode filter sends 500 status to the client
for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
auto frame = readFrame();
EXPECT_EQ(Http2Frame::Type::Headers, frame.type());
EXPECT_EQ(Http2Frame::ResponseStatus::InternalServerError, frame.responseStatus());
}
tcp_client_->close();
}

TEST_P(Http2FrameIntegrationTest, MultipleRequestsWithTrailers) {
const int kRequestsSentPerIOCycle = 20;
autonomous_upstream_ = true;
Expand Down Expand Up @@ -2079,6 +2116,59 @@ TEST_P(Http2FrameIntegrationTest, MultipleRequestsWithTrailers) {
tcp_client_->close();
}

// Validate the request completion during processing of headers in the deferred requests,
// is ok, when deferred data and trailers are also present.
TEST_P(Http2FrameIntegrationTest, MultipleRequestsWithTrailersDecodeHeadersEndsRequest) {
const int kRequestsSentPerIOCycle = 20;
autonomous_upstream_ = true;
config_helper_.addFilter("{ name: local-reply-during-decode }");
config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "6");
beginSession();

std::string buffer;
// Make every 4th request to be reset by the local-reply-during-decode filter, this will give a
// good distribution of removed requests from the deferred sequence.
for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
auto request = Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/",
{{"response_data_blocks", "0"},
{"no_trailers", "1"},
{"skip-local-reply", i % 4 ? "true" : "false"}});
absl::StrAppend(&buffer, std::string(request));
}

for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a");
absl::StrAppend(&buffer, std::string(data));
}

for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
auto trailers = Http2Frame::makeEmptyHeadersFrame(
Http2Frame::makeClientStreamId(i),
static_cast<Http2Frame::HeadersFlags>(Http::Http2::orFlags(
Http2Frame::HeadersFlags::EndStream, Http2Frame::HeadersFlags::EndHeaders)));
trailers.appendHeaderWithoutIndexing({"k", "v"});
trailers.adjustPayloadSize();
absl::StrAppend(&buffer, std::string(trailers));
}

ASSERT_TRUE(tcp_client_->write(buffer, false, false));

for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
auto frame = readFrame();
EXPECT_EQ(Http2Frame::Type::Headers, frame.type());
uint32_t stream_id = frame.streamId();
// Client stream indexes are multiples of 2 starting at 1
if ((stream_id / 2) % 4) {
EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus())
<< " for stream=" << stream_id;
} else {
EXPECT_EQ(Http2Frame::ResponseStatus::InternalServerError, frame.responseStatus())
<< " for stream=" << stream_id;
}
}
tcp_client_->close();
}

TEST_P(Http2FrameIntegrationTest, MultipleHeaderOnlyRequestsFollowedByReset) {
// This number of requests stays below premature reset detection.
const int kRequestsSentPerIOCycle = 20;
Expand Down

0 comments on commit a470c6c

Please sign in to comment.