Skip to content

Commit

Permalink
Refactor state tracking variables in filter manager (envoyproxy#35528)
Browse files Browse the repository at this point in the history
Summary of changes:

Fix dual use of the local_complete_ flag. Presently it used for tracking both:
end_stream value in iteration of encoder filter chain.
stopping decoder filter chain when half close semantics are disabled.
The local_complete_ is renamed to observed_encode_end_stream_ and is now used for tracking end_stream value in encoder filter chain iteration only.

The decode filter chain is stopped by using the existing flag decoder_filter_chain_aborted_, which up till now was only used to stop filter chain when reset was received.

Rename remote_decode_complete_ flag to observed_decode_end_stream_ to match its purpose.
Rename remoteDecodeComplete() to decoderObservedEndStream() to better match its meaning.
Rename the complete() method to observedEndStream() to better reflect its use.
Add stopDecoderFilterChain() (which returns the decoder_filter_chain_aborted_) and use it to check if the decoder filter chain should be stopped (instead of using the local_complete_ flag).

---------

Signed-off-by: Yan Avlasov <[email protected]>
  • Loading branch information
yanavlasov authored Aug 2, 2024
1 parent 9ea5239 commit 37bdbd1
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 52 deletions.
18 changes: 9 additions & 9 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_def
// here is when Envoy "ends" the stream by calling recreateStream at which point recreateStream
// explicitly nulls out response_encoder to avoid the downstream being notified of the
// Envoy-internal stream instance being ended.
if (stream.response_encoder_ != nullptr && (!stream.filter_manager_.remoteDecodeComplete() ||
if (stream.response_encoder_ != nullptr && (!stream.filter_manager_.decoderObservedEndStream() ||
!stream.state_.codec_saw_local_complete_)) {
// Indicate local is complete at this point so that if we reset during a continuation, we don't
// raise further data or trailers.
Expand Down Expand Up @@ -290,7 +290,7 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_def
// fully read, as there's no race condition to avoid.
const bool connection_close =
stream.filter_manager_.streamInfo().shouldDrainConnectionUponCompletion();
const bool request_complete = stream.filter_manager_.remoteDecodeComplete();
const bool request_complete = stream.filter_manager_.decoderObservedEndStream();

// Don't do delay close for HTTP/1.0 or if the request is complete.
checkForDeferredClose(connection_close && (request_complete || http_10_sans_cl));
Expand Down Expand Up @@ -925,29 +925,29 @@ void ConnectionManagerImpl::ActiveStream::onIdleTimeout() {
connection_manager_.stats_.named_.downstream_rq_idle_timeout_.inc();

filter_manager_.streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::StreamIdleTimeout);
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
"stream timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
}

void ConnectionManagerImpl::ActiveStream::onRequestTimeout() {
connection_manager_.stats_.named_.downstream_rq_timeout_.inc();
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
"request timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestOverallTimeout);
}

void ConnectionManagerImpl::ActiveStream::onRequestHeaderTimeout() {
connection_manager_.stats_.named_.downstream_rq_header_timeout_.inc();
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
"request header timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestHeaderTimeout);
}

void ConnectionManagerImpl::ActiveStream::onStreamMaxDurationReached() {
ENVOY_STREAM_LOG(debug, "Stream max duration time reached", *this);
connection_manager_.stats_.named_.downstream_rq_max_duration_reached_.inc();
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
"downstream duration timeout", nullptr,
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
StreamInfo::ResponseCodeDetails::get().MaxDurationTimeout);
Expand Down Expand Up @@ -1113,7 +1113,7 @@ bool ConnectionManagerImpl::ActiveStream::validateTrailers() {

void ConnectionManagerImpl::ActiveStream::maybeEndDecode(bool end_stream) {
// If recreateStream is called, the HCM rewinds state and may send more encodeData calls.
if (end_stream && !filter_manager_.remoteDecodeComplete()) {
if (end_stream && !filter_manager_.decoderObservedEndStream()) {
filter_manager_.streamInfo().downstreamTiming().onLastDownstreamRxByteReceived(
connection_manager_.dispatcher_->timeSource());
ENVOY_STREAM_LOG(debug, "request end stream", *this);
Expand All @@ -1136,7 +1136,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
*headers);
// We only want to record this when reading the headers the first time, not when recreating
// a stream.
if (!filter_manager_.remoteDecodeComplete()) {
if (!filter_manager_.decoderObservedEndStream()) {
filter_manager_.streamInfo().downstreamTiming().onLastDownstreamHeaderRxByteReceived(
connection_manager_.dispatcher_->timeSource());
}
Expand Down Expand Up @@ -1764,7 +1764,7 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& heade
// If we are destroying a stream before remote is complete and the connection does not support
// multiplexing, we should disconnect since we don't want to wait around for the request to
// finish.
if (!filter_manager_.remoteDecodeComplete()) {
if (!filter_manager_.decoderObservedEndStream()) {
if (connection_manager_.codec_->protocol() < Protocol::Http2) {
connection_manager_.drain_state_ = DrainState::Closing;
}
Expand Down
53 changes: 31 additions & 22 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void ActiveStreamFilterBase::commonContinue() {
// future.
if (!headers_continued_) {
headers_continued_ = true;
doHeaders(complete() && !bufferedData() && !hasTrailers());
doHeaders(observedEndStream() && !bufferedData() && !hasTrailers());
}

doMetadata();
Expand All @@ -102,7 +102,7 @@ void ActiveStreamFilterBase::commonContinue() {
// on doData() to do so.
const bool had_trailers_before_data = hasTrailers();
if (bufferedData()) {
doData(complete() && !had_trailers_before_data);
doData(observedEndStream() && !had_trailers_before_data);
}

if (had_trailers_before_data) {
Expand Down Expand Up @@ -197,7 +197,7 @@ bool ActiveStreamFilterBase::commonHandleAfterDataCallback(FilterDataStatus stat
status == FilterDataStatus::StopIterationAndWatermark) {
buffer_was_streaming = status == FilterDataStatus::StopIterationAndWatermark;
commonHandleBufferData(provided_data);
} else if (complete() && !hasTrailers() && !bufferedData() &&
} else if (observedEndStream() && !hasTrailers() && !bufferedData() &&
// If the stream is destroyed, no need to handle the data buffer or trailers.
// This can occur if the filter calls sendLocalReply.
!parent_.state_.destroyed_) {
Expand Down Expand Up @@ -347,13 +347,13 @@ bool ActiveStreamDecoderFilter::canContinue() {
// continue to further filters. A concrete example of this is a filter buffering data, the
// last data frame comes in and the filter continues, but the final buffering takes the stream
// over the high watermark such that a 413 is returned.
return !parent_.state_.local_complete_;
return !parent_.stopDecoderFilterChain();
}

bool ActiveStreamEncoderFilter::canContinue() {
// As with ActiveStreamDecoderFilter::canContinue() make sure we do not
// continue if a local reply has been sent.
return !parent_.state_.remote_encode_complete_;
return !parent_.state_.encoder_filter_chain_complete_;
}

Buffer::InstancePtr ActiveStreamDecoderFilter::createBuffer() {
Expand All @@ -369,7 +369,7 @@ Buffer::InstancePtr& ActiveStreamDecoderFilter::bufferedData() {
return parent_.buffered_request_data_;
}

bool ActiveStreamDecoderFilter::complete() { return parent_.remoteDecodeComplete(); }
bool ActiveStreamDecoderFilter::observedEndStream() { return parent_.decoderObservedEndStream(); }

void ActiveStreamDecoderFilter::doHeaders(bool end_stream) {
parent_.decodeHeaders(this, *parent_.filter_manager_callbacks_.requestHeaders(), end_stream);
Expand Down Expand Up @@ -557,7 +557,7 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
(*entry)->processed_headers_ = true;

const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(status, end_stream);
ENVOY_BUG(!continue_iteration || !state_.local_complete_,
ENVOY_BUG(!continue_iteration || !stopDecoderFilterChain(),
fmt::format(
"filter={} did not return StopAll or StopIteration after sending a local reply.",
(*entry)->filter_context_.config_name));
Expand All @@ -568,7 +568,7 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
}

// Skip processing metadata after sending local reply
if (state_.local_complete_ && std::next(entry) != decoder_filters_.end()) {
if (stopDecoderFilterChain() && std::next(entry) != decoder_filters_.end()) {
maybeContinueDecoding(continue_data_entry);
return;
}
Expand Down Expand Up @@ -616,7 +616,7 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan

// If a response is complete or a reset has been sent, filters do not care about further body
// data. Just drop it.
if (state_.local_complete_) {
if (stopDecoderFilterChain()) {
return;
}

Expand Down Expand Up @@ -755,8 +755,9 @@ void FilterManager::addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::In
MetadataMapVector& FilterManager::addDecodedMetadata() { return *getRequestMetadataMapVector(); }

void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTrailerMap& trailers) {
// See decodeData() above for why we check local_complete_ here.
if (state_.local_complete_) {
// If a response is complete or a reset has been sent, filters do not care about further body
// data. Just drop it.
if (stopDecoderFilterChain()) {
return;
}

Expand Down Expand Up @@ -858,13 +859,19 @@ FilterManager::commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_st
ENVOY_STREAM_LOG(trace, "commonEncodePrefix end_stream: {}, isHalfCloseEnabled: {}", *this,
end_stream, filter_manager_callbacks_.isHalfCloseEnabled());
if (filter == nullptr) {
// half close is enabled in case tcp proxying is done with http1 encoder. In this case, we
// should not set the local_complete_ flag to true when end_stream is true.
// setting local_complete_ to true will cause any data sent in the upstream direction to be
// dropped.
if (end_stream && !filter_manager_callbacks_.isHalfCloseEnabled()) {
ASSERT(!state_.local_complete_);
state_.local_complete_ = true;
if (end_stream) {
ASSERT(!state_.observed_encode_end_stream_);
state_.observed_encode_end_stream_ = true;

// When half close semantics are disabled, receiving end stream from the upstream causes
// decoder filter to stop, as neither filters nor upstream is interested in downstream data.
// half close is enabled in case tcp proxying is done with http1 encoder. In this case, we
// should not stop decoder filter chain when end_stream is true, as it will cause any data
// sent in the upstream direction to be
// dropped.
if (!filter_manager_callbacks_.isHalfCloseEnabled()) {
state_.decoder_filter_chain_aborted_ = true;
}
}
return encoder_filters_.begin();
}
Expand Down Expand Up @@ -1439,8 +1446,8 @@ void FilterManager::encodeTrailers(ActiveStreamEncoderFilter* filter,

void FilterManager::maybeEndEncode(bool end_stream) {
if (end_stream) {
ASSERT(!state_.remote_encode_complete_);
state_.remote_encode_complete_ = true;
ASSERT(!state_.encoder_filter_chain_complete_);
state_.encoder_filter_chain_complete_ = true;
filter_manager_callbacks_.endStream();
}
}
Expand Down Expand Up @@ -1579,7 +1586,7 @@ bool ActiveStreamDecoderFilter::recreateStream(const ResponseHeaderMap* headers)
// Because the filter's and the HCM view of if the stream has a body and if
// the stream is complete may differ, re-check bytesReceived() to make sure
// there was no body from the HCM's point of view.
if (!complete()) {
if (!observedEndStream()) {
return false;
}

Expand Down Expand Up @@ -1624,7 +1631,9 @@ Buffer::InstancePtr ActiveStreamEncoderFilter::createBuffer() {
Buffer::InstancePtr& ActiveStreamEncoderFilter::bufferedData() {
return parent_.buffered_response_data_;
}
bool ActiveStreamEncoderFilter::complete() { return parent_.state_.local_complete_; }
bool ActiveStreamEncoderFilter::observedEndStream() {
return parent_.state_.observed_encode_end_stream_;
}
bool ActiveStreamEncoderFilter::has1xxHeaders() {
return parent_.state_.has_1xx_headers_ && !continued_1xx_headers_;
}
Expand Down
53 changes: 34 additions & 19 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,
virtual bool canContinue() PURE;
virtual Buffer::InstancePtr createBuffer() PURE;
virtual Buffer::InstancePtr& bufferedData() PURE;
virtual bool complete() PURE;
virtual bool observedEndStream() PURE;
virtual bool has1xxHeaders() PURE;
virtual void do1xxHeaders() PURE;
virtual void doHeaders(bool end_stream) PURE;
Expand Down Expand Up @@ -212,7 +212,7 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,
bool canContinue() override;
Buffer::InstancePtr createBuffer() override;
Buffer::InstancePtr& bufferedData() override;
bool complete() override;
bool observedEndStream() override;
bool has1xxHeaders() override { return false; }
void do1xxHeaders() override { IS_ENVOY_BUG("unexpected 1xx headers"); }
void doHeaders(bool end_stream) override;
Expand Down Expand Up @@ -304,7 +304,7 @@ struct ActiveStreamEncoderFilter : public ActiveStreamFilterBase,
bool canContinue() override;
Buffer::InstancePtr createBuffer() override;
Buffer::InstancePtr& bufferedData() override;
bool complete() override;
bool observedEndStream() override;
bool has1xxHeaders() override;
void do1xxHeaders() override;
void doHeaders(bool end_stream) override;
Expand Down Expand Up @@ -745,7 +745,7 @@ class FilterManager : public ScopeTrackedObject,
* @param end_stream whether the request is header only.
*/
void decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
state_.remote_decode_complete_ = end_stream;
state_.observed_decode_end_stream_ = end_stream;
decodeHeaders(nullptr, headers, end_stream);
}

Expand All @@ -755,7 +755,7 @@ class FilterManager : public ScopeTrackedObject,
* @param end_stream whether this data is the end of the request.
*/
void decodeData(Buffer::Instance& data, bool end_stream) {
state_.remote_decode_complete_ = end_stream;
state_.observed_decode_end_stream_ = end_stream;
decodeData(nullptr, data, end_stream, FilterIterationStartState::CanStartFromCurrent);
}

Expand All @@ -764,7 +764,7 @@ class FilterManager : public ScopeTrackedObject,
* @param trailers the trailers to decode.
*/
void decodeTrailers(RequestTrailerMap& trailers) {
state_.remote_decode_complete_ = true;
state_.observed_decode_end_stream_ = true;
decodeTrailers(nullptr, trailers);
}

Expand Down Expand Up @@ -819,8 +819,12 @@ class FilterManager : public ScopeTrackedObject,

/**
* Marks local processing as complete.
* TODO(yanvlasov): deprecate and decommission this function.
*/
void setLocalComplete() { state_.local_complete_ = true; }
void setLocalComplete() {
state_.observed_encode_end_stream_ = true;
state_.decoder_filter_chain_aborted_ = true;
}

/**
* Whether the filters have been destroyed.
Expand All @@ -830,7 +834,7 @@ class FilterManager : public ScopeTrackedObject,
/**
* Whether remote processing has been marked as complete.
*/
virtual bool remoteDecodeComplete() const { return state_.remote_decode_complete_; }
virtual bool decoderObservedEndStream() const { return state_.observed_decode_end_stream_; }

/**
* Instructs the FilterManager to not create a filter chain. This makes it possible to issue
Expand Down Expand Up @@ -864,18 +868,27 @@ class FilterManager : public ScopeTrackedObject,
protected:
struct State {
State()
: remote_decode_complete_(false), remote_encode_complete_(false), local_complete_(false),
has_1xx_headers_(false), created_filter_chain_(false), is_head_request_(false),
is_grpc_request_(false), non_100_response_headers_encoded_(false),
under_on_local_reply_(false), decoder_filter_chain_aborted_(false),
encoder_filter_chain_aborted_(false), saw_downstream_reset_(false) {}
: encoder_filter_chain_complete_(false), observed_decode_end_stream_(false),
observed_encode_end_stream_(false), has_1xx_headers_(false), created_filter_chain_(false),
is_head_request_(false), is_grpc_request_(false),
non_100_response_headers_encoded_(false), under_on_local_reply_(false),
decoder_filter_chain_aborted_(false), encoder_filter_chain_aborted_(false),
saw_downstream_reset_(false) {}
uint32_t filter_call_state_{0};

bool remote_decode_complete_ : 1;
bool remote_encode_complete_ : 1;
bool local_complete_ : 1; // This indicates that local is complete prior to filter processing.
// A filter can still stop the stream from being complete as seen
// by the codec.
// Set after encoder filter chain has completed iteration. Prevents further calls to encoder
// filters.
bool encoder_filter_chain_complete_ : 1;

// Set `true` when the filter manager observes end stream on the decoder path (from downstream
// client) before iteration of the decoder filter chain begins. This flag is used for setting
// end_stream value when resuming decoder filter chain iteration.
bool observed_decode_end_stream_ : 1;
// Set `true` when the filter manager observes end stream on the encoder path (from upstream
// server or Envoy's local reply) before iteration of the encoder filter chain begins. This flag
// is used for setting end_stream value when resuming encoder filter chain iteration.
bool observed_encode_end_stream_ : 1;

// By default, we will assume there are no 1xx. If encode1xxHeaders
// is ever called, this is set to true so commonContinue resumes processing the 1xx.
bool has_1xx_headers_ : 1;
Expand Down Expand Up @@ -1018,6 +1031,8 @@ class FilterManager : public ScopeTrackedObject,
return request_metadata_map_vector_.get();
}

bool stopDecoderFilterChain() { return state_.decoder_filter_chain_aborted_; }

FilterManagerCallbacks& filter_manager_callbacks_;
Event::Dispatcher& dispatcher_;
// This is unset if there is no downstream connection, e.g. for health check or
Expand Down Expand Up @@ -1142,7 +1157,7 @@ class DownstreamFilterManager : public FilterManager {
* For the DownstreamFilterManager rely on external state, to handle the case
* of internal redirects.
*/
bool remoteDecodeComplete() const override {
bool decoderObservedEndStream() const override {
return streamInfo().downstreamTiming() &&
streamInfo().downstreamTiming()->lastDownstreamRxByteReceived().has_value();
}
Expand Down
4 changes: 2 additions & 2 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class UpstreamFilterManager : public Http::FilterManager {
absl::string_view details) override {
state().decoder_filter_chain_aborted_ = true;
state().encoder_filter_chain_aborted_ = true;
state().remote_encode_complete_ = true;
state().local_complete_ = true;
state().encoder_filter_chain_complete_ = true;
state().observed_encode_end_stream_ = true;
// TODO(alyssawilk) this should be done through the router to play well with hedging.
upstream_request_.parent_.callbacks()->sendLocalReply(code, body, modify_headers, grpc_status,
details);
Expand Down

0 comments on commit 37bdbd1

Please sign in to comment.