Skip to content

Commit

Permalink
Make encoding filters in the middle of the filter chain work. p2
Browse files Browse the repository at this point in the history
Signed-off-by: Yan Avlasov <[email protected]>
  • Loading branch information
yanavlasov committed Aug 9, 2024
1 parent 9e9428f commit 71ce1f0
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 10 deletions.
21 changes: 15 additions & 6 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ void ActiveStreamDecoderFilter::encode1xxHeaders(ResponseHeaderMapPtr&& headers)

void ActiveStreamDecoderFilter::encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
absl::string_view details) {
encoded_end_stream_ = end_stream;
parent_.streamInfo().setResponseCodeDetails(details);
parent_.filter_manager_callbacks_.setResponseHeaders(std::move(headers));
if (end_stream && end_stream_) {
Expand All @@ -477,6 +478,7 @@ void ActiveStreamDecoderFilter::encodeHeaders(ResponseHeaderMapPtr&& headers, bo
}

void ActiveStreamDecoderFilter::encodeData(Buffer::Instance& data, bool end_stream) {
encoded_end_stream_ = end_stream;
if (end_stream && end_stream_) {
parent_.state_.decoder_filter_chain_aborted_ = true;
}
Expand All @@ -485,6 +487,7 @@ void ActiveStreamDecoderFilter::encodeData(Buffer::Instance& data, bool end_stre
}

void ActiveStreamDecoderFilter::encodeTrailers(ResponseTrailerMapPtr&& trailers) {
encoded_end_stream_ = true;
if (end_stream_) {
parent_.state_.decoder_filter_chain_aborted_ = true;
}
Expand Down Expand Up @@ -545,8 +548,6 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
if ((*entry)->end_stream_) {
state_.filter_call_state_ |= FilterCallState::EndOfStream;
}
last_filter_saw_end_stream =
(*entry)->end_stream_ && std::next(entry) == decoder_filters_.end();
FilterHeadersStatus status = (*entry)->decodeHeaders(headers, (*entry)->end_stream_);
state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders;
if ((*entry)->end_stream_) {
Expand Down Expand Up @@ -612,6 +613,9 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
if (end_stream && buffered_request_data_ && continue_data_entry == decoder_filters_.end()) {
continue_data_entry = entry;
}
last_filter_saw_end_stream =
(end_stream && continue_data_entry == decoder_filters_.end()) &&
(std::next(entry) == decoder_filters_.end() || (*entry)->encoded_end_stream_);
}

maybeContinueDecoding(continue_data_entry);
Expand Down Expand Up @@ -692,8 +696,6 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan

state_.filter_call_state_ |= FilterCallState::DecodeData;
(*entry)->end_stream_ = end_stream && !filter_manager_callbacks_.requestTrailers();
last_filter_saw_end_stream =
(*entry)->end_stream_ && std::next(entry) == decoder_filters_.end();
FilterDataStatus status = (*entry)->handle_->decodeData(data, (*entry)->end_stream_);
if ((*entry)->end_stream_) {
(*entry)->handle_->decodeComplete();
Expand All @@ -719,6 +721,9 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan
trailers_added_entry = entry;
}

last_filter_saw_end_stream =
end_stream && (std::next(entry) == decoder_filters_.end() || (*entry)->encoded_end_stream_);

if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.decoder_filters_streaming_) &&
std::next(entry) != decoder_filters_.end()) {
// Stop iteration IFF this is not the last filter. If it is the last filter, continue with
Expand Down Expand Up @@ -794,8 +799,6 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra
FilterTrailersStatus status = (*entry)->handle_->decodeTrailers(trailers);
(*entry)->handle_->decodeComplete();
(*entry)->end_stream_ = true;
last_filter_saw_end_stream =
(*entry)->end_stream_ && std::next(entry) == decoder_filters_.end();
state_.filter_call_state_ &= ~FilterCallState::DecodeTrailers;
ENVOY_STREAM_LOG(trace, "decode trailers called: filter={} status={}", *this,
(*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
Expand All @@ -808,6 +811,12 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra
}

processNewlyAddedMetadata();
last_filter_saw_end_stream =
std::next(entry) == decoder_filters_.end() || (*entry)->encoded_end_stream_;

if (last_filter_saw_end_stream) {
break;
}

if (!(*entry)->commonHandleAfterTrailersCallback(status) &&
std::next(entry) != decoder_filters_.end()) {
Expand Down
1 change: 1 addition & 0 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,

StreamDecoderFilterSharedPtr handle_;
bool is_grpc_request_{};
bool encoded_end_stream_{false};
};

using ActiveStreamDecoderFilterPtr = std::unique_ptr<ActiveStreamDecoderFilter>;
Expand Down
148 changes: 146 additions & 2 deletions test/common/http/conn_manager_impl_test_2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4348,7 +4348,7 @@ TEST_F(HttpConnectionManagerImplTest, PassMatchUpstreamSchemeHintToStreamInfo) {
}

// Validate that incomplete request is terminated when a non terminal filter
// initates encoding of the response (i.e. the cache filter).
// initiates encoding of the response (i.e. the cache filter).
// This only works when independent half-close mode is DISABLED.
TEST_F(HttpConnectionManagerImplTest, EncodingByNonTerminalFilter) {
TestScopedRuntime scoped_runtime;
Expand Down Expand Up @@ -4400,7 +4400,7 @@ TEST_F(HttpConnectionManagerImplTest, EncodingByNonTerminalFilter) {
}

// Validate that when independent half-close is enabled, encoding end_stream by a
// non-final filter ends the request iff the filter that intiated encoding of the end_stream has
// non-final filter ends the request iff the filter that initiated encoding of the end_stream has
// already observed the request end_stream.
TEST_F(HttpConnectionManagerImplTest, EncodingByNonTerminalFilterWithIndependentHalfClose) {
TestScopedRuntime scoped_runtime;
Expand Down Expand Up @@ -4453,5 +4453,149 @@ TEST_F(HttpConnectionManagerImplTest, EncodingByNonTerminalFilterWithIndependent
decoder_filters_[ecoder_filter_index]->callbacks_->encodeData(fake_response, true);
}

// Validate that when independent half-close is enabled, encoding end_stream by a
// non-final filter with incomplete request makes the encoding filter the terminal filter.
// In this case decoding end_stream from the client only reaches the filter that encoded the
// end_stream after which the request is completed.
TEST_F(HttpConnectionManagerImplTest, DecodingByNonTerminalEncoderFilterWithIndependentHalfClose) {
TestScopedRuntime scoped_runtime;
scoped_runtime.mergeValues(
{{"envoy.reloadable_features.allow_multiplexed_upstream_half_close", "true"}});
setup(false, "");
constexpr int total_filters = 3;
constexpr int ecoder_filter_index = 1;
setupFilterChain(total_filters, total_filters);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

// Send incomplete request.
startRequest(false);

// For encode direction
EXPECT_CALL(*encoder_filters_[2], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));

// Second decoder filter (there are 3 in total) initiates encoding
ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}};
decoder_filters_[ecoder_filter_index]->callbacks_->encodeHeaders(std::move(response_headers),
false, "details");

EXPECT_CALL(*encoder_filters_[2], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*encoder_filters_[2], encodeComplete());
EXPECT_CALL(*encoder_filters_[1], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());
EXPECT_CALL(*encoder_filters_[0], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());

// Second decoder filter then completes encoding with data
Buffer::OwnedImpl fake_response("world");
decoder_filters_[ecoder_filter_index]->callbacks_->encodeData(fake_response, true);

// Request is still be alive with the half-close enabled.
// Verify that once the end_stream from the client reaches the filter that encoded the end_stream
// the request will end.
EXPECT_CALL(*decoder_filters_[0], decodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
EXPECT_CALL(*decoder_filters_[1], decodeData(_, true))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());
expectOnDestroy();

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance& data) -> Http::Status {
decoder_->decodeData(data, true);
data.drain(4);
return Http::okStatus();
}));

Buffer::OwnedImpl fake_input("5678");
conn_manager_->onData(fake_input, false);
}

// Validate that when independent half-close is enabled, encoding end_stream by a
// non-final filter with incomplete request makes the encoding filter the terminal filter.
// In this case decoding end_stream from the client only reaches the filter that encoded the
// end_stream after which the request is completed.
TEST_F(HttpConnectionManagerImplTest, DecodingWithAddedTrailersByNonTerminalEncoderFilter) {
TestScopedRuntime scoped_runtime;
scoped_runtime.mergeValues(
{{"envoy.reloadable_features.allow_multiplexed_upstream_half_close", "true"}});
setup(false, "");
constexpr int total_filters = 3;
constexpr int ecoder_filter_index = 1;
setupFilterChain(total_filters, total_filters);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

// Send incomplete request.
startRequest(false);

// For encode direction
EXPECT_CALL(*encoder_filters_[2], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));

// Second decoder filter (there are 3 in total) initiates encoding
ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}};
decoder_filters_[ecoder_filter_index]->callbacks_->encodeHeaders(std::move(response_headers),
false, "details");

EXPECT_CALL(*encoder_filters_[2], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*encoder_filters_[2], encodeComplete());
EXPECT_CALL(*encoder_filters_[1], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());
EXPECT_CALL(*encoder_filters_[0], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());

// Second decoder filter then completes encoding with data
Buffer::OwnedImpl fake_response("world");
decoder_filters_[ecoder_filter_index]->callbacks_->encodeData(fake_response, true);

// Request is still be alive with the half-close enabled.
// Verify that once the end_stream from the client reaches the filter that encoded the end_stream
// the request will end.
EXPECT_CALL(*decoder_filters_[0], decodeData(_, true))
.WillOnce(InvokeWithoutArgs([&]() -> FilterDataStatus {
decoder_filters_[1]->callbacks_->addDecodedTrailers().addCopy(Http::LowerCaseString("foo"),
"bar");
return FilterDataStatus::Continue;
}));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
EXPECT_CALL(*decoder_filters_[1], decodeData(_, false))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
EXPECT_CALL(*decoder_filters_[1], decodeTrailers(_))
.WillOnce(Return(FilterTrailersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());
expectOnDestroy();

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance& data) -> Http::Status {
decoder_->decodeData(data, true);
data.drain(4);
return Http::okStatus();
}));

Buffer::OwnedImpl fake_input("5678");
conn_manager_->onData(fake_input, false);
}

} // namespace Http
} // namespace Envoy
6 changes: 4 additions & 2 deletions test/common/http/conn_manager_impl_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,16 @@ void HttpConnectionManagerImplMixin::setupFilterChain(int num_decoder_filters,
for (int i = 0; i < num_decoder_filters; i++) {
auto factory = createDecoderFilterFactoryCb(
StreamDecoderFilterSharedPtr{decoder_filters_[req * num_decoder_filters + i]});
manager.applyFilterFactoryCb({}, factory);
std::string name = absl::StrCat(req * num_decoder_filters + i);
manager.applyFilterFactoryCb({name, name}, factory);
applied_filters = true;
}

for (int i = 0; i < num_encoder_filters; i++) {
auto factory = createEncoderFilterFactoryCb(
StreamEncoderFilterSharedPtr{encoder_filters_[req * num_encoder_filters + i]});
manager.applyFilterFactoryCb({}, factory);
std::string name = absl::StrCat(req * num_decoder_filters + i);
manager.applyFilterFactoryCb({name, name}, factory);
applied_filters = true;
}
return applied_filters;
Expand Down

0 comments on commit 71ce1f0

Please sign in to comment.