Skip to content

Commit

Permalink
udp_session_filters: defer socket creation for after iterating filters (
Browse files Browse the repository at this point in the history
envoyproxy#29540)

Signed-off-by: ohadvano <[email protected]>
  • Loading branch information
ohadvano authored Sep 14, 2023
1 parent 5fc711e commit 1a41521
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 70 deletions.
2 changes: 2 additions & 0 deletions api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ message UdpProxyConfig {
// Perform per packet load balancing (upstream host selection) on each received data chunk.
// The default if not specified is false, that means each data chunk is forwarded
// to upstream host selected on first chunk receival for that "session" (identified by source IP/port and local IP/port).
// Only one of use_per_packet_load_balancing or session_filters can be used.
bool use_per_packet_load_balancing = 7;

// Configuration for session access logs emitted by the UDP proxy. Note that certain UDP specific data is emitted as :ref:`Dynamic Metadata <config_access_log_format_dynamic_metadata>`.
Expand All @@ -125,5 +126,6 @@ message UdpProxyConfig {
repeated config.accesslog.v3.AccessLog proxy_access_log = 10;

// Optional session filters that will run for each UDP session.
// Only one of use_per_packet_load_balancing or session_filters can be used.
repeated SessionFilter session_filters = 11;
}
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ behavior_changes:
Switch from http_parser to BalsaParser for handling HTTP/1.1 traffic. See https://github.com/envoyproxy/envoy/issues/21245 for
details. This behavioral change can be reverted by setting runtime flag ``envoy.reloadable_features.http1_use_balsa_parser`` to
false.
- area: udp_proxy
change: |
When the UDP proxy has session filters, choosing the upstream host and creating a socket only happens after iterating all
``onNewSession()`` calls for all the filters in the chain. Upstream host health check for each downstream datagram does
not apply when there are session filters, and per-packet load balancing can't be used when there are session filters.
- area: zone-aware routing
change: |
Zone-aware routing is now enabled even when the originating and upstream cluster have different numbers of zones.
Expand Down
7 changes: 7 additions & 0 deletions docs/root/configuration/listeners/udp_filters/udp_proxy.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ These kinds of filters run seprately on each upstream UDP session and support a
at the start of an upstream UDP session, when a UDP datagram is received from the downstream and when a UDP datagram is received from the
upstream, similar to network filters.

.. note::
When using session filters, choosing the upstream host only happens after completing the ``onNewSession()`` iteration for all
the filters in the chain. This allows choosing the host based on decisions made in one of the session filters, and prevents the
creation of upstream sockets in cases where one of the filters stopped the filter chain.
Additionally, since :ref:`per packet load balancing <envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.use_per_packet_load_balancing>` require
choosing the upstream host for each received datagram, session filters can't be used when this option is enabled.

Example configuration
---------------------

Expand Down
5 changes: 5 additions & 0 deletions source/extensions/filters/udp/udp_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ UdpProxyFilterConfigImpl::UdpProxyFilterConfigImpl(
// Default prefer_gro to true for upstream client traffic.
upstream_socket_config_(config.upstream_socket_config(), true),
random_(context.api().randomGenerator()) {
if (use_per_packet_load_balancing_ && config.session_filters().size() > 0) {
throw EnvoyException(
"Only one of use_per_packet_load_balancing or session_filters can be used.");
}

if (use_original_src_ip_ &&
!Api::OsSysCallsSingleton::get().supportsIpTransparent(
context.getServerFactoryContext().options().localAddressIpVersion())) {
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/udp/udp_proxy/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class UdpProxyFilterConfigImpl : public UdpProxyFilterConfig,
return proxy_access_logs_;
}
const FilterChainFactory& sessionFilterFactory() const override { return *this; };
bool hasSessionFilters() const override { return !filter_factories_.empty(); }

// FilterChainFactory
void createFilterChain(FilterChainFactoryCallbacks& callbacks) const override {
Expand Down
153 changes: 104 additions & 49 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,14 @@ UdpProxyFilter::ClusterInfo::~ClusterInfo() {
}

void UdpProxyFilter::ClusterInfo::removeSession(const ActiveSession* session) {
// First remove from the host to sessions map.
ASSERT(host_to_sessions_[&session->host()].count(session) == 1);
auto host_sessions_it = host_to_sessions_.find(&session->host());
host_sessions_it->second.erase(session);
if (host_sessions_it->second.empty()) {
host_to_sessions_.erase(host_sessions_it);
if (session->host().has_value()) {
// First remove from the host to sessions map, in case the host was resolved.
ASSERT(host_to_sessions_[&session->host().value().get()].count(session) == 1);
auto host_sessions_it = host_to_sessions_.find(&session->host().value().get());
host_sessions_it->second.erase(session);
if (host_sessions_it->second.empty()) {
host_to_sessions_.erase(host_sessions_it);
}
}

// Now remove it from the primary map.
Expand All @@ -128,7 +130,8 @@ void UdpProxyFilter::ClusterInfo::removeSession(const ActiveSession* session) {

UdpProxyFilter::ActiveSession*
UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& optional_host) {
const Upstream::HostConstSharedPtr& optional_host,
bool defer_socket_creation) {
if (!cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
.connections()
Expand All @@ -138,8 +141,13 @@ UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddres
return nullptr;
}

if (defer_socket_creation) {
ASSERT(!optional_host);
return createSessionWithOptionalHost(std::move(addresses), nullptr, true);
}

if (optional_host) {
return createSessionWithHost(std::move(addresses), optional_host);
return createSessionWithOptionalHost(std::move(addresses), optional_host, false);
}

auto host = chooseHost(addresses.peer_);
Expand All @@ -148,19 +156,24 @@ UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddres
cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc();
return nullptr;
}
return createSessionWithHost(std::move(addresses), host);

return createSessionWithOptionalHost(std::move(addresses), host, defer_socket_creation);
}

UdpProxyFilter::ActiveSession* UdpProxyFilter::ClusterInfo::createSessionWithHost(
Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host) {
ASSERT(host);
auto new_session = std::make_unique<ActiveSession>(*this, std::move(addresses), host);
UdpProxyFilter::ActiveSession* UdpProxyFilter::ClusterInfo::createSessionWithOptionalHost(
Network::UdpRecvData::LocalPeerAddresses&& addresses, const Upstream::HostConstSharedPtr& host,
bool defer_socket_creation) {
ASSERT((defer_socket_creation && !host) || (!defer_socket_creation && host));
auto new_session =
std::make_unique<ActiveSession>(*this, std::move(addresses), host, defer_socket_creation);
new_session->createFilterChain();
new_session->onNewSession();
auto new_session_ptr = new_session.get();
sessions_.emplace(std::move(new_session));
host_to_sessions_[host.get()].emplace(new_session_ptr);
if (!defer_socket_creation) {
host_to_sessions_[host.get()].emplace(new_session_ptr);
}

return new_session_ptr;
}

Expand All @@ -178,25 +191,32 @@ UdpProxyFilter::StickySessionClusterInfo::StickySessionClusterInfo(
HeterogeneousActiveSessionEqual(false))) {}

Network::FilterStatus UdpProxyFilter::StickySessionClusterInfo::onData(Network::UdpRecvData& data) {
bool defer_socket = filter_.config_->hasSessionFilters();
const auto active_session_it = sessions_.find(data.addresses_);
ActiveSession* active_session;
if (active_session_it == sessions_.end()) {
active_session = createSession(std::move(data.addresses_));
active_session = createSession(std::move(data.addresses_), nullptr, defer_socket);
if (active_session == nullptr) {
return Network::FilterStatus::StopIteration;
}
} else {
active_session = active_session_it->get();
if (active_session->host().coarseHealth() == Upstream::Host::Health::Unhealthy) {
// We defer the socket creation when the session includes filters, so the filters can be
// iterated before choosing the host, to allow dynamically choosing upstream host. Due to this,
// we can't perform health checks during a session.
// TODO(ohadvano): add similar functionality that is performed after session filter chain
// iteration to signal filters about the unhealthy host, or replace the host during the session.
if (!defer_socket &&
active_session->host().value().get().coarseHealth() == Upstream::Host::Health::Unhealthy) {
// If a host becomes unhealthy, we optimally would like to replace it with a new session
// to a healthy host. We may eventually want to make this behavior configurable, but for now
// this will be the universal behavior.
auto host = chooseHost(data.addresses_.peer_);
if (host != nullptr && host->coarseHealth() != Upstream::Host::Health::Unhealthy &&
host.get() != &active_session->host()) {
host.get() != &active_session->host().value().get()) {
ENVOY_LOG(debug, "upstream session unhealthy, recreating the session");
removeSession(active_session);
active_session = createSession(std::move(data.addresses_), host);
active_session = createSession(std::move(data.addresses_), host, false);
} else {
// In this case we could not get a better host, so just keep using the current session.
ENVOY_LOG(trace, "upstream session unhealthy, but unable to get a better host");
Expand Down Expand Up @@ -230,14 +250,14 @@ UdpProxyFilter::PerPacketLoadBalancingClusterInfo::onData(Network::UdpRecvData&
const auto active_session_it = sessions_.find(key);
ActiveSession* active_session;
if (active_session_it == sessions_.end()) {
active_session = createSession(std::move(data.addresses_), host);
active_session = createSession(std::move(data.addresses_), host, false);
if (active_session == nullptr) {
return Network::FilterStatus::StopIteration;
}
} else {
active_session = active_session_it->get();
ENVOY_LOG(trace, "found already existing session on host {}.",
active_session->host().address()->asStringView());
active_session->host().value().get().address()->asStringView());
}

active_session->onData(data);
Expand All @@ -249,34 +269,41 @@ std::atomic<uint64_t> UdpProxyFilter::ActiveSession::next_global_session_id_;

UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster,
Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host)
const Upstream::HostConstSharedPtr& host,
bool defer_socket_creation)
: cluster_(cluster), use_original_src_ip_(cluster_.filter_.config_->usingOriginalSrcIp()),
addresses_(std::move(addresses)), host_(host),
idle_timer_(cluster.filter_.read_callbacks_->udpListener().dispatcher().createTimer(
[this] { onIdleTimer(); })),
// NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port
// is bound until the first packet is sent to the upstream host.
socket_(cluster.filter_.createSocket(host)),
udp_session_info_(
StreamInfo::StreamInfoImpl(cluster_.filter_.config_->timeSource(), nullptr)),
session_id_(next_global_session_id_++) {
ASSERT((defer_socket_creation && !host) || (!defer_socket_creation && host));
cluster_.filter_.config_->stats().downstream_sess_total_.inc();
cluster_.filter_.config_->stats().downstream_sess_active_.inc();
cluster_.cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.inc();

if (!defer_socket_creation) {
createSocket(host);
}
}

void UdpProxyFilter::ActiveSession::createSocket(const Upstream::HostConstSharedPtr& host) {
// NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port
// is bound until the first packet is sent to the upstream host.
socket_ = cluster_.filter_.createSocket(host);
socket_->ioHandle().initializeFileEvent(
cluster.filter_.read_callbacks_->udpListener().dispatcher(),
cluster_.filter_.read_callbacks_->udpListener().dispatcher(),
[this](uint32_t) { onReadReady(); }, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read);

ENVOY_LOG(debug, "creating new session: downstream={} local={} upstream={}",
addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host->address()->asStringView());

cluster_.filter_.config_->stats().downstream_sess_total_.inc();
cluster_.filter_.config_->stats().downstream_sess_active_.inc();
cluster_.cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.inc();

if (use_original_src_ip_) {
const Network::Socket::OptionsSharedPtr socket_options =
Network::SocketOptionFactory::buildIpTransparentOptions();
Expand All @@ -298,7 +325,8 @@ UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster,
UdpProxyFilter::ActiveSession::~ActiveSession() {
ENVOY_LOG(debug, "deleting the session: downstream={} local={} upstream={}",
addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host_->address()->asStringView());
host_ != nullptr ? host_->address()->asStringView() : "unknown");

cluster_.filter_.config_->stats().downstream_sess_active_.dec();
cluster_.cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
Expand Down Expand Up @@ -394,13 +422,15 @@ void UdpProxyFilter::ActiveSession::onNewSession() {
return;
}
}

createSocketDeferred();
}

void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) {
absl::string_view host = socket_ != nullptr ? host_->address()->asStringView() : "unknown";
ENVOY_LOG(trace, "received {} byte datagram from downstream: downstream={} local={} upstream={}",
data.buffer_->length(), addresses_.peer_->asStringView(),
addresses_.local_->asStringView(), host_->address()->asStringView());

addresses_.local_->asStringView(), host);
const uint64_t rx_buffer_length = data.buffer_->length();
cluster_.filter_.config_->stats().downstream_sess_rx_bytes_.add(rx_buffer_length);
session_stats_.downstream_sess_rx_bytes_ += rx_buffer_length;
Expand All @@ -409,6 +439,22 @@ void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) {

idle_timer_->enableTimer(cluster_.filter_.config_->sessionTimeout());

for (auto& active_read_filter : read_filters_) {
auto status = active_read_filter->read_filter_->onData(data);
if (status == ReadFilterStatus::StopIteration) {
return;
}
}

writeUpstream(data);
}

void UdpProxyFilter::ActiveSession::writeUpstream(Network::UdpRecvData& data) {
if (!socket_) {
ENVOY_LOG(debug, "cannot write upstream because the socket was not created.");
return;
}

// NOTE: On the first write, a local ephemeral port is bound, and thus this write can fail due to
// port exhaustion. To avoid exhaustion, UDP sockets will be connected and associated with
// a 4-tuple including the local IP, and the UDP port may be reused for multiple
Expand All @@ -417,7 +463,7 @@ void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) {
// NOTE: We do not specify the local IP to use for the sendmsg call if use_original_src_ip_ is not
// set. We allow the OS to select the right IP based on outbound routing rules if
// use_original_src_ip_ is not set, else use downstream peer IP as local IP.
if (!use_original_src_ip_ && !connected_) {
if (!connected_ && !use_original_src_ip_) {
Api::SysCallIntResult rc = socket_->ioHandle().connect(host_->address());
if (SOCKET_FAILURE(rc.return_value_)) {
ENVOY_LOG(debug, "cannot connect: ({}) {}", rc.errno_, errorDetails(rc.errno_));
Expand All @@ -428,18 +474,7 @@ void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) {
connected_ = true;
}

for (auto& active_read_filter : read_filters_) {
auto status = active_read_filter->read_filter_->onData(data);
if (status == ReadFilterStatus::StopIteration) {
return;
}
}

writeUpstream(data);
}

void UdpProxyFilter::ActiveSession::writeUpstream(Network::UdpRecvData& data) {
ASSERT(connected_ || use_original_src_ip_);
ASSERT((connected_ || use_original_src_ip_) && socket_ && host_);

const uint64_t tx_buffer_length = data.buffer_->length();
ENVOY_LOG(trace, "writing {} byte datagram upstream: downstream={} local={} upstream={}",
Expand Down Expand Up @@ -473,6 +508,26 @@ void UdpProxyFilter::ActiveSession::onContinueFilterChain(ActiveReadFilter* filt
break;
}
}

createSocketDeferred();
}

void UdpProxyFilter::ActiveSession::createSocketDeferred() {
if (socket_) {
// A session filter may call on continueFilterChain(), after already creating the socket,
// so we first check that the socket was not created already.
return;
}

host_ = cluster_.chooseHost(addresses_.peer_);
if (host_ == nullptr) {
ENVOY_LOG(debug, "cannot find any valid host.");
cluster_.cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc();
return;
}

cluster_.addSession(host_.get(), this);
createSocket(host_);
}

void UdpProxyFilter::ActiveSession::onInjectReadDatagramToFilterChain(ActiveReadFilter* filter,
Expand Down
Loading

0 comments on commit 1a41521

Please sign in to comment.