Skip to content

Commit

Permalink
udp_session_filters: refactor to support multiple types of ActiveSess…
Browse files Browse the repository at this point in the history
…ion (envoyproxy#29676)

refactor to support multiple types of ActiveSession

Signed-off-by: ohadvano <[email protected]>
  • Loading branch information
ohadvano authored Sep 22, 2023
1 parent 9ecbbbe commit 94a69e5
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 113 deletions.
160 changes: 82 additions & 78 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddres

if (defer_socket_creation) {
ASSERT(!optional_host);
return createSessionWithOptionalHost(std::move(addresses), nullptr, true);
return createSessionWithOptionalHost(std::move(addresses), nullptr);
}

if (optional_host) {
return createSessionWithOptionalHost(std::move(addresses), optional_host, false);
return createSessionWithOptionalHost(std::move(addresses), optional_host);
}

auto host = chooseHost(addresses.peer_);
Expand All @@ -157,22 +157,17 @@ UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddres
return nullptr;
}

return createSessionWithOptionalHost(std::move(addresses), host, defer_socket_creation);
return createSessionWithOptionalHost(std::move(addresses), host);
}

UdpProxyFilter::ActiveSession* UdpProxyFilter::ClusterInfo::createSessionWithOptionalHost(
Network::UdpRecvData::LocalPeerAddresses&& addresses, const Upstream::HostConstSharedPtr& host,
bool defer_socket_creation) {
ASSERT((defer_socket_creation && !host) || (!defer_socket_creation && host));
auto new_session =
std::make_unique<ActiveSession>(*this, std::move(addresses), host, defer_socket_creation);
Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host) {
auto new_session = std::make_unique<UdpActiveSession>(*this, std::move(addresses), host);
new_session->createFilterChain();
new_session->onNewSession();
auto new_session_ptr = new_session.get();
sessions_.emplace(std::move(new_session));
if (!defer_socket_creation) {
host_to_sessions_[host.get()].emplace(new_session_ptr);
}

return new_session_ptr;
}
Expand Down Expand Up @@ -269,58 +264,26 @@ std::atomic<uint64_t> UdpProxyFilter::ActiveSession::next_global_session_id_;

UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster,
Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host,
bool defer_socket_creation)
: cluster_(cluster), use_original_src_ip_(cluster_.filter_.config_->usingOriginalSrcIp()),
addresses_(std::move(addresses)), host_(host),
const Upstream::HostConstSharedPtr& host)
: cluster_(cluster), addresses_(std::move(addresses)), host_(host),
idle_timer_(cluster.filter_.read_callbacks_->udpListener().dispatcher().createTimer(
[this] { onIdleTimer(); })),
udp_session_info_(
StreamInfo::StreamInfoImpl(cluster_.filter_.config_->timeSource(), nullptr)),
session_id_(next_global_session_id_++) {
ASSERT((defer_socket_creation && !host) || (!defer_socket_creation && host));
cluster_.filter_.config_->stats().downstream_sess_total_.inc();
cluster_.filter_.config_->stats().downstream_sess_active_.inc();
cluster_.cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.inc();

if (!defer_socket_creation) {
createSocket(host);
}
}

void UdpProxyFilter::ActiveSession::createSocket(const Upstream::HostConstSharedPtr& host) {
// NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port
// is bound until the first packet is sent to the upstream host.
socket_ = cluster_.filter_.createSocket(host);
socket_->ioHandle().initializeFileEvent(
cluster_.filter_.read_callbacks_->udpListener().dispatcher(),
[this](uint32_t) { onReadReady(); }, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read);

ENVOY_LOG(debug, "creating new session: downstream={} local={} upstream={}",
addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host->address()->asStringView());

if (use_original_src_ip_) {
const Network::Socket::OptionsSharedPtr socket_options =
Network::SocketOptionFactory::buildIpTransparentOptions();
const bool ok = Network::Socket::applyOptions(
socket_options, *socket_, envoy::config::core::v3::SocketOption::STATE_PREBIND);

RELEASE_ASSERT(ok, "Should never occur!");
ENVOY_LOG(debug, "The original src is enabled for address {}.",
addresses_.peer_->asStringView());
}

// TODO(mattklein123): Enable dropped packets socket option. In general the Socket abstraction
// does not work well right now for client sockets. It's too heavy weight and is aimed at listener
// sockets. We need to figure out how to either refactor Socket into something that works better
// for this use case or allow the socket option abstractions to work directly against an IO
// handle.
}
UdpProxyFilter::UdpActiveSession::UdpActiveSession(
ClusterInfo& cluster, Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host)
: ActiveSession(cluster, std::move(addresses), std::move(host)),
use_original_src_ip_(cluster.filter_.config_->usingOriginalSrcIp()) {}

UdpProxyFilter::ActiveSession::~ActiveSession() {
ENVOY_LOG(debug, "deleting the session: downstream={} local={} upstream={}",
Expand Down Expand Up @@ -381,24 +344,24 @@ void UdpProxyFilter::fillProxyStreamInfo() {
udp_proxy_stats_.value().setDynamicMetadata("udp.proxy.proxy", stats_obj);
}

void UdpProxyFilter::ActiveSession::onIdleTimer() {
void UdpProxyFilter::UdpActiveSession::onIdleTimer() {
ENVOY_LOG(debug, "session idle timeout: downstream={} local={}", addresses_.peer_->asStringView(),
addresses_.local_->asStringView());
cluster_.filter_.config_->stats().idle_timeout_.inc();
cluster_.removeSession(this);
}

void UdpProxyFilter::ActiveSession::onReadReady() {
idle_timer_->enableTimer(cluster_.filter_.config_->sessionTimeout());
void UdpProxyFilter::UdpActiveSession::onReadReady() {
resetIdleTimer();

// TODO(mattklein123): We should not be passing *addresses_.local_ to this function as we are
// not trying to populate the local address for received packets.
uint32_t packets_dropped = 0;
const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket(
socket_->ioHandle(), *addresses_.local_, *this, cluster_.filter_.config_->timeSource(),
udp_socket_->ioHandle(), *addresses_.local_, *this, cluster_.filter_.config_->timeSource(),
cluster_.filter_.config_->upstreamSocketConfig().prefer_gro_, packets_dropped);
if (result == nullptr) {
socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read);
udp_socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read);
return;
}
if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) {
Expand All @@ -423,21 +386,21 @@ void UdpProxyFilter::ActiveSession::onNewSession() {
}
}

createSocketDeferred();
createUpstream();
}

void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) {
absl::string_view host = socket_ != nullptr ? host_->address()->asStringView() : "unknown";
ENVOY_LOG(trace, "received {} byte datagram from downstream: downstream={} local={} upstream={}",
data.buffer_->length(), addresses_.peer_->asStringView(),
addresses_.local_->asStringView(), host);
addresses_.local_->asStringView(),
host_ != nullptr ? host_->address()->asStringView() : "unknown");

const uint64_t rx_buffer_length = data.buffer_->length();
cluster_.filter_.config_->stats().downstream_sess_rx_bytes_.add(rx_buffer_length);
session_stats_.downstream_sess_rx_bytes_ += rx_buffer_length;
cluster_.filter_.config_->stats().downstream_sess_rx_datagrams_.inc();
++session_stats_.downstream_sess_rx_datagrams_;

idle_timer_->enableTimer(cluster_.filter_.config_->sessionTimeout());
resetIdleTimer();

for (auto& active_read_filter : read_filters_) {
auto status = active_read_filter->read_filter_->onData(data);
Expand All @@ -449,8 +412,8 @@ void UdpProxyFilter::ActiveSession::onData(Network::UdpRecvData& data) {
writeUpstream(data);
}

void UdpProxyFilter::ActiveSession::writeUpstream(Network::UdpRecvData& data) {
if (!socket_) {
void UdpProxyFilter::UdpActiveSession::writeUpstream(Network::UdpRecvData& data) {
if (!udp_socket_) {
ENVOY_LOG(debug, "cannot write upstream because the socket was not created.");
return;
}
Expand All @@ -464,7 +427,7 @@ void UdpProxyFilter::ActiveSession::writeUpstream(Network::UdpRecvData& data) {
// set. We allow the OS to select the right IP based on outbound routing rules if
// use_original_src_ip_ is not set, else use downstream peer IP as local IP.
if (!connected_ && !use_original_src_ip_) {
Api::SysCallIntResult rc = socket_->ioHandle().connect(host_->address());
Api::SysCallIntResult rc = udp_socket_->ioHandle().connect(host_->address());
if (SOCKET_FAILURE(rc.return_value_)) {
ENVOY_LOG(debug, "cannot connect: ({}) {}", rc.errno_, errorDetails(rc.errno_));
cluster_.cluster_stats_.sess_tx_errors_.inc();
Expand All @@ -474,16 +437,16 @@ void UdpProxyFilter::ActiveSession::writeUpstream(Network::UdpRecvData& data) {
connected_ = true;
}

ASSERT((connected_ || use_original_src_ip_) && socket_ && host_);
ASSERT((connected_ || use_original_src_ip_) && udp_socket_ && host_);

const uint64_t tx_buffer_length = data.buffer_->length();
ENVOY_LOG(trace, "writing {} byte datagram upstream: downstream={} local={} upstream={}",
tx_buffer_length, addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host_->address()->asStringView());

const Network::Address::Ip* local_ip = use_original_src_ip_ ? addresses_.peer_->ip() : nullptr;
Api::IoCallUint64Result rc = Network::Utility::writeToSocket(socket_->ioHandle(), *data.buffer_,
local_ip, *host_->address());
Api::IoCallUint64Result rc = Network::Utility::writeToSocket(
udp_socket_->ioHandle(), *data.buffer_, local_ip, *host_->address());

if (!rc.ok()) {
cluster_.cluster_stats_.sess_tx_errors_.inc();
Expand All @@ -509,25 +472,58 @@ void UdpProxyFilter::ActiveSession::onContinueFilterChain(ActiveReadFilter* filt
}
}

createSocketDeferred();
createUpstream();
}

void UdpProxyFilter::ActiveSession::createSocketDeferred() {
if (socket_) {
void UdpProxyFilter::UdpActiveSession::createUpstream() {
if (udp_socket_) {
// A session filter may call on continueFilterChain(), after already creating the socket,
// so we first check that the socket was not created already.
return;
}

host_ = cluster_.chooseHost(addresses_.peer_);
if (host_ == nullptr) {
ENVOY_LOG(debug, "cannot find any valid host.");
cluster_.cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc();
return;
if (!host_) {
host_ = cluster_.chooseHost(addresses_.peer_);
if (host_ == nullptr) {
ENVOY_LOG(debug, "cannot find any valid host.");
cluster_.cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc();
return;
}
}

cluster_.addSession(host_.get(), this);
createSocket(host_);
createUdpSocket(host_);
}

void UdpProxyFilter::UdpActiveSession::createUdpSocket(const Upstream::HostConstSharedPtr& host) {
// NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port
// is bound until the first packet is sent to the upstream host.
udp_socket_ = cluster_.filter_.createUdpSocket(host);
udp_socket_->ioHandle().initializeFileEvent(
cluster_.filter_.read_callbacks_->udpListener().dispatcher(),
[this](uint32_t) { onReadReady(); }, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read);

ENVOY_LOG(debug, "creating new session: downstream={} local={} upstream={}",
addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host->address()->asStringView());

if (use_original_src_ip_) {
const Network::Socket::OptionsSharedPtr socket_options =
Network::SocketOptionFactory::buildIpTransparentOptions();
const bool ok = Network::Socket::applyOptions(
socket_options, *udp_socket_, envoy::config::core::v3::SocketOption::STATE_PREBIND);

RELEASE_ASSERT(ok, "Should never occur!");
ENVOY_LOG(debug, "The original src is enabled for address {}.",
addresses_.peer_->asStringView());
}

// TODO(mattklein123): Enable dropped packets socket option. In general the Socket abstraction
// does not work well right now for client sockets. It's too heavy weight and is aimed at listener
// sockets. We need to figure out how to either refactor Socket into something that works better
// for this use case or allow the socket option abstractions to work directly against an IO
// handle.
}

void UdpProxyFilter::ActiveSession::onInjectReadDatagramToFilterChain(ActiveReadFilter* filter,
Expand Down Expand Up @@ -568,14 +564,14 @@ void UdpProxyFilter::ActiveSession::onInjectWriteDatagramToFilterChain(ActiveWri
writeDownstream(data);
}

void UdpProxyFilter::ActiveSession::processPacket(
void UdpProxyFilter::UdpActiveSession::processPacket(
Network::Address::InstanceConstSharedPtr local_address,
Network::Address::InstanceConstSharedPtr peer_address, Buffer::InstancePtr buffer,
MonotonicTime receive_time) {
const uint64_t rx_buffer_length = buffer->length();
ENVOY_LOG(trace, "received {} byte datagram from upstream: downstream={} local={} upstream={}",
rx_buffer_length, addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host_->address()->asStringView());
host_ != nullptr ? host_->address()->asStringView() : "unknown");

cluster_.cluster_stats_.sess_rx_datagrams_.inc();
cluster_.cluster_.info()->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length);
Expand All @@ -592,11 +588,19 @@ void UdpProxyFilter::ActiveSession::processPacket(
writeDownstream(recv_data);
}

void UdpProxyFilter::ActiveSession::resetIdleTimer() {
if (idle_timer_ == nullptr) {
return;
}

idle_timer_->enableTimer(cluster_.filter_.config_->sessionTimeout());
}

void UdpProxyFilter::ActiveSession::writeDownstream(Network::UdpRecvData& recv_data) {
const uint64_t tx_buffer_length = recv_data.buffer_->length();
ENVOY_LOG(trace, "writing {} byte datagram downstream: downstream={} local={} upstream={}",
tx_buffer_length, addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host_->address()->asStringView());
host_ != nullptr ? host_->address()->asStringView() : "unknown");

Network::UdpSendData data{addresses_.local_->ip(), *addresses_.peer_, *recv_data.buffer_};
const Api::IoCallUint64Result rc = cluster_.filter_.read_callbacks_->udpListener().send(data);
Expand Down
Loading

0 comments on commit 94a69e5

Please sign in to comment.