From 94a69e5dc11ecc76e227ed5e366fa15e857aff9e Mon Sep 17 00:00:00 2001 From: ohadvano <49730675+ohadvano@users.noreply.github.com> Date: Fri, 22 Sep 2023 18:14:14 +0300 Subject: [PATCH] udp_session_filters: refactor to support multiple types of ActiveSession (#29676) refactor to support multiple types of ActiveSession Signed-off-by: ohadvano --- .../filters/udp/udp_proxy/udp_proxy_filter.cc | 160 +++++++++--------- .../filters/udp/udp_proxy/udp_proxy_filter.h | 88 ++++++---- .../udp/udp_proxy/udp_proxy_filter_test.cc | 4 +- 3 files changed, 139 insertions(+), 113 deletions(-) diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index 0adb70307def..23e1c3754b69 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -143,11 +143,11 @@ UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddres if (defer_socket_creation) { ASSERT(!optional_host); - return createSessionWithOptionalHost(std::move(addresses), nullptr, true); + return createSessionWithOptionalHost(std::move(addresses), nullptr); } if (optional_host) { - return createSessionWithOptionalHost(std::move(addresses), optional_host, false); + return createSessionWithOptionalHost(std::move(addresses), optional_host); } auto host = chooseHost(addresses.peer_); @@ -157,22 +157,17 @@ UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddres return nullptr; } - return createSessionWithOptionalHost(std::move(addresses), host, defer_socket_creation); + return createSessionWithOptionalHost(std::move(addresses), host); } UdpProxyFilter::ActiveSession* UdpProxyFilter::ClusterInfo::createSessionWithOptionalHost( - Network::UdpRecvData::LocalPeerAddresses&& addresses, const Upstream::HostConstSharedPtr& host, - bool defer_socket_creation) { - ASSERT((defer_socket_creation && !host) || (!defer_socket_creation && host)); - auto new_session = - std::make_unique(*this, std::move(addresses), host, defer_socket_creation); + Network::UdpRecvData::LocalPeerAddresses&& addresses, + const Upstream::HostConstSharedPtr& host) { + auto new_session = std::make_unique(*this, std::move(addresses), host); new_session->createFilterChain(); new_session->onNewSession(); auto new_session_ptr = new_session.get(); sessions_.emplace(std::move(new_session)); - if (!defer_socket_creation) { - host_to_sessions_[host.get()].emplace(new_session_ptr); - } return new_session_ptr; } @@ -269,58 +264,26 @@ std::atomic UdpProxyFilter::ActiveSession::next_global_session_id_; UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster, Network::UdpRecvData::LocalPeerAddresses&& addresses, - const Upstream::HostConstSharedPtr& host, - bool defer_socket_creation) - : cluster_(cluster), use_original_src_ip_(cluster_.filter_.config_->usingOriginalSrcIp()), - addresses_(std::move(addresses)), host_(host), + const Upstream::HostConstSharedPtr& host) + : cluster_(cluster), addresses_(std::move(addresses)), host_(host), idle_timer_(cluster.filter_.read_callbacks_->udpListener().dispatcher().createTimer( [this] { onIdleTimer(); })), udp_session_info_( StreamInfo::StreamInfoImpl(cluster_.filter_.config_->timeSource(), nullptr)), session_id_(next_global_session_id_++) { - ASSERT((defer_socket_creation && !host) || (!defer_socket_creation && host)); cluster_.filter_.config_->stats().downstream_sess_total_.inc(); cluster_.filter_.config_->stats().downstream_sess_active_.inc(); cluster_.cluster_.info() ->resourceManager(Upstream::ResourcePriority::Default) .connections() .inc(); - - if (!defer_socket_creation) { - createSocket(host); - } } -void UdpProxyFilter::ActiveSession::createSocket(const Upstream::HostConstSharedPtr& host) { - // NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port - // is bound until the first packet is sent to the upstream host. - socket_ = cluster_.filter_.createSocket(host); - socket_->ioHandle().initializeFileEvent( - cluster_.filter_.read_callbacks_->udpListener().dispatcher(), - [this](uint32_t) { onReadReady(); }, Event::PlatformDefaultTriggerType, - Event::FileReadyType::Read); - - ENVOY_LOG(debug, "creating new session: downstream={} local={} upstream={}", - addresses_.peer_->asStringView(), addresses_.local_->asStringView(), - host->address()->asStringView()); - - if (use_original_src_ip_) { - const Network::Socket::OptionsSharedPtr socket_options = - Network::SocketOptionFactory::buildIpTransparentOptions(); - const bool ok = Network::Socket::applyOptions( - socket_options, *socket_, envoy::config::core::v3::SocketOption::STATE_PREBIND); - - RELEASE_ASSERT(ok, "Should never occur!"); - ENVOY_LOG(debug, "The original src is enabled for address {}.", - addresses_.peer_->asStringView()); - } - - // TODO(mattklein123): Enable dropped packets socket option. In general the Socket abstraction - // does not work well right now for client sockets. It's too heavy weight and is aimed at listener - // sockets. We need to figure out how to either refactor Socket into something that works better - // for this use case or allow the socket option abstractions to work directly against an IO - // handle. -} +UdpProxyFilter::UdpActiveSession::UdpActiveSession( + ClusterInfo& cluster, Network::UdpRecvData::LocalPeerAddresses&& addresses, + const Upstream::HostConstSharedPtr& host) + : ActiveSession(cluster, std::move(addresses), std::move(host)), + use_original_src_ip_(cluster.filter_.config_->usingOriginalSrcIp()) {} UdpProxyFilter::ActiveSession::~ActiveSession() { ENVOY_LOG(debug, "deleting the session: downstream={} local={} upstream={}", @@ -381,24 +344,24 @@ void UdpProxyFilter::fillProxyStreamInfo() { udp_proxy_stats_.value().setDynamicMetadata("udp.proxy.proxy", stats_obj); } -void UdpProxyFilter::ActiveSession::onIdleTimer() { +void UdpProxyFilter::UdpActiveSession::onIdleTimer() { ENVOY_LOG(debug, "session idle timeout: downstream={} local={}", addresses_.peer_->asStringView(), addresses_.local_->asStringView()); cluster_.filter_.config_->stats().idle_timeout_.inc(); cluster_.removeSession(this); } -void UdpProxyFilter::ActiveSession::onReadReady() { - idle_timer_->enableTimer(cluster_.filter_.config_->sessionTimeout()); +void UdpProxyFilter::UdpActiveSession::onReadReady() { + resetIdleTimer(); // TODO(mattklein123): We should not be passing *addresses_.local_ to this function as we are // not trying to populate the local address for received packets. uint32_t packets_dropped = 0; const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket( - socket_->ioHandle(), *addresses_.local_, *this, cluster_.filter_.config_->timeSource(), + udp_socket_->ioHandle(), *addresses_.local_, *this, cluster_.filter_.config_->timeSource(), cluster_.filter_.config_->upstreamSocketConfig().prefer_gro_, packets_dropped); if (result == nullptr) { - socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read); + udp_socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read); return; } if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) { @@ -423,21 +386,21 @@ void UdpProxyFilter::ActiveSession::onNewSession() { } } - createSocketDeferred(); + createUpstream(); } void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) { - absl::string_view host = socket_ != nullptr ? host_->address()->asStringView() : "unknown"; ENVOY_LOG(trace, "received {} byte datagram from downstream: downstream={} local={} upstream={}", data.buffer_->length(), addresses_.peer_->asStringView(), - addresses_.local_->asStringView(), host); + addresses_.local_->asStringView(), + host_ != nullptr ? host_->address()->asStringView() : "unknown"); + const uint64_t rx_buffer_length = data.buffer_->length(); cluster_.filter_.config_->stats().downstream_sess_rx_bytes_.add(rx_buffer_length); session_stats_.downstream_sess_rx_bytes_ += rx_buffer_length; cluster_.filter_.config_->stats().downstream_sess_rx_datagrams_.inc(); ++session_stats_.downstream_sess_rx_datagrams_; - - idle_timer_->enableTimer(cluster_.filter_.config_->sessionTimeout()); + resetIdleTimer(); for (auto& active_read_filter : read_filters_) { auto status = active_read_filter->read_filter_->onData(data); @@ -449,8 +412,8 @@ void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) { writeUpstream(data); } -void UdpProxyFilter::ActiveSession::writeUpstream(Network::UdpRecvData& data) { - if (!socket_) { +void UdpProxyFilter::UdpActiveSession::writeUpstream(Network::UdpRecvData& data) { + if (!udp_socket_) { ENVOY_LOG(debug, "cannot write upstream because the socket was not created."); return; } @@ -464,7 +427,7 @@ void UdpProxyFilter::ActiveSession::writeUpstream(Network::UdpRecvData& data) { // set. We allow the OS to select the right IP based on outbound routing rules if // use_original_src_ip_ is not set, else use downstream peer IP as local IP. if (!connected_ && !use_original_src_ip_) { - Api::SysCallIntResult rc = socket_->ioHandle().connect(host_->address()); + Api::SysCallIntResult rc = udp_socket_->ioHandle().connect(host_->address()); if (SOCKET_FAILURE(rc.return_value_)) { ENVOY_LOG(debug, "cannot connect: ({}) {}", rc.errno_, errorDetails(rc.errno_)); cluster_.cluster_stats_.sess_tx_errors_.inc(); @@ -474,7 +437,7 @@ void UdpProxyFilter::ActiveSession::writeUpstream(Network::UdpRecvData& data) { connected_ = true; } - ASSERT((connected_ || use_original_src_ip_) && socket_ && host_); + ASSERT((connected_ || use_original_src_ip_) && udp_socket_ && host_); const uint64_t tx_buffer_length = data.buffer_->length(); ENVOY_LOG(trace, "writing {} byte datagram upstream: downstream={} local={} upstream={}", @@ -482,8 +445,8 @@ void UdpProxyFilter::ActiveSession::writeUpstream(Network::UdpRecvData& data) { host_->address()->asStringView()); const Network::Address::Ip* local_ip = use_original_src_ip_ ? addresses_.peer_->ip() : nullptr; - Api::IoCallUint64Result rc = Network::Utility::writeToSocket(socket_->ioHandle(), *data.buffer_, - local_ip, *host_->address()); + Api::IoCallUint64Result rc = Network::Utility::writeToSocket( + udp_socket_->ioHandle(), *data.buffer_, local_ip, *host_->address()); if (!rc.ok()) { cluster_.cluster_stats_.sess_tx_errors_.inc(); @@ -509,25 +472,58 @@ void UdpProxyFilter::ActiveSession::onContinueFilterChain(ActiveReadFilter* filt } } - createSocketDeferred(); + createUpstream(); } -void UdpProxyFilter::ActiveSession::createSocketDeferred() { - if (socket_) { +void UdpProxyFilter::UdpActiveSession::createUpstream() { + if (udp_socket_) { // A session filter may call on continueFilterChain(), after already creating the socket, // so we first check that the socket was not created already. return; } - host_ = cluster_.chooseHost(addresses_.peer_); - if (host_ == nullptr) { - ENVOY_LOG(debug, "cannot find any valid host."); - cluster_.cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc(); - return; + if (!host_) { + host_ = cluster_.chooseHost(addresses_.peer_); + if (host_ == nullptr) { + ENVOY_LOG(debug, "cannot find any valid host."); + cluster_.cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc(); + return; + } } cluster_.addSession(host_.get(), this); - createSocket(host_); + createUdpSocket(host_); +} + +void UdpProxyFilter::UdpActiveSession::createUdpSocket(const Upstream::HostConstSharedPtr& host) { + // NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port + // is bound until the first packet is sent to the upstream host. + udp_socket_ = cluster_.filter_.createUdpSocket(host); + udp_socket_->ioHandle().initializeFileEvent( + cluster_.filter_.read_callbacks_->udpListener().dispatcher(), + [this](uint32_t) { onReadReady(); }, Event::PlatformDefaultTriggerType, + Event::FileReadyType::Read); + + ENVOY_LOG(debug, "creating new session: downstream={} local={} upstream={}", + addresses_.peer_->asStringView(), addresses_.local_->asStringView(), + host->address()->asStringView()); + + if (use_original_src_ip_) { + const Network::Socket::OptionsSharedPtr socket_options = + Network::SocketOptionFactory::buildIpTransparentOptions(); + const bool ok = Network::Socket::applyOptions( + socket_options, *udp_socket_, envoy::config::core::v3::SocketOption::STATE_PREBIND); + + RELEASE_ASSERT(ok, "Should never occur!"); + ENVOY_LOG(debug, "The original src is enabled for address {}.", + addresses_.peer_->asStringView()); + } + + // TODO(mattklein123): Enable dropped packets socket option. In general the Socket abstraction + // does not work well right now for client sockets. It's too heavy weight and is aimed at listener + // sockets. We need to figure out how to either refactor Socket into something that works better + // for this use case or allow the socket option abstractions to work directly against an IO + // handle. } void UdpProxyFilter::ActiveSession::onInjectReadDatagramToFilterChain(ActiveReadFilter* filter, @@ -568,14 +564,14 @@ void UdpProxyFilter::ActiveSession::onInjectWriteDatagramToFilterChain(ActiveWri writeDownstream(data); } -void UdpProxyFilter::ActiveSession::processPacket( +void UdpProxyFilter::UdpActiveSession::processPacket( Network::Address::InstanceConstSharedPtr local_address, Network::Address::InstanceConstSharedPtr peer_address, Buffer::InstancePtr buffer, MonotonicTime receive_time) { const uint64_t rx_buffer_length = buffer->length(); ENVOY_LOG(trace, "received {} byte datagram from upstream: downstream={} local={} upstream={}", rx_buffer_length, addresses_.peer_->asStringView(), addresses_.local_->asStringView(), - host_->address()->asStringView()); + host_ != nullptr ? host_->address()->asStringView() : "unknown"); cluster_.cluster_stats_.sess_rx_datagrams_.inc(); cluster_.cluster_.info()->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length); @@ -592,11 +588,19 @@ void UdpProxyFilter::ActiveSession::processPacket( writeDownstream(recv_data); } +void UdpProxyFilter::ActiveSession::resetIdleTimer() { + if (idle_timer_ == nullptr) { + return; + } + + idle_timer_->enableTimer(cluster_.filter_.config_->sessionTimeout()); +} + void UdpProxyFilter::ActiveSession::writeDownstream(Network::UdpRecvData& recv_data) { const uint64_t tx_buffer_length = recv_data.buffer_->length(); ENVOY_LOG(trace, "writing {} byte datagram downstream: downstream={} local={} upstream={}", tx_buffer_length, addresses_.peer_->asStringView(), addresses_.local_->asStringView(), - host_->address()->asStringView()); + host_ != nullptr ? host_->address()->asStringView() : "unknown"); Network::UdpSendData data{addresses_.local_->ip(), *addresses_.peer_, *recv_data.buffer_}; const Api::IoCallUint64Result rc = cluster_.filter_.read_callbacks_->udpListener().send(data); diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h index f006bba45d02..003d03ab70d6 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h @@ -176,11 +176,12 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, * will be hashed to the same session and will be forwarded to the same upstream, using the same * local ephemeral IP/port. */ - class ActiveSession : public Network::UdpPacketProcessor, public FilterChainFactoryCallbacks { + class ActiveSession : public FilterChainFactoryCallbacks { public: ActiveSession(ClusterInfo& parent, Network::UdpRecvData::LocalPeerAddresses&& addresses, - const Upstream::HostConstSharedPtr& host, bool defer_socket_creation); + const Upstream::HostConstSharedPtr& host); ~ActiveSession() override; + const Network::UdpRecvData::LocalPeerAddresses& addresses() const { return addresses_; } absl::optional> host() const { if (host_) { @@ -189,12 +190,15 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, return absl::nullopt; } + void onNewSession(); void onData(Network::UdpRecvData& data); - void writeUpstream(Network::UdpRecvData& data); void writeDownstream(Network::UdpRecvData& data); - void createSocket(const Upstream::HostConstSharedPtr& host); - void createSocketDeferred(); + void resetIdleTimer(); + + virtual void createUpstream() PURE; + virtual void writeUpstream(Network::UdpRecvData& data) PURE; + virtual void onIdleTimer() PURE; void createFilterChain() { cluster_.filter_.config_->sessionFilterFactory().createFilterChain(*this); @@ -229,26 +233,9 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, LinkedList::moveIntoList(std::move(write_wrapper), write_filters_); }; - private: - void onIdleTimer(); - void onReadReady(); + protected: void fillSessionStreamInfo(); - // Network::UdpPacketProcessor - void processPacket(Network::Address::InstanceConstSharedPtr local_address, - Network::Address::InstanceConstSharedPtr peer_address, - Buffer::InstancePtr buffer, MonotonicTime receive_time) override; - uint64_t maxDatagramSize() const override { - return cluster_.filter_.config_->upstreamSocketConfig().max_rx_datagram_size_; - } - void onDatagramsDropped(uint32_t dropped) override { - cluster_.cluster_stats_.sess_rx_datagrams_dropped_.add(dropped); - } - size_t numPacketsExpectedPerEventLoop() const final { - // TODO(mattklein123) change this to a reasonable number if needed. - return Network::MAX_NUM_PACKETS_PER_EVENT_LOOP; - } - /** * Struct definition for session access logging. */ @@ -264,7 +251,6 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, static std::atomic next_global_session_id_; ClusterInfo& cluster_; - const bool use_original_src_ip_; const Network::UdpRecvData::LocalPeerAddresses addresses_; Upstream::HostConstSharedPtr host_; // TODO(mattklein123): Consider replacing an idle timer for each session with a last used @@ -273,12 +259,6 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, // idle timeouts work so we should consider unifying the implementation if we move to a time // stamp and scan approach. const Event::TimerPtr idle_timer_; - // The socket is used for writing packets to the selected upstream host as well as receiving - // packets from the upstream host. Note that a a local ephemeral port is bound on the first - // write to the upstream host. - Network::SocketPtr socket_; - // The socket has been connected to avoid port exhaustion. - bool connected_{}; UdpProxySessionStats session_stats_{}; StreamInfo::StreamInfoImpl udp_session_info_; @@ -289,6 +269,48 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, using ActiveSessionPtr = std::unique_ptr; + class UdpActiveSession : public Network::UdpPacketProcessor, public ActiveSession { + public: + UdpActiveSession(ClusterInfo& parent, Network::UdpRecvData::LocalPeerAddresses&& addresses, + const Upstream::HostConstSharedPtr& host); + ~UdpActiveSession() override = default; + + // ActiveSession + void createUpstream() override; + void writeUpstream(Network::UdpRecvData& data) override; + void onIdleTimer() override; + + // Network::UdpPacketProcessor + void processPacket(Network::Address::InstanceConstSharedPtr local_address, + Network::Address::InstanceConstSharedPtr peer_address, + Buffer::InstancePtr buffer, MonotonicTime receive_time) override; + + uint64_t maxDatagramSize() const override { + return cluster_.filter_.config_->upstreamSocketConfig().max_rx_datagram_size_; + } + + void onDatagramsDropped(uint32_t dropped) override { + cluster_.cluster_stats_.sess_rx_datagrams_dropped_.add(dropped); + } + + size_t numPacketsExpectedPerEventLoop() const final { + // TODO(mattklein123) change this to a reasonable number if needed. + return Network::MAX_NUM_PACKETS_PER_EVENT_LOOP; + } + + private: + void onReadReady(); + void createUdpSocket(const Upstream::HostConstSharedPtr& host); + + // The socket is used for writing packets to the selected upstream host as well as receiving + // packets from the upstream host. Note that a a local ephemeral port is bound on the first + // write to the upstream host. + Network::SocketPtr udp_socket_; + // The socket has been connected to avoid port exhaustion. + bool connected_{}; + const bool use_original_src_ip_; + }; + struct LocalPeerHostAddresses { const Network::UdpRecvData::LocalPeerAddresses& local_peer_addresses_; absl::optional> host_; @@ -390,10 +412,10 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, const auto final_prefix = "udp"; return {ALL_UDP_PROXY_UPSTREAM_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))}; } + ActiveSession* createSessionWithOptionalHost(Network::UdpRecvData::LocalPeerAddresses&& addresses, - const Upstream::HostConstSharedPtr& host, - bool defer_socket_creation); + const Upstream::HostConstSharedPtr& host); Envoy::Common::CallbackHandlePtr member_update_cb_handle_; absl::flat_hash_map> @@ -423,7 +445,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Network::FilterStatus onData(Network::UdpRecvData& data) override; }; - virtual Network::SocketPtr createSocket(const Upstream::HostConstSharedPtr& host) { + virtual Network::SocketPtr createUdpSocket(const Upstream::HostConstSharedPtr& host) { // Virtual so this can be overridden in unit tests. return std::make_unique(Network::Socket::Type::Datagram, host->address(), nullptr, Network::SocketCreationOptions{}); diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc index 71c3a2af3fd5..cc7333c12c5f 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc @@ -43,7 +43,7 @@ class TestUdpProxyFilter : public UdpProxyFilter { public: using UdpProxyFilter::UdpProxyFilter; - MOCK_METHOD(Network::SocketPtr, createSocket, (const Upstream::HostConstSharedPtr& host)); + MOCK_METHOD(Network::SocketPtr, createUdpSocket, (const Upstream::HostConstSharedPtr& host)); }; Api::IoCallUint64Result makeNoError(uint64_t rc) { @@ -240,7 +240,7 @@ class UdpProxyFilterTest : public UdpProxyFilterBase { test_sessions_.emplace_back(*this, address); TestSession& new_session = test_sessions_.back(); new_session.idle_timer_ = new Event::MockTimer(&callbacks_.udp_listener_.dispatcher_); - EXPECT_CALL(*filter_, createSocket(_)) + EXPECT_CALL(*filter_, createUdpSocket(_)) .WillOnce(Return(ByMove(Network::SocketPtr{test_sessions_.back().socket_}))); EXPECT_CALL( *new_session.socket_->io_handle_,