From 1a41521b9fb8840d93d809be022287aadf96b6fb Mon Sep 17 00:00:00 2001 From: ohadvano <49730675+ohadvano@users.noreply.github.com> Date: Thu, 14 Sep 2023 21:06:18 +0300 Subject: [PATCH] udp_session_filters: defer socket creation for after iterating filters (#29540) Signed-off-by: ohadvano --- .../filters/udp/udp_proxy/v3/udp_proxy.proto | 2 + changelogs/current.yaml | 5 + .../listeners/udp_filters/udp_proxy.rst | 7 + .../filters/udp/udp_proxy/config.cc | 5 + .../extensions/filters/udp/udp_proxy/config.h | 1 + .../filters/udp/udp_proxy/udp_proxy_filter.cc | 153 ++++++++++++------ .../filters/udp/udp_proxy/udp_proxy_filter.h | 40 +++-- .../session_filters/drainer_filter.h | 3 + .../udp/udp_proxy/udp_proxy_filter_test.cc | 22 +++ .../udp_proxy/udp_proxy_integration_test.cc | 30 ++-- 10 files changed, 198 insertions(+), 70 deletions(-) diff --git a/api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto b/api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto index 4e4ecfc60ff3..25560b5c8c08 100644 --- a/api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto +++ b/api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto @@ -116,6 +116,7 @@ message UdpProxyConfig { // Perform per packet load balancing (upstream host selection) on each received data chunk. // The default if not specified is false, that means each data chunk is forwarded // to upstream host selected on first chunk receival for that "session" (identified by source IP/port and local IP/port). + // Only one of use_per_packet_load_balancing or session_filters can be used. bool use_per_packet_load_balancing = 7; // Configuration for session access logs emitted by the UDP proxy. Note that certain UDP specific data is emitted as :ref:`Dynamic Metadata `. @@ -125,5 +126,6 @@ message UdpProxyConfig { repeated config.accesslog.v3.AccessLog proxy_access_log = 10; // Optional session filters that will run for each UDP session. 
+ // Only one of use_per_packet_load_balancing or session_filters can be used. repeated SessionFilter session_filters = 11; } diff --git a/changelogs/current.yaml b/changelogs/current.yaml index e0a2e786f824..9f7c1ade0bfc 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -25,6 +25,11 @@ behavior_changes: Switch from http_parser to BalsaParser for handling HTTP/1.1 traffic. See https://github.com/envoyproxy/envoy/issues/21245 for details. This behavioral change can be reverted by setting runtime flag ``envoy.reloadable_features.http1_use_balsa_parser`` to false. +- area: udp_proxy + change: | + When the UDP proxy has session filters, choosing the upstream host and creating a socket only happens after iterating all + ``onNewSession()`` calls for all the filters in the chain. Upstream host health check for each downstream datagram does + not apply when there are session filters, and per-packet load balancing can't be used when there are session filters. - area: zone-aware routing change: | Zone-aware routing is now enabled even when the originating and upstream cluster have different numbers of zones. diff --git a/docs/root/configuration/listeners/udp_filters/udp_proxy.rst b/docs/root/configuration/listeners/udp_filters/udp_proxy.rst index c4f0f040af08..209db74c26a3 100644 --- a/docs/root/configuration/listeners/udp_filters/udp_proxy.rst +++ b/docs/root/configuration/listeners/udp_filters/udp_proxy.rst @@ -82,6 +82,13 @@ These kinds of filters run seprately on each upstream UDP session and support a at the start of an upstream UDP session, when a UDP datagram is received from the downstream and when a UDP datagram is received from the upstream, similar to network filters. +.. note:: + When using session filters, choosing the upstream host only happens after completing the ``onNewSession()`` iteration for all + the filters in the chain. 
This allows choosing the host based on decisions made in one of the session filters, and prevents the + creation of upstream sockets in cases where one of the filters stopped the filter chain. + Additionally, since :ref:`per packet load balancing ` requires + choosing the upstream host for each received datagram, session filters can't be used when this option is enabled. + Example configuration --------------------- diff --git a/source/extensions/filters/udp/udp_proxy/config.cc b/source/extensions/filters/udp/udp_proxy/config.cc index da1fcb56419f..570b71eb7248 100644 --- a/source/extensions/filters/udp/udp_proxy/config.cc +++ b/source/extensions/filters/udp/udp_proxy/config.cc @@ -17,6 +17,11 @@ UdpProxyFilterConfigImpl::UdpProxyFilterConfigImpl( // Default prefer_gro to true for upstream client traffic. upstream_socket_config_(config.upstream_socket_config(), true), random_(context.api().randomGenerator()) { + if (use_per_packet_load_balancing_ && config.session_filters().size() > 0) { + throw EnvoyException( + "Only one of use_per_packet_load_balancing or session_filters can be used."); + } + if (use_original_src_ip_ && !Api::OsSysCallsSingleton::get().supportsIpTransparent( context.getServerFactoryContext().options().localAddressIpVersion())) { diff --git a/source/extensions/filters/udp/udp_proxy/config.h b/source/extensions/filters/udp/udp_proxy/config.h index d840222476cd..d0f6f943d1c4 100644 --- a/source/extensions/filters/udp/udp_proxy/config.h +++ b/source/extensions/filters/udp/udp_proxy/config.h @@ -45,6 +45,7 @@ class UdpProxyFilterConfigImpl : public UdpProxyFilterConfig, return proxy_access_logs_; } const FilterChainFactory& sessionFilterFactory() const override { return *this; }; + bool hasSessionFilters() const override { return !filter_factories_.empty(); } // FilterChainFactory void createFilterChain(FilterChainFactoryCallbacks& callbacks) const override { diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc 
b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index 608764d0f262..0adb70307def 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -113,12 +113,14 @@ UdpProxyFilter::ClusterInfo::~ClusterInfo() { } void UdpProxyFilter::ClusterInfo::removeSession(const ActiveSession* session) { - // First remove from the host to sessions map. - ASSERT(host_to_sessions_[&session->host()].count(session) == 1); - auto host_sessions_it = host_to_sessions_.find(&session->host()); - host_sessions_it->second.erase(session); - if (host_sessions_it->second.empty()) { - host_to_sessions_.erase(host_sessions_it); + if (session->host().has_value()) { + // First remove from the host to sessions map, in case the host was resolved. + ASSERT(host_to_sessions_[&session->host().value().get()].count(session) == 1); + auto host_sessions_it = host_to_sessions_.find(&session->host().value().get()); + host_sessions_it->second.erase(session); + if (host_sessions_it->second.empty()) { + host_to_sessions_.erase(host_sessions_it); + } } // Now remove it from the primary map. 
@@ -128,7 +130,8 @@ void UdpProxyFilter::ClusterInfo::removeSession(const ActiveSession* session) { UdpProxyFilter::ActiveSession* UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddresses&& addresses, - const Upstream::HostConstSharedPtr& optional_host) { + const Upstream::HostConstSharedPtr& optional_host, + bool defer_socket_creation) { if (!cluster_.info() ->resourceManager(Upstream::ResourcePriority::Default) .connections() @@ -138,8 +141,13 @@ UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddres return nullptr; } + if (defer_socket_creation) { + ASSERT(!optional_host); + return createSessionWithOptionalHost(std::move(addresses), nullptr, true); + } + if (optional_host) { - return createSessionWithHost(std::move(addresses), optional_host); + return createSessionWithOptionalHost(std::move(addresses), optional_host, false); } auto host = chooseHost(addresses.peer_); @@ -148,19 +156,24 @@ UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddres cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc(); return nullptr; } - return createSessionWithHost(std::move(addresses), host); + + return createSessionWithOptionalHost(std::move(addresses), host, defer_socket_creation); } -UdpProxyFilter::ActiveSession* UdpProxyFilter::ClusterInfo::createSessionWithHost( - Network::UdpRecvData::LocalPeerAddresses&& addresses, - const Upstream::HostConstSharedPtr& host) { - ASSERT(host); - auto new_session = std::make_unique(*this, std::move(addresses), host); +UdpProxyFilter::ActiveSession* UdpProxyFilter::ClusterInfo::createSessionWithOptionalHost( + Network::UdpRecvData::LocalPeerAddresses&& addresses, const Upstream::HostConstSharedPtr& host, + bool defer_socket_creation) { + ASSERT((defer_socket_creation && !host) || (!defer_socket_creation && host)); + auto new_session = + std::make_unique(*this, std::move(addresses), host, defer_socket_creation); new_session->createFilterChain(); 
new_session->onNewSession(); auto new_session_ptr = new_session.get(); sessions_.emplace(std::move(new_session)); - host_to_sessions_[host.get()].emplace(new_session_ptr); + if (!defer_socket_creation) { + host_to_sessions_[host.get()].emplace(new_session_ptr); + } + return new_session_ptr; } @@ -178,25 +191,32 @@ UdpProxyFilter::StickySessionClusterInfo::StickySessionClusterInfo( HeterogeneousActiveSessionEqual(false))) {} Network::FilterStatus UdpProxyFilter::StickySessionClusterInfo::onData(Network::UdpRecvData& data) { + bool defer_socket = filter_.config_->hasSessionFilters(); const auto active_session_it = sessions_.find(data.addresses_); ActiveSession* active_session; if (active_session_it == sessions_.end()) { - active_session = createSession(std::move(data.addresses_)); + active_session = createSession(std::move(data.addresses_), nullptr, defer_socket); if (active_session == nullptr) { return Network::FilterStatus::StopIteration; } } else { active_session = active_session_it->get(); - if (active_session->host().coarseHealth() == Upstream::Host::Health::Unhealthy) { + // We defer the socket creation when the session includes filters, so the filters can be + // iterated before choosing the host, to allow dynamically choosing upstream host. Due to this, + // we can't perform health checks during a session. + // TODO(ohadvano): add similar functionality that is performed after session filter chain + // iteration to signal filters about the unhealthy host, or replace the host during the session. + if (!defer_socket && + active_session->host().value().get().coarseHealth() == Upstream::Host::Health::Unhealthy) { // If a host becomes unhealthy, we optimally would like to replace it with a new session // to a healthy host. We may eventually want to make this behavior configurable, but for now // this will be the universal behavior. 
auto host = chooseHost(data.addresses_.peer_); if (host != nullptr && host->coarseHealth() != Upstream::Host::Health::Unhealthy && - host.get() != &active_session->host()) { + host.get() != &active_session->host().value().get()) { ENVOY_LOG(debug, "upstream session unhealthy, recreating the session"); removeSession(active_session); - active_session = createSession(std::move(data.addresses_), host); + active_session = createSession(std::move(data.addresses_), host, false); } else { // In this case we could not get a better host, so just keep using the current session. ENVOY_LOG(trace, "upstream session unhealthy, but unable to get a better host"); @@ -230,14 +250,14 @@ UdpProxyFilter::PerPacketLoadBalancingClusterInfo::onData(Network::UdpRecvData& const auto active_session_it = sessions_.find(key); ActiveSession* active_session; if (active_session_it == sessions_.end()) { - active_session = createSession(std::move(data.addresses_), host); + active_session = createSession(std::move(data.addresses_), host, false); if (active_session == nullptr) { return Network::FilterStatus::StopIteration; } } else { active_session = active_session_it->get(); ENVOY_LOG(trace, "found already existing session on host {}.", - active_session->host().address()->asStringView()); + active_session->host().value().get().address()->asStringView()); } active_session->onData(data); @@ -249,20 +269,34 @@ std::atomic UdpProxyFilter::ActiveSession::next_global_session_id_; UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster, Network::UdpRecvData::LocalPeerAddresses&& addresses, - const Upstream::HostConstSharedPtr& host) + const Upstream::HostConstSharedPtr& host, + bool defer_socket_creation) : cluster_(cluster), use_original_src_ip_(cluster_.filter_.config_->usingOriginalSrcIp()), addresses_(std::move(addresses)), host_(host), idle_timer_(cluster.filter_.read_callbacks_->udpListener().dispatcher().createTimer( [this] { onIdleTimer(); })), - // NOTE: The socket call can only fail due 
to memory/fd exhaustion. No local ephemeral port - // is bound until the first packet is sent to the upstream host. - socket_(cluster.filter_.createSocket(host)), udp_session_info_( StreamInfo::StreamInfoImpl(cluster_.filter_.config_->timeSource(), nullptr)), session_id_(next_global_session_id_++) { + ASSERT((defer_socket_creation && !host) || (!defer_socket_creation && host)); + cluster_.filter_.config_->stats().downstream_sess_total_.inc(); + cluster_.filter_.config_->stats().downstream_sess_active_.inc(); + cluster_.cluster_.info() + ->resourceManager(Upstream::ResourcePriority::Default) + .connections() + .inc(); + if (!defer_socket_creation) { + createSocket(host); + } +} + +void UdpProxyFilter::ActiveSession::createSocket(const Upstream::HostConstSharedPtr& host) { + // NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port + // is bound until the first packet is sent to the upstream host. + socket_ = cluster_.filter_.createSocket(host); socket_->ioHandle().initializeFileEvent( - cluster.filter_.read_callbacks_->udpListener().dispatcher(), + cluster_.filter_.read_callbacks_->udpListener().dispatcher(), [this](uint32_t) { onReadReady(); }, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read); @@ -270,13 +304,6 @@ UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster, addresses_.peer_->asStringView(), addresses_.local_->asStringView(), host->address()->asStringView()); - cluster_.filter_.config_->stats().downstream_sess_total_.inc(); - cluster_.filter_.config_->stats().downstream_sess_active_.inc(); - cluster_.cluster_.info() - ->resourceManager(Upstream::ResourcePriority::Default) - .connections() - .inc(); - if (use_original_src_ip_) { const Network::Socket::OptionsSharedPtr socket_options = Network::SocketOptionFactory::buildIpTransparentOptions(); @@ -298,7 +325,8 @@ UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster, UdpProxyFilter::ActiveSession::~ActiveSession() { ENVOY_LOG(debug, 
"deleting the session: downstream={} local={} upstream={}", addresses_.peer_->asStringView(), addresses_.local_->asStringView(), - host_->address()->asStringView()); + host_ != nullptr ? host_->address()->asStringView() : "unknown"); + cluster_.filter_.config_->stats().downstream_sess_active_.dec(); cluster_.cluster_.info() ->resourceManager(Upstream::ResourcePriority::Default) @@ -394,13 +422,15 @@ void UdpProxyFilter::ActiveSession::onNewSession() { return; } } + + createSocketDeferred(); } void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) { + absl::string_view host = socket_ != nullptr ? host_->address()->asStringView() : "unknown"; ENVOY_LOG(trace, "received {} byte datagram from downstream: downstream={} local={} upstream={}", data.buffer_->length(), addresses_.peer_->asStringView(), - addresses_.local_->asStringView(), host_->address()->asStringView()); - + addresses_.local_->asStringView(), host); const uint64_t rx_buffer_length = data.buffer_->length(); cluster_.filter_.config_->stats().downstream_sess_rx_bytes_.add(rx_buffer_length); session_stats_.downstream_sess_rx_bytes_ += rx_buffer_length; @@ -409,6 +439,22 @@ void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) { idle_timer_->enableTimer(cluster_.filter_.config_->sessionTimeout()); + for (auto& active_read_filter : read_filters_) { + auto status = active_read_filter->read_filter_->onData(data); + if (status == ReadFilterStatus::StopIteration) { + return; + } + } + + writeUpstream(data); +} + +void UdpProxyFilter::ActiveSession::writeUpstream(Network::UdpRecvData& data) { + if (!socket_) { + ENVOY_LOG(debug, "cannot write upstream because the socket was not created."); + return; + } + // NOTE: On the first write, a local ephemeral port is bound, and thus this write can fail due to // port exhaustion. 
To avoid exhaustion, UDP sockets will be connected and associated with // a 4-tuple including the local IP, and the UDP port may be reused for multiple @@ -417,7 +463,7 @@ void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) { // NOTE: We do not specify the local IP to use for the sendmsg call if use_original_src_ip_ is not // set. We allow the OS to select the right IP based on outbound routing rules if // use_original_src_ip_ is not set, else use downstream peer IP as local IP. - if (!use_original_src_ip_ && !connected_) { + if (!connected_ && !use_original_src_ip_) { Api::SysCallIntResult rc = socket_->ioHandle().connect(host_->address()); if (SOCKET_FAILURE(rc.return_value_)) { ENVOY_LOG(debug, "cannot connect: ({}) {}", rc.errno_, errorDetails(rc.errno_)); @@ -428,18 +474,7 @@ void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) { connected_ = true; } - for (auto& active_read_filter : read_filters_) { - auto status = active_read_filter->read_filter_->onData(data); - if (status == ReadFilterStatus::StopIteration) { - return; - } - } - - writeUpstream(data); -} - -void UdpProxyFilter::ActiveSession::writeUpstream(Network::UdpRecvData& data) { - ASSERT(connected_ || use_original_src_ip_); + ASSERT((connected_ || use_original_src_ip_) && socket_ && host_); const uint64_t tx_buffer_length = data.buffer_->length(); ENVOY_LOG(trace, "writing {} byte datagram upstream: downstream={} local={} upstream={}", @@ -473,6 +508,26 @@ void UdpProxyFilter::ActiveSession::onContinueFilterChain(ActiveReadFilter* filt break; } } + + createSocketDeferred(); +} + +void UdpProxyFilter::ActiveSession::createSocketDeferred() { + if (socket_) { + // A session filter may call on continueFilterChain(), after already creating the socket, + // so we first check that the socket was not created already. 
+ return; + } + + host_ = cluster_.chooseHost(addresses_.peer_); + if (host_ == nullptr) { + ENVOY_LOG(debug, "cannot find any valid host."); + cluster_.cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc(); + return; + } + + cluster_.addSession(host_.get(), this); + createSocket(host_); } void UdpProxyFilter::ActiveSession::onInjectReadDatagramToFilterChain(ActiveReadFilter* filter, diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h index 494dde6e6dae..f006bba45d02 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h @@ -93,6 +93,7 @@ class UdpProxyFilterConfig { virtual const std::vector& sessionAccessLogs() const PURE; virtual const std::vector& proxyAccessLogs() const PURE; virtual const FilterChainFactory& sessionFilterFactory() const PURE; + virtual bool hasSessionFilters() const PURE; }; using UdpProxyFilterConfigSharedPtr = std::shared_ptr; @@ -178,14 +179,22 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, class ActiveSession : public Network::UdpPacketProcessor, public FilterChainFactoryCallbacks { public: ActiveSession(ClusterInfo& parent, Network::UdpRecvData::LocalPeerAddresses&& addresses, - const Upstream::HostConstSharedPtr& host); + const Upstream::HostConstSharedPtr& host, bool defer_socket_creation); ~ActiveSession() override; const Network::UdpRecvData::LocalPeerAddresses& addresses() const { return addresses_; } - const Upstream::Host& host() const { return *host_; } + absl::optional> host() const { + if (host_) { + return *host_; + } + + return absl::nullopt; + } void onNewSession(); void onData(Network::UdpRecvData& data); void writeUpstream(Network::UdpRecvData& data); void writeDownstream(Network::UdpRecvData& data); + void createSocket(const Upstream::HostConstSharedPtr& host); + void createSocketDeferred(); void createFilterChain() { 
cluster_.filter_.config_->sessionFilterFactory().createFilterChain(*this); @@ -257,7 +266,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, ClusterInfo& cluster_; const bool use_original_src_ip_; const Network::UdpRecvData::LocalPeerAddresses addresses_; - const Upstream::HostConstSharedPtr host_; + Upstream::HostConstSharedPtr host_; // TODO(mattklein123): Consider replacing an idle timer for each session with a last used // time stamp and a periodic scan of all sessions to look for timeouts. This solution is simple, // though it might not perform well for high volume traffic. Note that this is how TCP proxy @@ -267,7 +276,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, // The socket is used for writing packets to the selected upstream host as well as receiving // packets from the upstream host. Note that a a local ephemeral port is bound on the first // write to the upstream host. - const Network::SocketPtr socket_; + Network::SocketPtr socket_; // The socket has been connected to avoid port exhaustion. 
bool connected_{}; @@ -282,7 +291,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, struct LocalPeerHostAddresses { const Network::UdpRecvData::LocalPeerAddresses& local_peer_addresses_; - const Upstream::Host& host_; + absl::optional> host_; }; struct HeterogeneousActiveSessionHash { @@ -303,7 +312,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, size_t operator()(const LocalPeerHostAddresses& value) const { auto hash = this->operator()(value.local_peer_addresses_); if (consider_host_) { - hash = absl::HashOf(hash, value.host_.address()->asStringView()); + hash = absl::HashOf(hash, value.host_.value().get().address()->asStringView()); } return hash; } @@ -329,7 +338,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, } bool operator()(const ActiveSessionPtr& lhs, const LocalPeerHostAddresses& rhs) const { return this->operator()(lhs, rhs.local_peer_addresses_) && - (consider_host_ ? &lhs->host() == &rhs.host_ : true); + (consider_host_ ? 
&lhs->host().value().get() == &rhs.host_.value().get() : true); } bool operator()(const ActiveSessionPtr& lhs, const ActiveSession* rhs) const { LocalPeerHostAddresses key{rhs->addresses(), rhs->host()}; @@ -358,6 +367,12 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, virtual ~ClusterInfo(); virtual Network::FilterStatus onData(Network::UdpRecvData& data) PURE; void removeSession(const ActiveSession* session); + void addSession(const Upstream::Host* host, const ActiveSession* session) { + host_to_sessions_[host].emplace(session); + } + + Upstream::HostConstSharedPtr + chooseHost(const Network::Address::InstanceConstSharedPtr& peer_address) const; UdpProxyFilter& filter_; Upstream::ThreadLocalCluster& cluster_; @@ -365,9 +380,8 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, protected: ActiveSession* createSession(Network::UdpRecvData::LocalPeerAddresses&& addresses, - const Upstream::HostConstSharedPtr& optional_host = nullptr); - Upstream::HostConstSharedPtr - chooseHost(const Network::Address::InstanceConstSharedPtr& peer_address) const; + const Upstream::HostConstSharedPtr& optional_host, + bool defer_socket_creation); SessionStorageType sessions_; @@ -376,8 +390,10 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, const auto final_prefix = "udp"; return {ALL_UDP_PROXY_UPSTREAM_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))}; } - ActiveSession* createSessionWithHost(Network::UdpRecvData::LocalPeerAddresses&& addresses, - const Upstream::HostConstSharedPtr& host); + ActiveSession* + createSessionWithOptionalHost(Network::UdpRecvData::LocalPeerAddresses&& addresses, + const Upstream::HostConstSharedPtr& host, + bool defer_socket_creation); Envoy::Common::CallbackHandlePtr member_update_cb_handle_; absl::flat_hash_map> diff --git a/test/extensions/filters/udp/udp_proxy/session_filters/drainer_filter.h b/test/extensions/filters/udp/udp_proxy/session_filters/drainer_filter.h index 22c940f8f1cd..438de329d5a0 100644 
--- a/test/extensions/filters/udp/udp_proxy/session_filters/drainer_filter.h +++ b/test/extensions/filters/udp/udp_proxy/session_filters/drainer_filter.h @@ -39,6 +39,9 @@ class DrainerUdpSessionReadFilter : public virtual ReadFilter { downstream_bytes_to_drain_++; return ReadFilterStatus::StopIteration; } else { + // Make sure that we are able to call continueFilterChain() here without + // impacting the filter chain iteration. + read_callbacks_->continueFilterChain(); return ReadFilterStatus::Continue; } } diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc index 834f847cd4f8..59e2d45b09cd 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc @@ -937,6 +937,28 @@ TEST_F(UdpProxyFilterTest, SocketOptionForUseOriginalSrcIp) { ensureIpTransparentSocketOptions(upstream_address_, "10.0.0.2:80", 1, 0); } +TEST_F(UdpProxyFilterTest, MutualExcludePerPacketLoadBalancingAndSessionFilters) { + auto config = R"EOF( +stat_prefix: foo +matcher: + on_no_match: + action: + name: route + typed_config: + '@type': type.googleapis.com/envoy.extensions.filters.udp.udp_proxy.v3.Route + cluster: fake_cluster +use_per_packet_load_balancing: true +session_filters: +- name: foo + typed_config: + '@type': type.googleapis.com/google.protobuf.Struct + )EOF"; + + EXPECT_THROW_WITH_MESSAGE( + setup(readConfig(config)), EnvoyException, + "Only one of use_per_packet_load_balancing or session_filters can be used."); +} + // Verify that on second data packet sent from the client, another upstream host is selected. 
TEST_F(UdpProxyFilterTest, PerPacketLoadBalancingBasicFlow) { InSequence s; diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc index db1962639c85..6b3c0b23a032 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc @@ -530,21 +530,18 @@ TEST_P(UdpProxyIntegrationTest, BidirectionalSessionFilter) { requestResponseWithListenerAddress(*listener_address, "hello", "lo", "world1", "ld1"); } -TEST_P(UdpProxyIntegrationTest, ReadSessionFilterStopOnNewConnection) { - // In the test filter, the onNewSession() call will increase the amount of bytes to drain by 1, - // if it's set to return StopIteration. - // Therefore we have two read filters where the first one should return StopIteration, - // so we expect the overall amount of bytes to drain to increase by 1, instead of 2. +TEST_P(UdpProxyIntegrationTest, ReadSessionFilterStopOnNewSession) { + // In both filters, the onNewSession() call will increase the amount of bytes to drain by 1. setup(1, absl::nullopt, - getDrainerSessionFilterConfig({{"read", 2, 0, true, false, false, false}, - {"read", 0, 0, true, false, false, false}})); + getDrainerSessionFilterConfig( + {{"read", 2, 0, true, false, true, false}, {"read", 0, 0, true, false, true, false}})); const uint32_t port = lookupPort("listener_0"); const auto listener_address = Network::Utility::resolveUrl( fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port)); std::string request = "hello"; - std::string expected_request = "lo"; // We expect 3 bytes to drain. + std::string expected_request = "o"; // We expect 4 bytes to drain. 
std::string response = "world"; std::string expected_response = "world"; @@ -593,7 +590,22 @@ TEST_P(UdpProxyIntegrationTest, ReadSessionFilterStopOnRead) { EXPECT_EQ(expected_response, response_datagram.buffer_->toString()); } -TEST_P(UdpProxyIntegrationTest, ReadSessionFilterStopOnNewConnectionAndLaterContinue) { +TEST_P(UdpProxyIntegrationTest, ReadSessionFilterStopOnNewSessionButNotOnData) { + setup(1, absl::nullopt, getDrainerSessionFilterConfig({{"read", 0, 0, true}})); + const uint32_t port = lookupPort("listener_0"); + const auto listener_address = Network::Utility::resolveUrl( + fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port)); + + // Filter chain did not iterate all onNewSession() calls, so the socket is not created. + Network::Test::UdpSyncPeer client(version_, Network::DEFAULT_UDP_MAX_DATAGRAM_SIZE); + client.write("hello1", *listener_address); + + // One datagram should be received, but none sent upstream because socket was not created. + test_server_->waitForCounterEq("udp.foo.downstream_sess_rx_datagrams", 1); + EXPECT_EQ(0, test_server_->counter("cluster.cluster_0.udp.sess_tx_datagrams")->value()); + } + +TEST_P(UdpProxyIntegrationTest, ReadSessionFilterStopOnNewSessionAndLaterContinue) { // In the test filter, the onNewSession() call will increase the amount of bytes to drain by 1, // if it's set to return StopIteration. // The first filter will StopIteration in onNewSession(), so the count will increase by 1. Then,