Skip to content

Commit

Permalink
http: unified the filter chain creation in the FilterManager (#37112)
Browse files Browse the repository at this point in the history
Commit Message: http: unified the filter chain creation in the
FilterManager
Additional Description:

This refactor ensure the all the http filter chain creation is done in
the FilterManager. (This also make sure we can do more control to the
filter chain creations.)

In the previous implementation, the `createFilterChain` method (of
`FilterManager`) is only used by the downstream filter chain. And the
upstream filter chain use another way to create filter chain. It make
that it's hard to control the behavior of filter chain creation in
uniform way.

The `createFilterChain` will return a value to tell whether the upgrade
is rejected. And use `upgradeFilterChainCreated` callback to tell
whether the upgrade is accepted. If both are none, then, that means no
upgrade.

In the new implementation, we unified the way of filter chain creation.
The `createFilterChain` method will be used for both downstream and
upstream. And a uniform way will be sued to report the upgrade results.

Risk Level: mid.
Testing: n/a.
Docs Changes: n/a.
Release Notes: n/a.
Platform Specific Features: n/a.

---------

Signed-off-by: wangbaiping(wbpcode) <[email protected]>
  • Loading branch information
wbpcode authored Dec 4, 2024
1 parent 240aa12 commit bc1ed87
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 82 deletions.
11 changes: 8 additions & 3 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1399,8 +1399,13 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
}

filter_manager_.streamInfo().setRequestHeaders(*request_headers_);

const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
const FilterManager::CreateChainResult create_chain_result =
filter_manager_.createDownstreamFilterChain();
if (create_chain_result.upgradeAccepted()) {
connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc();
connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc();
state_.successful_upgrade_ = true;
}

if (connection_manager_.config_->flushAccessLogOnNewRequest()) {
log(AccessLog::AccessLogType::DownstreamStart);
Expand All @@ -1410,7 +1415,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
// should return 404. The current returns no response if there is no router filter.
if (hasCachedRoute()) {
// Do not allow upgrades if the route does not support it.
if (upgrade_rejected) {
if (create_chain_result.upgradeRejected()) {
// While downstream servers should not send upgrade payload without the upgrade being
// accepted, err on the side of caution and refuse to process any further requests on this
// connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
Expand Down
5 changes: 0 additions & 5 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,6 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
}
void onDecoderFilterBelowWriteBufferLowWatermark() override;
void onDecoderFilterAboveWriteBufferHighWatermark() override;
void upgradeFilterChainCreated() override {
connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc();
connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc();
state_.successful_upgrade_ = true;
}
void disarmRequestTimeout() override;
void resetIdleTimer() override;
void recreateStream(StreamInfo::FilterStateSharedPtr filter_state) override;
Expand Down
77 changes: 50 additions & 27 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,7 @@ void DownstreamFilterManager::prepareLocalReplyViaFilterChain(
// For early error handling, do a best-effort attempt to create a filter chain
// to ensure access logging. If the filter chain already exists this will be
// a no-op.
createFilterChain();
createDownstreamFilterChain();

if (prepared_local_reply_) {
return;
Expand Down Expand Up @@ -1083,6 +1083,10 @@ void DownstreamFilterManager::executeLocalReplyIfPrepared() {
Utility::encodeLocalReply(state_.destroyed_, std::move(prepared_local_reply_));
}

FilterManager::CreateChainResult DownstreamFilterManager::createDownstreamFilterChain() {
return createFilterChain(filter_chain_factory_, false);
}

void DownstreamFilterManager::sendLocalReplyViaFilterChain(
bool is_grpc_request, Code code, absl::string_view body,
const std::function<void(ResponseHeaderMap& headers)>& modify_headers, bool is_head_request,
Expand All @@ -1092,7 +1096,7 @@ void DownstreamFilterManager::sendLocalReplyViaFilterChain(
// For early error handling, do a best-effort attempt to create a filter chain
// to ensure access logging. If the filter chain already exists this will be
// a no-op.
createFilterChain();
createDownstreamFilterChain();

Utility::sendLocalReply(
state_.destroyed_,
Expand Down Expand Up @@ -1629,11 +1633,9 @@ void FilterManager::contextOnContinue(ScopeTrackedObjectStack& tracked_object_st
tracked_object_stack.add(filter_manager_callbacks_.scope());
}

bool FilterManager::createFilterChain() {
if (state_.created_filter_chain_) {
return false;
}
bool upgrade_rejected = false;
FilterManager::UpgradeResult
FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory,
const FilterChainOptionsImpl& options) {
const HeaderEntry* upgrade = nullptr;
if (filter_manager_callbacks_.requestHeaders()) {
upgrade = filter_manager_callbacks_.requestHeaders()->Upgrade();
Expand All @@ -1644,28 +1646,49 @@ bool FilterManager::createFilterChain() {
}
}

if (upgrade == nullptr) {
// No upgrade header, no upgrade filter chain.
return UpgradeResult::UpgradeUnneeded;
}

const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();
return filter_chain_factory.createUpgradeFilterChain(upgrade->value().getStringView(),
upgrade_map, *this, options)
? UpgradeResult::UpgradeAccepted
: UpgradeResult::UpgradeRejected;
}

FilterManager::CreateChainResult
FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory,
bool only_create_if_configured) {
if (state_.create_chain_result_.created()) {
return state_.create_chain_result_;
}

OptRef<DownstreamStreamFilterCallbacks> downstream_callbacks =
filter_manager_callbacks_.downstreamCallbacks();

// This filter chain options is only used for the downstream HTTP filter chains for now. So, try
// to set valid initial route only when the downstream callbacks is available.
FilterChainOptionsImpl options(
filter_manager_callbacks_.downstreamCallbacks().has_value() ? streamInfo().route() : nullptr);
FilterChainOptionsImpl options(downstream_callbacks.has_value() ? streamInfo().route() : nullptr);

state_.created_filter_chain_ = true;
if (upgrade != nullptr) {
const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();
UpgradeResult upgrade = UpgradeResult::UpgradeUnneeded;

if (filter_chain_factory_.createUpgradeFilterChain(upgrade->value().getStringView(),
upgrade_map, *this, options)) {
filter_manager_callbacks_.upgradeFilterChainCreated();
return true;
} else {
upgrade_rejected = true;
// Fall through to the default filter chain. The function calling this
// will send a local reply indicating that the upgrade failed.
// Only try the upgrade filter chain for downstream filter chains.
if (downstream_callbacks.has_value()) {
upgrade = createUpgradeFilterChain(filter_chain_factory, options);
if (upgrade == UpgradeResult::UpgradeAccepted) {
// Upgrade filter chain is created. Return the result directly.
state_.create_chain_result_ = CreateChainResult(true, upgrade);
return state_.create_chain_result_;
}
// If the upgrade is unnecessary or the upgrade filter chain is rejected, fall through to
// create the default filter chain.
}

filter_chain_factory_.createFilterChain(*this, false, options);
return !upgrade_rejected;
state_.create_chain_result_ = CreateChainResult(
filter_chain_factory.createFilterChain(*this, only_create_if_configured, options), upgrade);
return state_.create_chain_result_;
}

void ActiveStreamDecoderFilter::requestDataDrained() {
Expand Down Expand Up @@ -1720,11 +1743,11 @@ bool ActiveStreamDecoderFilter::recreateStream(const ResponseHeaderMap* headers)

if (headers != nullptr) {
// The call to setResponseHeaders is needed to ensure that the headers are properly logged in
// access logs before the stream is destroyed. Since the function expects a ResponseHeaderPtr&&,
// ownership of the headers must be passed. This cannot happen earlier in the flow (such as in
// the call to setupRedirect) because at that point it is still possible for the headers to be
// used in a different logical branch. We work around this by creating a copy and passing
// ownership of the copy instead.
// access logs before the stream is destroyed. Since the function expects a
// ResponseHeaderPtr&&, ownership of the headers must be passed. This cannot happen earlier in
// the flow (such as in the call to setupRedirect) because at that point it is still possible
// for the headers to be used in a different logical branch. We work around this by creating a
// copy and passing ownership of the copy instead.
ResponseHeaderMapPtr headers_copy = createHeaderMap<ResponseHeaderMapImpl>(*headers);
parent_.filter_manager_callbacks_.setResponseHeaders(std::move(headers_copy));
parent_.filter_manager_callbacks_.chargeStats(*headers);
Expand Down
74 changes: 58 additions & 16 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,11 +465,6 @@ class FilterManagerCallbacks {
*/
virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE;

/**
* Called when the FilterManager creates an Upgrade filter chain.
*/
virtual void upgradeFilterChainCreated() PURE;

/**
* Called when request activity indicates that the request timeout should be disarmed.
*/
Expand Down Expand Up @@ -648,11 +643,10 @@ class FilterManager : public ScopeTrackedObject,
FilterManager(FilterManagerCallbacks& filter_manager_callbacks, Event::Dispatcher& dispatcher,
OptRef<const Network::Connection> connection, uint64_t stream_id,
Buffer::BufferMemoryAccountSharedPtr account, bool proxy_100_continue,
uint32_t buffer_limit, const FilterChainFactory& filter_chain_factory)
uint32_t buffer_limit)
: filter_manager_callbacks_(filter_manager_callbacks), dispatcher_(dispatcher),
connection_(connection), stream_id_(stream_id), account_(std::move(account)),
proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit),
filter_chain_factory_(filter_chain_factory) {}
proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit) {}

~FilterManager() override {
ASSERT(state_.destroyed_);
Expand Down Expand Up @@ -839,15 +833,56 @@ class FilterManager : public ScopeTrackedObject,
* a local reply without the overhead of creating and traversing the filters.
*/
void skipFilterChainCreation() {
ASSERT(!state_.created_filter_chain_);
state_.created_filter_chain_ = true;
ASSERT(!state_.create_chain_result_.created());
state_.create_chain_result_ = CreateChainResult(true);
}

virtual StreamInfo::StreamInfo& streamInfo() PURE;
virtual const StreamInfo::StreamInfo& streamInfo() const PURE;

// Set up the Encoder/Decoder filter chain.
bool createFilterChain();
enum class UpgradeResult : uint8_t { UpgradeUnneeded, UpgradeAccepted, UpgradeRejected };

/**
* Filter chain creation result.
*/
class CreateChainResult {
public:
CreateChainResult() = default;

/**
* @param created whether the filter chain was created.
* @param upgrade the upgrade result.
*/
CreateChainResult(bool created, UpgradeResult upgrade = UpgradeResult::UpgradeUnneeded)
: created_(created), upgrade_(upgrade) {}

/**
* @return whether the filter chain was created.
*/
bool created() const { return created_; }
/**
* @return whether the upgrade was accepted.
*/
bool upgradeAccepted() const { return upgrade_ == UpgradeResult::UpgradeAccepted; }
/**
* @return whether the upgrade was rejected.
*/
bool upgradeRejected() const { return upgrade_ == UpgradeResult::UpgradeRejected; }

private:
bool created_ = false;
UpgradeResult upgrade_ = UpgradeResult::UpgradeUnneeded;
};

/**
* Set up the Encoder/Decoder filter chain.
* @param filter_chain_factory the factory to create the filter chain.
* @param only_create_if_configured whether to only create the filter chain if it is configured
* explicitly. This only makes sense for upstream HTTP filter chain.
*
*/
CreateChainResult createFilterChain(const FilterChainFactory& filter_chain_factory,
bool only_create_if_configured);

OptRef<const Network::Connection> connection() const { return connection_; }

Expand Down Expand Up @@ -900,7 +935,6 @@ class FilterManager : public ScopeTrackedObject,
// By default, we will assume there are no 1xx. If encode1xxHeaders
// is ever called, this is set to true so commonContinue resumes processing the 1xx.
bool has_1xx_headers_{};
bool created_filter_chain_{};
// These two are latched on initial header read, to determine if the original headers
// constituted a HEAD or gRPC request, respectively.
bool is_head_request_{};
Expand All @@ -923,6 +957,9 @@ class FilterManager : public ScopeTrackedObject,
bool decoder_filters_streaming_{true};
bool destroyed_{false};

// Result of filter chain creation.
CreateChainResult create_chain_result_{};

// Used to track which filter is the latest filter that has received data.
ActiveStreamEncoderFilter* latest_data_encoding_filter_{};
ActiveStreamDecoderFilter* latest_data_decoding_filter_{};
Expand Down Expand Up @@ -985,6 +1022,9 @@ class FilterManager : public ScopeTrackedObject,
// Indicates which filter to start the iteration with.
enum class FilterIterationStartState { AlwaysStartFromNext, CanStartFromCurrent };

UpgradeResult createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory,
const FilterChainOptionsImpl& options);

// Returns the encoder filter to start iteration with.
std::list<ActiveStreamEncoderFilterPtr>::iterator
commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream,
Expand Down Expand Up @@ -1072,7 +1112,6 @@ class FilterManager : public ScopeTrackedObject,
std::make_shared<Network::Socket::Options>();
absl::optional<Upstream::LoadBalancerContext::OverrideHost> upstream_override_host_;

const FilterChainFactory& filter_chain_factory_;
// TODO(snowp): Once FM has been moved to its own file we'll make these private classes of FM,
// at which point they no longer need to be friends.
friend ActiveStreamFilterBase;
Expand Down Expand Up @@ -1127,11 +1166,11 @@ class DownstreamFilterManager : public FilterManager {
StreamInfo::FilterStateSharedPtr parent_filter_state,
Server::OverloadManager& overload_manager)
: FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account,
proxy_100_continue, buffer_limit, filter_chain_factory),
proxy_100_continue, buffer_limit),
stream_info_(protocol, time_source, connection.connectionInfoProviderSharedPtr(),
StreamInfo::FilterState::LifeSpan::FilterChain,
std::move(parent_filter_state)),
local_reply_(local_reply),
local_reply_(local_reply), filter_chain_factory_(filter_chain_factory),
downstream_filter_load_shed_point_(overload_manager.getLoadShedPoint(
Server::LoadShedPointName::get().HttpDownstreamFilterCheck)),
use_filter_manager_state_for_downstream_end_stream_(Runtime::runtimeFeatureEnabled(
Expand All @@ -1155,6 +1194,8 @@ class DownstreamFilterManager : public FilterManager {
stream_info_.setDownstreamRemoteAddress(downstream_remote_address);
}

CreateChainResult createDownstreamFilterChain();

/**
* Called before local reply is made by the filter manager.
* @param data the data associated with the local reply.
Expand Down Expand Up @@ -1231,6 +1272,7 @@ class DownstreamFilterManager : public FilterManager {
private:
OverridableRemoteConnectionInfoSetterStreamInfo stream_info_;
const LocalReply::LocalReply& local_reply_;
const FilterChainFactory& filter_chain_factory_;
Utility::PreparedLocalReplyPtr prepared_local_reply_{nullptr};
Server::LoadShedPoint* downstream_filter_load_shed_point_{nullptr};
// Set by the envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream runtime
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class FilterUtility {
/**
* Configuration for the router filter.
*/
class FilterConfig : Http::FilterChainFactory {
class FilterConfig : public Http::FilterChainFactory {
public:
FilterConfig(Server::Configuration::CommonFactoryContext& factory_context,
Stats::StatName stat_prefix, const LocalInfo::LocalInfo& local_info,
Expand Down
19 changes: 10 additions & 9 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ class UpstreamFilterManager : public Http::FilterManager {
UpstreamFilterManager(Http::FilterManagerCallbacks& filter_manager_callbacks,
Event::Dispatcher& dispatcher, OptRef<const Network::Connection> connection,
uint64_t stream_id, Buffer::BufferMemoryAccountSharedPtr account,
bool proxy_100_continue, uint32_t buffer_limit,
const Http::FilterChainFactory& filter_chain_factory,
UpstreamRequest& request)
bool proxy_100_continue, uint32_t buffer_limit, UpstreamRequest& request)
: FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account,
proxy_100_continue, buffer_limit, filter_chain_factory),
proxy_100_continue, buffer_limit),
upstream_request_(request) {}

StreamInfo::StreamInfo& streamInfo() override {
Expand Down Expand Up @@ -142,18 +140,21 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
filter_manager_ = std::make_unique<UpstreamFilterManager>(
*filter_manager_callbacks_, parent_.callbacks()->dispatcher(), UpstreamRequest::connection(),
parent_.callbacks()->streamId(), parent_.callbacks()->account(), true,
parent_.callbacks()->decoderBufferLimit(), *parent_.cluster(), *this);
parent_.callbacks()->decoderBufferLimit(), *this);
// Attempt to create custom cluster-specified filter chain
bool created = parent_.cluster()->createFilterChain(*filter_manager_,
/*only_create_if_configured=*/true);
bool created = filter_manager_
->createFilterChain(*parent_.cluster(),
/*only_create_if_configured=*/true)
.created();

if (!created) {
// Attempt to create custom router-specified filter chain.
created = parent_.config().createFilterChain(*filter_manager_);
created = filter_manager_->createFilterChain(parent_.config(), false).created();
}
if (!created) {
// Neither cluster nor router have a custom filter chain; add the default
// cluster filter chain, which only consists of the codec filter.
created = parent_.cluster()->createFilterChain(*filter_manager_, false);
created = filter_manager_->createFilterChain(*parent_.cluster(), false).created();
}
// There will always be a codec filter present, which sets the upstream
// interface. Fast-fail any tests that don't set up mocks correctly.
Expand Down
3 changes: 0 additions & 3 deletions source/common/router/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,6 @@ class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallback
void recreateStream(StreamInfo::FilterStateSharedPtr) override {
IS_ENVOY_BUG("recreateStream called from upstream HTTP filter");
}
void upgradeFilterChainCreated() override {
IS_ENVOY_BUG("upgradeFilterChainCreated called from upstream HTTP filter");
}
OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {*this}; }

// Http::UpstreamStreamFilterCallbacks
Expand Down
Loading

0 comments on commit bc1ed87

Please sign in to comment.