-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow multiplexed upstream servers to half close the stream before the downstream #34461
Changes from 25 commits
7263c53
fd17760
723ed65
1d8172c
3ba7cc4
6133efc
fcf2a76
f24bc34
0e881dc
b7b746f
af60f0b
01ae68b
1f269a4
1b961df
8ae88a0
d8aabba
2166fcb
e2deca2
9e9428f
71ce1f0
68c807e
9c308ba
2d8cb15
e1c00d0
018a3e4
9a692f3
bd30f81
b9948ab
e224738
e4ae6b4
d5320da
b804cb5
4b305d9
9155e49
49b6162
c02ac47
d965ff6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -283,7 +283,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
OptRef<const Tracing::Config> tracingConfig() const override; | ||
const ScopeTrackedObject& scope() override; | ||
OptRef<DownstreamStreamFilterCallbacks> downstreamCallbacks() override { return *this; } | ||
bool isHalfCloseEnabled() override { return false; } | ||
bool isHalfCloseEnabled() override { return connection_manager_.allow_upstream_half_close_; } | ||
|
||
// DownstreamStreamFilterCallbacks | ||
void setRoute(Router::RouteConstSharedPtr route) override; | ||
|
@@ -640,6 +640,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
uint32_t requests_during_dispatch_count_{0}; | ||
const uint32_t max_requests_during_dispatch_{UINT32_MAX}; | ||
Event::SchedulableCallbackPtr deferred_request_processing_callback_; | ||
const bool allow_upstream_half_close_{}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we have half close behavior commented anywhere? comment link to it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have added docs and more comments. |
||
}; | ||
|
||
} // namespace Http | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -466,19 +466,35 @@ void ActiveStreamDecoderFilter::encode1xxHeaders(ResponseHeaderMapPtr&& headers) | |
} | ||
} | ||
|
||
void ActiveStreamDecoderFilter::maybeMarkDecoderFilterTerminal(bool encoded_end_stream) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so I think this is fine, as discussed, but if we don't really want to support half close for anything but router (and maybe codec filter upstream) responses with half-close, do we need to support other filters being terminal? If any other filter generates the response we could treat it as full close, no? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed code so that only terminal filter can work with independent half-close. One runtime key is removed the FM will agnostic to the half-close behavior - it will always assume independent half-close. And semantics of closing incomplete streams will move into codecs. |
||
// If this filter encoded end_stream and the decoder filter chain has not yet been finished | ||
// then make this filter terminal. Decoding will not go past this filter. | ||
filter_encoded_end_stream_ = encoded_end_stream; | ||
|
||
// If the filter that encoded end_stream has also already observed request (downstream) | ||
// end_stream, but the decoder filter chain has not been completed, then decoding is aborted as | ||
// there is no need to iterate over remaining decoder filters any more. | ||
if (encoded_end_stream && end_stream_ && !parent_.state_.decoder_filter_chain_complete_) { | ||
parent_.state_.decoder_filter_chain_aborted_ = true; | ||
} | ||
} | ||
|
||
void ActiveStreamDecoderFilter::encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream, | ||
absl::string_view details) { | ||
maybeMarkDecoderFilterTerminal(end_stream); | ||
parent_.streamInfo().setResponseCodeDetails(details); | ||
parent_.filter_manager_callbacks_.setResponseHeaders(std::move(headers)); | ||
parent_.encodeHeaders(nullptr, *parent_.filter_manager_callbacks_.responseHeaders(), end_stream); | ||
} | ||
|
||
void ActiveStreamDecoderFilter::encodeData(Buffer::Instance& data, bool end_stream) { | ||
maybeMarkDecoderFilterTerminal(end_stream); | ||
parent_.encodeData(nullptr, data, end_stream, | ||
FilterManager::FilterIterationStartState::CanStartFromCurrent); | ||
} | ||
|
||
void ActiveStreamDecoderFilter::encodeTrailers(ResponseTrailerMapPtr&& trailers) { | ||
maybeMarkDecoderFilterTerminal(true); | ||
parent_.filter_manager_callbacks_.setResponseTrailers(std::move(trailers)); | ||
parent_.encodeTrailers(nullptr, *parent_.filter_manager_callbacks_.responseTrailers()); | ||
} | ||
|
@@ -526,6 +542,8 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead | |
std::list<ActiveStreamDecoderFilterPtr>::iterator entry = | ||
commonDecodePrefix(filter, FilterIterationStartState::AlwaysStartFromNext); | ||
std::list<ActiveStreamDecoderFilterPtr>::iterator continue_data_entry = decoder_filters_.end(); | ||
// Terminal filter is either the last one or filter that encoded end_stream. | ||
bool terminal_filter_decoded_end_stream = false; | ||
|
||
for (; entry != decoder_filters_.end(); entry++) { | ||
ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders)); | ||
|
@@ -599,13 +617,17 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead | |
if (end_stream && buffered_request_data_ && continue_data_entry == decoder_filters_.end()) { | ||
continue_data_entry = entry; | ||
} | ||
terminal_filter_decoded_end_stream = | ||
(end_stream && continue_data_entry == decoder_filters_.end()) && | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you comment what this check does? it's unclear at a glance why we need the continue_data_entry here but not below. If it's for the body added case can't we have decodeData handle it and put this check in an else block for "no body added"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
(std::next(entry) == decoder_filters_.end() || (*entry)->filter_encoded_end_stream_); | ||
} | ||
|
||
maybeContinueDecoding(continue_data_entry); | ||
|
||
if (end_stream) { | ||
disarmRequestTimeout(); | ||
} | ||
maybeEndDecode(terminal_filter_decoded_end_stream); | ||
} | ||
|
||
void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data, | ||
|
@@ -625,6 +647,7 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan | |
// Filter iteration may start at the current filter. | ||
std::list<ActiveStreamDecoderFilterPtr>::iterator entry = | ||
commonDecodePrefix(filter, filter_iteration_start_state); | ||
bool terminal_filter_decoded_end_stream = false; | ||
KBaichoo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
for (; entry != decoder_filters_.end(); entry++) { | ||
// If the filter pointed by entry has stopped for all frame types, return now. | ||
|
@@ -702,6 +725,10 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan | |
trailers_added_entry = entry; | ||
} | ||
|
||
terminal_filter_decoded_end_stream = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's definitely comment each of these blocks There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
end_stream && | ||
(std::next(entry) == decoder_filters_.end() || (*entry)->filter_encoded_end_stream_); | ||
|
||
if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.decoder_filters_streaming_) && | ||
std::next(entry) != decoder_filters_.end()) { | ||
// Stop iteration IFF this is not the last filter. If it is the last filter, continue with | ||
|
@@ -720,6 +747,7 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan | |
if (end_stream) { | ||
disarmRequestTimeout(); | ||
} | ||
maybeEndDecode(terminal_filter_decoded_end_stream); | ||
} | ||
|
||
RequestTrailerMap& FilterManager::addDecodedTrailers() { | ||
|
@@ -764,6 +792,7 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra | |
// Filter iteration may start at the current filter. | ||
std::list<ActiveStreamDecoderFilterPtr>::iterator entry = | ||
commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent); | ||
bool terminal_filter_decoded_end_stream = false; | ||
|
||
for (; entry != decoder_filters_.end(); entry++) { | ||
// If the filter pointed by entry has stopped for all frame type, return now. | ||
|
@@ -787,12 +816,20 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra | |
} | ||
|
||
processNewlyAddedMetadata(); | ||
terminal_filter_decoded_end_stream = | ||
std::next(entry) == decoder_filters_.end() || (*entry)->filter_encoded_end_stream_; | ||
|
||
if (!(*entry)->commonHandleAfterTrailersCallback(status)) { | ||
if (terminal_filter_decoded_end_stream) { | ||
break; | ||
KBaichoo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
if (!(*entry)->commonHandleAfterTrailersCallback(status) && | ||
std::next(entry) != decoder_filters_.end()) { | ||
return; | ||
} | ||
} | ||
disarmRequestTimeout(); | ||
maybeEndDecode(terminal_filter_decoded_end_stream); | ||
} | ||
|
||
void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMap& metadata_map) { | ||
|
@@ -922,9 +959,9 @@ void DownstreamFilterManager::sendLocalReply( | |
|
||
// Stop filter chain iteration if local reply was sent while filter decoding or encoding callbacks | ||
// are running. | ||
if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) { | ||
state_.decoder_filter_chain_aborted_ = true; | ||
} else if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) { | ||
// Local reply always stops decoder filter chain. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this looks like a functional change. Should it be behind this or a separate runtime guard? maybe split out and land ahead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Made it flag protected. I will create separate PR to make it so that decoder filter is always aborted on local reply. |
||
state_.decoder_filter_chain_aborted_ = true; | ||
if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) { | ||
state_.encoder_filter_chain_aborted_ = true; | ||
} | ||
|
||
|
@@ -1448,6 +1485,56 @@ void FilterManager::maybeEndEncode(bool end_stream) { | |
if (end_stream) { | ||
ASSERT(!state_.encoder_filter_chain_complete_); | ||
state_.encoder_filter_chain_complete_ = true; | ||
if (filter_manager_callbacks_.isHalfCloseEnabled()) { | ||
// If independent half close is enabled the stream is closed when both decoder and encoder | ||
// filter chains has completed or were aborted. | ||
checkAndCloseStreamIfFullyClosed(); | ||
} else { | ||
// Otherwise encoding end_stream always closes the stream (and resets it if request was not | ||
// complete yet). | ||
filter_manager_callbacks_.endStream(); | ||
} | ||
} | ||
} | ||
|
||
void FilterManager::maybeEndDecode(bool terminal_filter_decoded_end_stream) { | ||
if (terminal_filter_decoded_end_stream) { | ||
ASSERT(!state_.decoder_filter_chain_complete_); | ||
state_.decoder_filter_chain_complete_ = true; | ||
// If the decoder filter chain was aborted (i.e. due to local reply) | ||
// we rely on the encoding of end_stream to close the stream. | ||
if (filter_manager_callbacks_.isHalfCloseEnabled() && !stopDecoderFilterChain()) { | ||
checkAndCloseStreamIfFullyClosed(); | ||
} | ||
} | ||
} | ||
|
||
void FilterManager::checkAndCloseStreamIfFullyClosed() { | ||
// This function is only used when half close semantics are enabled. | ||
if (!filter_manager_callbacks_.isHalfCloseEnabled()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right now it's only called if half close is enabled right? should it be an envoy bug if we early return? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added ASSERT |
||
return; | ||
} | ||
// When the independent half close is enabled the stream is always closed on error responses | ||
// from the server. | ||
if (filter_manager_callbacks_.responseHeaders().has_value()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should definitely have a doc section on how half close works as I don't recall this being mentioned before (but IMO sounds reasonable) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
const uint64_t response_status = | ||
Http::Utility::getResponseStatus(filter_manager_callbacks_.responseHeaders().ref()); | ||
bool error_response = | ||
KBaichoo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
!(Http::CodeUtility::is2xx(response_status) || Http::CodeUtility::is1xx(response_status)); | ||
// Abort the decoder filter if it has not yet been completed. | ||
if (error_response && !state_.decoder_filter_chain_complete_) { | ||
state_.decoder_filter_chain_aborted_ = true; | ||
} | ||
} | ||
|
||
// If independent half close is enabled then close the stream when | ||
// 1. Both encoder and decoder filter chains has completed. | ||
// 2. Encoder filter chain has completed and decoder filter chain was aborted (i.e. local reply). | ||
// There is no need to check for aborted encoder filter chain as the filter will either be | ||
// completed or stream is reset. | ||
if (state_.encoder_filter_chain_complete_ && | ||
(state_.decoder_filter_chain_complete_ || state_.decoder_filter_chain_aborted_)) { | ||
ENVOY_STREAM_LOG(trace, "closing stream", *this); | ||
filter_manager_callbacks_.endStream(); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional but I tend to leave runtime guards out of docs as it's easy to forget to clean them up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed