Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate state for tracking downstream end_stream in filter manager and HCM #35815

Merged
merged 3 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ bug_fixes:
change: |
RBAC will now allow stat prefixes configured in per-route config to override the base config's
stat prefix.
- area: http
change: |
Fixed a bug where an incomplete request (missing body or trailers) may be proxied to the upstream when the limit on
the number of requests per I/O cycle is configured and an HTTP decoder filter that pauses filter chain is present. This behavior
can be reverted by setting the runtime guard ``envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream``
to false.

removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
76 changes: 43 additions & 33 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,9 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_def
// here is when Envoy "ends" the stream by calling recreateStream at which point recreateStream
// explicitly nulls out response_encoder to avoid the downstream being notified of the
// Envoy-internal stream instance being ended.
if (stream.response_encoder_ != nullptr && (!stream.filter_manager_.decoderObservedEndStream() ||
!stream.state_.codec_saw_local_complete_)) {
if (stream.response_encoder_ != nullptr &&
(!stream.filter_manager_.hasLastDownstreamByteReceived() ||
!stream.state_.codec_saw_local_complete_)) {
// Indicate local is complete at this point so that if we reset during a continuation, we don't
// raise further data or trailers.
ENVOY_STREAM_LOG(debug, "doEndStream() resetting stream", stream);
Expand Down Expand Up @@ -294,7 +295,7 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_def
// fully read, as there's no race condition to avoid.
const bool connection_close =
stream.filter_manager_.streamInfo().shouldDrainConnectionUponCompletion();
const bool request_complete = stream.filter_manager_.decoderObservedEndStream();
const bool request_complete = stream.filter_manager_.hasLastDownstreamByteReceived();

// Don't do delay close for HTTP/1.0 or if the request is complete.
checkForDeferredClose(connection_close && (request_complete || http_10_sans_cl));
Expand Down Expand Up @@ -943,32 +944,35 @@ void ConnectionManagerImpl::ActiveStream::onIdleTimeout() {
connection_manager_.stats_.named_.downstream_rq_idle_timeout_.inc();

filter_manager_.streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::StreamIdleTimeout);
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
"stream timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
sendLocalReply(
Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
"stream timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
}

void ConnectionManagerImpl::ActiveStream::onRequestTimeout() {
connection_manager_.stats_.named_.downstream_rq_timeout_.inc();
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
"request timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestOverallTimeout);
sendLocalReply(
Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
"request timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestOverallTimeout);
}

void ConnectionManagerImpl::ActiveStream::onRequestHeaderTimeout() {
connection_manager_.stats_.named_.downstream_rq_header_timeout_.inc();
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
"request header timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestHeaderTimeout);
sendLocalReply(
Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
"request header timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestHeaderTimeout);
}

void ConnectionManagerImpl::ActiveStream::onStreamMaxDurationReached() {
ENVOY_STREAM_LOG(debug, "Stream max duration time reached", *this);
connection_manager_.stats_.named_.downstream_rq_max_duration_reached_.inc();
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
"downstream duration timeout", nullptr,
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
StreamInfo::ResponseCodeDetails::get().MaxDurationTimeout);
sendLocalReply(
Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
"downstream duration timeout", nullptr, Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
StreamInfo::ResponseCodeDetails::get().MaxDurationTimeout);
}

void ConnectionManagerImpl::ActiveStream::chargeStats(const ResponseHeaderMap& headers) {
Expand Down Expand Up @@ -1086,15 +1090,15 @@ bool ConnectionManagerImpl::ActiveStream::validateHeaders() {
return true;
}

bool ConnectionManagerImpl::ActiveStream::validateTrailers() {
bool ConnectionManagerImpl::ActiveStream::validateTrailers(RequestTrailerMap& trailers) {
if (!header_validator_) {
return true;
}

auto validation_result = header_validator_->validateRequestTrailers(*request_trailers_);
auto validation_result = header_validator_->validateRequestTrailers(trailers);
std::string failure_details(validation_result.details());
if (validation_result.ok()) {
auto transformation_result = header_validator_->transformRequestTrailers(*request_trailers_);
auto transformation_result = header_validator_->transformRequestTrailers(trailers);
if (transformation_result.ok()) {
return true;
}
Expand Down Expand Up @@ -1129,12 +1133,12 @@ bool ConnectionManagerImpl::ActiveStream::validateTrailers() {
return false;
}

void ConnectionManagerImpl::ActiveStream::maybeEndDecode(bool end_stream) {
void ConnectionManagerImpl::ActiveStream::maybeRecordLastByteReceived(bool end_stream) {
// If recreateStream is called, the HCM rewinds state and may send more encodeData calls.
if (end_stream && !filter_manager_.decoderObservedEndStream()) {
if (end_stream && !filter_manager_.hasLastDownstreamByteReceived()) {
filter_manager_.streamInfo().downstreamTiming().onLastDownstreamRxByteReceived(
connection_manager_.dispatcher_->timeSource());
ENVOY_STREAM_LOG(debug, "request end stream", *this);
ENVOY_STREAM_LOG(debug, "request end stream timestamp recorded", *this);
}
}

Expand All @@ -1154,7 +1158,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
*headers);
// We only want to record this when reading the headers the first time, not when recreating
// a stream.
if (!filter_manager_.decoderObservedEndStream()) {
if (!filter_manager_.hasLastDownstreamByteReceived()) {
filter_manager_.streamInfo().downstreamTiming().onLastDownstreamHeaderRxByteReceived(
connection_manager_.dispatcher_->timeSource());
}
Expand All @@ -1180,7 +1184,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
filter_manager_.streamInfo().protocol(protocol);

// We end the decode here to mark that the downstream stream is complete.
maybeEndDecode(end_stream);
maybeRecordLastByteReceived(end_stream);

if (!validateHeaders()) {
ENVOY_STREAM_LOG(debug, "request headers validation failed\n{}", *this, *request_headers_);
Expand Down Expand Up @@ -1458,7 +1462,7 @@ void ConnectionManagerImpl::ActiveStream::traceRequest() {
void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, bool end_stream) {
ScopeTrackerScopeState scope(this,
connection_manager_.read_callbacks_->connection().dispatcher());
maybeEndDecode(end_stream);
maybeRecordLastByteReceived(end_stream);
filter_manager_.streamInfo().addBytesReceived(data.length());
if (!state_.deferred_to_next_io_iteration_) {
filter_manager_.decodeData(data, end_stream);
Expand All @@ -1478,14 +1482,19 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&&
resetIdleTimer();

ASSERT(!request_trailers_);
request_trailers_ = std::move(trailers);
if (!validateTrailers()) {
ENVOY_STREAM_LOG(debug, "request trailers validation failed:\n{}", *this, *request_trailers_);
if (!validateTrailers(*trailers)) {
ENVOY_STREAM_LOG(debug, "request trailers validation failed:\n{}", *this, *trailers);
return;
}
maybeEndDecode(true);
maybeRecordLastByteReceived(true);
if (!state_.deferred_to_next_io_iteration_) {
request_trailers_ = std::move(trailers);
filter_manager_.decodeTrailers(*request_trailers_);
} else {
// Save trailers in a different variable since `request_trailers_` is available to the filter
// manager via `requestTrailers()` callback and makes filter manager see trailers prematurely
// when deferred request is processed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice comment! Thanks for adding this.

deferred_request_trailers_ = std::move(trailers);
}
}

Expand Down Expand Up @@ -1776,7 +1785,7 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& heade
// If we are destroying a stream before remote is complete and the connection does not support
// multiplexing, we should disconnect since we don't want to wait around for the request to
// finish.
if (!filter_manager_.decoderObservedEndStream()) {
if (!filter_manager_.hasLastDownstreamByteReceived()) {
if (connection_manager_.codec_->protocol() < Protocol::Http2) {
connection_manager_.drain_state_ = DrainState::Closing;
}
Expand Down Expand Up @@ -2221,7 +2230,7 @@ bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
}
state_.deferred_to_next_io_iteration_ = false;
bool end_stream = state_.deferred_end_stream_ && deferred_data_ == nullptr &&
request_trailers_ == nullptr && deferred_metadata_.empty();
deferred_request_trailers_ == nullptr && deferred_metadata_.empty();
filter_manager_.decodeHeaders(*request_headers_, end_stream);
if (end_stream) {
return true;
Expand All @@ -2235,10 +2244,11 @@ bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
// Filter manager will return early from decodeData and decodeTrailers if
// request has completed.
if (deferred_data_ != nullptr) {
end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr;
end_stream = state_.deferred_end_stream_ && deferred_request_trailers_ == nullptr;
filter_manager_.decodeData(*deferred_data_, end_stream);
}
if (request_trailers_ != nullptr) {
if (deferred_request_trailers_ != nullptr) {
request_trailers_ = std::move(deferred_request_trailers_);
filter_manager_.decodeTrailers(*request_trailers_);
}
return true;
Expand Down
7 changes: 4 additions & 3 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void decodeData(Buffer::Instance& data, bool end_stream) override;
void decodeMetadata(MetadataMapPtr&&) override;

// Mark that the last downstream byte is received, and the downstream stream is complete.
void maybeEndDecode(bool end_stream);
// Record the timestamp of last downstream byte is received.
void maybeRecordLastByteReceived(bool end_stream);

// Http::RequestDecoder
void decodeHeaders(RequestHeaderMapSharedPtr&& headers, bool end_stream) override;
Expand Down Expand Up @@ -410,7 +410,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// harmonize this behavior with H/1.
// 3. If the `stream_error_on_invalid_http_message` is set to `false` (it is by default) in the
// HTTP connection manager configuration, then the entire connection is closed.
bool validateTrailers();
bool validateTrailers(RequestTrailerMap& trailers);

std::weak_ptr<bool> stillAlive() { return {still_alive_}; }

Expand Down Expand Up @@ -508,6 +508,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
std::shared_ptr<bool> still_alive_ = std::make_shared<bool>(true);
std::unique_ptr<Buffer::OwnedImpl> deferred_data_;
std::queue<MetadataMapPtr> deferred_metadata_;
RequestTrailerMapPtr deferred_request_trailers_;
};

using ActiveStreamPtr = std::unique_ptr<ActiveStream>;
Expand Down
26 changes: 22 additions & 4 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,9 @@ class DownstreamFilterManager : public FilterManager {
std::move(parent_filter_state)),
local_reply_(local_reply),
downstream_filter_load_shed_point_(overload_manager.getLoadShedPoint(
Server::LoadShedPointName::get().HttpDownstreamFilterCheck)) {
Server::LoadShedPointName::get().HttpDownstreamFilterCheck)),
use_filter_manager_state_for_downstream_end_stream_(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream")) {
ENVOY_LOG_ONCE_IF(
trace, downstream_filter_load_shed_point_ == nullptr,
"LoadShedPoint envoy.load_shed_points.http_downstream_filter_check is not found. "
Expand Down Expand Up @@ -1153,11 +1155,24 @@ class DownstreamFilterManager : public FilterManager {
absl::string_view details) override;

/**
* Whether remote processing has been marked as complete.
* For the DownstreamFilterManager rely on external state, to handle the case
* of internal redirects.
* Whether downstream has observed end_stream.
*/
bool decoderObservedEndStream() const override {
// Set by the envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream
// runtime flag.
if (use_filter_manager_state_for_downstream_end_stream_) {
return state_.observed_decode_end_stream_;
}

return hasLastDownstreamByteReceived();
}

/**
* Return true if the timestamp of the downstream end_stream was recorded.
* For the HCM to handle the case of internal redirects, timeout error replies
* and stream resets on premature upstream response.
*/
bool hasLastDownstreamByteReceived() const {
return streamInfo().downstreamTiming() &&
streamInfo().downstreamTiming()->lastDownstreamRxByteReceived().has_value();
}
Expand Down Expand Up @@ -1206,6 +1221,9 @@ class DownstreamFilterManager : public FilterManager {
const LocalReply::LocalReply& local_reply_;
Utility::PreparedLocalReplyPtr prepared_local_reply_{nullptr};
Server::LoadShedPoint* downstream_filter_load_shed_point_{nullptr};
// Set by the envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream runtime
// flag.
const bool use_filter_manager_state_for_downstream_end_stream_{};
};

} // namespace Http
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ RUNTIME_GUARD(envoy_reloadable_features_udp_socket_apply_aggregated_read_limit);
RUNTIME_GUARD(envoy_reloadable_features_uhv_allow_malformed_url_encoding);
RUNTIME_GUARD(envoy_reloadable_features_upstream_remote_address_use_connection);
RUNTIME_GUARD(envoy_reloadable_features_use_config_in_happy_eyeballs);
RUNTIME_GUARD(envoy_reloadable_features_use_filter_manager_state_for_downstream_end_stream);
RUNTIME_GUARD(envoy_reloadable_features_use_http3_header_normalisation);
RUNTIME_GUARD(envoy_reloadable_features_use_route_host_mutation_for_auto_sni_san);
RUNTIME_GUARD(envoy_reloadable_features_use_typed_metadata_in_proxy_protocol_listener);
Expand Down
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ envoy_cc_test(
"//test/integration/filters:response_metadata_filter_config_lib",
"//test/integration/filters:set_response_code_filter_config_proto_cc_proto",
"//test/integration/filters:set_response_code_filter_lib",
"//test/integration/filters:stop_in_headers_continue_in_data_filter_lib",
"//test/integration/filters:stop_iteration_and_continue",
"//test/mocks/http:http_mocks",
"//test/mocks/upstream:retry_priority_factory_mocks",
Expand Down
14 changes: 14 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -962,3 +962,17 @@ envoy_cc_test_library(
"//source/extensions/filters/network/common:factory_base_lib",
],
)

envoy_cc_test_library(
name = "stop_in_headers_continue_in_data_filter_lib",
srcs = ["stop_in_headers_continue_in_data_filter.cc"],
deps = [
":common_lib",
"//envoy/http:filter_interface",
"//envoy/registry",
"//envoy/server:filter_config_interface",
"//source/common/protobuf",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//test/extensions/filters/http/common:empty_http_filter_config_lib",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include <string>

#include "envoy/http/filter.h"
#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"

#include "source/extensions/filters/http/common/pass_through_filter.h"

#include "test/extensions/filters/http/common/empty_http_filter_config.h"
#include "test/integration/filters/common.h"

namespace Envoy {

// A test filter that stops iteration in decodeHeaders then continues in decodeBody.
class StopInHeadersContinueInBodyFilter : public Http::PassThroughFilter {
public:
constexpr static char name[] = "stop-in-headers-continue-in-body-filter";

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap&, bool /* end_stream */) override {
return Http::FilterHeadersStatus::StopIteration;
}

Http::FilterDataStatus decodeData(Buffer::Instance&, bool) override {
return Http::FilterDataStatus::Continue;
}
};

static Registry::RegisterFactory<SimpleFilterConfig<StopInHeadersContinueInBodyFilter>,
Server::Configuration::NamedHttpFilterConfigFactory>
register_;
static Registry::RegisterFactory<SimpleFilterConfig<StopInHeadersContinueInBodyFilter>,
Server::Configuration::UpstreamHttpFilterConfigFactory>
register_upstream_;
} // namespace Envoy
Loading