Skip to content

Commit

Permalink
udp_proxy: implement idle timeout and some stats (envoyproxy#8999)
Browse files Browse the repository at this point in the history
Another bunch of work towards
envoyproxy#492.

The remaining work is proper wiring up of upstream cluster
management, host health, etc. and documentation. This will
be done in the next PR.

Signed-off-by: Matt Klein <[email protected]>
  • Loading branch information
mattklein123 authored Nov 25, 2019
1 parent 95dfc51 commit 647c1ee
Show file tree
Hide file tree
Showing 23 changed files with 480 additions and 77 deletions.
12 changes: 10 additions & 2 deletions api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,19 @@ import "validate/validate.proto";

// TODO(mattklein123): docs

// Configuration for the UDP proxy filter.
message UdpProxyConfig {
oneof cluster_specifier {
// The stat prefix used when emitting UDP proxy filter stats.
string stat_prefix = 1 [(validate.rules).string = {min_bytes: 1}];

oneof route_specifier {
option (validate.required) = true;

// The upstream cluster to connect to.
string cluster = 1 [(validate.rules).string = {min_bytes: 1}];
string cluster = 2 [(validate.rules).string = {min_bytes: 1}];
}

// The idle timeout for sessions. Idle is defined as no datagrams being received or sent by
// the session. The default if not specified is 1 minute.
google.protobuf.Duration idle_timeout = 3;
}
7 changes: 7 additions & 0 deletions include/envoy/network/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,13 @@ class UdpListenerReadFilter {
*/
virtual void onData(UdpRecvData& data) PURE;

/**
* Called when there is an error event in the receive data path.
*
* @param error_code supplies the received error on the listener.
*/
virtual void onReceiveError(Api::IoError::IoErrorCode error_code) PURE;

protected:
/**
* @param callbacks supplies the read filter callbacks used to interact with the filter manager.
Expand Down
7 changes: 2 additions & 5 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ struct UdpSendData {
*/
class UdpListenerCallbacks {
public:
enum class ErrorCode { SyscallError, UnknownError };

virtual ~UdpListenerCallbacks() = default;

/**
Expand All @@ -225,10 +223,9 @@ class UdpListenerCallbacks {
* Called when there is an error event in the receive data path.
* The send side error is a return type on the send method.
*
* @param error_code ErrorCode for the error event.
* @param error_number System error number.
* @param error_code supplies the received error on the listener.
*/
virtual void onReceiveError(const ErrorCode& error_code, Api::IoError::IoErrorCode err) PURE;
virtual void onReceiveError(Api::IoError::IoErrorCode error_code) PURE;
};

/**
Expand Down
6 changes: 4 additions & 2 deletions source/common/network/udp_listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ void UdpListenerImpl::handleReadCallback() {
socket_->ioHandle(), *socket_->localAddress(), *this, time_source_, packets_dropped_);
// TODO(mattklein123): Handle no error when we limit the number of packets read.
if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) {
ENVOY_UDP_LOG(error, "recvmsg result {}: {}", static_cast<int>(result->getErrorCode()),
// TODO(mattklein123): When rate limited logging is implemented log this at error level
// on a periodic basis.
ENVOY_UDP_LOG(debug, "recvmsg result {}: {}", static_cast<int>(result->getErrorCode()),
result->getErrorDetails());
cb_.onReceiveError(UdpListenerCallbacks::ErrorCode::SyscallError, result->getErrorCode());
cb_.onReceiveError(result->getErrorCode());
}
}
Expand Down
2 changes: 1 addition & 1 deletion source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, Buffer::RawSlic
send_result.err_->getErrorCode() == Api::IoError::IoErrorCode::Interrupt);

if (send_result.ok()) {
ENVOY_LOG_MISC(trace, "sendmsg sent:{} bytes", send_result.rc_);
ENVOY_LOG_MISC(trace, "sendmsg bytes {}", send_result.rc_);
} else {
ENVOY_LOG_MISC(debug, "sendmsg failed with error code {}: {}",
static_cast<int>(send_result.err_->getErrorCode()),
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/udp/udp_proxy/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class UdpProxyFilterConfigFactory
createFilterFactoryFromProto(const Protobuf::Message& config,
Server::Configuration::ListenerFactoryContext& context) override {
auto shared_config = std::make_shared<UdpProxyFilterConfig>(
context.clusterManager(), context.timeSource(),
context.clusterManager(), context.timeSource(), context.scope(),
MessageUtil::downcastAndValidate<
const envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig&>(
config, context.messageValidationVisitor()));
Expand Down
67 changes: 52 additions & 15 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ namespace Extensions {
namespace UdpFilters {
namespace UdpProxy {

// TODO(mattklein123): Logging
// TODO(mattklein123): Stats

void UdpProxyFilter::onData(Network::UdpRecvData& data) {
const auto active_session_it = sessions_.find(data.addresses_);
ActiveSession* active_session;
Expand All @@ -18,7 +15,7 @@ void UdpProxyFilter::onData(Network::UdpRecvData& data) {
// TODO(mattklein123): Instead of looking up the cluster each time, keep track of it via
// cluster manager callbacks.
Upstream::ThreadLocalCluster* cluster = config_->getCluster();
// TODO(mattklein123): Handle the case where the cluster does not exist.
// TODO(mattklein123): Handle the case where the cluster does not exist and add stat.
ASSERT(cluster != nullptr);

// TODO(mattklein123): Pass a context and support hash based routing.
Expand All @@ -37,18 +34,26 @@ void UdpProxyFilter::onData(Network::UdpRecvData& data) {
active_session->write(*data.buffer_);
}

// Listener receive-error hook. The error code is not inspected; the event is only recorded in the
// downstream session RX error counter.
void UdpProxyFilter::onReceiveError(Api::IoError::IoErrorCode) {
  auto& stats = config_->stats();
  stats.downstream_sess_rx_errors_.inc();
}

UdpProxyFilter::ActiveSession::ActiveSession(UdpProxyFilter& parent,
Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host)
: parent_(parent), addresses_(std::move(addresses)), host_(host),
idle_timer_(parent.read_callbacks_->udpListener().dispatcher().createTimer(
[this] { onIdleTimer(); })),
// NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port
// is bound until the first packet is sent to the upstream host.
io_handle_(host->address()->socket(Network::Address::SocketType::Datagram)),
io_handle_(parent.createIoHandle(host)),
socket_event_(parent.read_callbacks_->udpListener().dispatcher().createFileEvent(
io_handle_->fd(), [this](uint32_t) { onReadReady(); }, Event::FileTriggerType::Edge,
Event::FileReadyType::Read)) {
ENVOY_LOG(debug, "creating new session: downstream={} local={}", addresses_.peer_->asStringView(),
addresses_.local_->asStringView());
parent_.config_->stats().downstream_sess_total_.inc();
parent_.config_->stats().downstream_sess_active_.inc();

// TODO(mattklein123): Enable dropped packets socket option. In general the Socket abstraction
// does not work well right now for client sockets. It's too heavy weight and is aimed at listener
Expand All @@ -57,39 +62,71 @@ UdpProxyFilter::ActiveSession::ActiveSession(UdpProxyFilter& parent,
// handle.
}

// Tears down the session's accounting by decrementing the active downstream session gauge.
UdpProxyFilter::ActiveSession::~ActiveSession() {
  auto& stats = parent_.config_->stats();
  stats.downstream_sess_active_.dec();
}

// Idle timer callback: logs the timeout, increments the idle_timeout counter, and removes this
// session from the parent filter's session set.
void UdpProxyFilter::ActiveSession::onIdleTimer() {
ENVOY_LOG(debug, "session idle timeout: downstream={} local={}", addresses_.peer_->asStringView(),
addresses_.local_->asStringView());
parent_.config_->stats().idle_timeout_.inc();
// NOTE(review): sessions_ stores ActiveSessionPtr entries, so this erase presumably destroys
// *this. Keep it as the final statement and do not touch members afterwards.
parent_.sessions_.erase(addresses_);
}

void UdpProxyFilter::ActiveSession::onReadReady() {
// TODO(mattklein123): Refresh idle timer.
idle_timer_->enableTimer(parent_.config_->sessionTimeout());

// TODO(mattklein123): We should not be passing *addresses_.local_ to this function as we are
// not trying to populate the local address for received packets.
uint32_t packets_dropped = 0;
const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket(
*io_handle_, *addresses_.local_, *this, parent_.config_->timeSource(), packets_dropped);
// TODO(mattklein123): Handle no error when we limit the number of packets read.
// TODO(mattklein123): Increment stat on failure.
ASSERT(result->getErrorCode() == Api::IoError::IoErrorCode::Again);
if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) {
// TODO(mattklein123): Upstream cluster RX error stat.
}
}

void UdpProxyFilter::ActiveSession::write(const Buffer::Instance& buffer) {
ENVOY_LOG(trace, "writing {} byte datagram: downstream={} local={} upstream={}", buffer.length(),
addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
ENVOY_LOG(trace, "writing {} byte datagram upstream: downstream={} local={} upstream={}",
buffer.length(), addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host_->address()->asStringView());
parent_.config_->stats().downstream_sess_rx_bytes_.add(buffer.length());
parent_.config_->stats().downstream_sess_rx_datagrams_.inc();

idle_timer_->enableTimer(parent_.config_->sessionTimeout());

// TODO(mattklein123): Refresh idle timer.
// NOTE: On the first write, a local ephemeral port is bound, and thus this write can fail due to
// port exhaustion.
// NOTE: We do not specify the local IP to use for the sendmsg call. We allow the OS to select
// the right IP based on outbound routing rules.
Api::IoCallUint64Result rc =
Network::Utility::writeToSocket(*io_handle_, buffer, nullptr, *host_->address());
// TODO(mattklein123): Increment stat on failure.
ASSERT(rc.ok());
if (!rc.ok()) {
// TODO(mattklein123): Upstream cluster TX error stat.
} else {
// TODO(mattklein123): Upstream cluster TX byte/datagram stats.
}
}

void UdpProxyFilter::ActiveSession::processPacket(Network::Address::InstanceConstSharedPtr,
Network::Address::InstanceConstSharedPtr,
Buffer::InstancePtr buffer, MonotonicTime) {
ENVOY_LOG(trace, "writing {} byte datagram downstream: downstream={} local={} upstream={}",
buffer->length(), addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host_->address()->asStringView());
const uint64_t buffer_length = buffer->length();

// TODO(mattklein123): Upstream cluster RX byte/datagram stats.

Network::UdpSendData data{addresses_.local_->ip(), *addresses_.peer_, *buffer};
const Api::IoCallUint64Result rc = parent_.read_callbacks_->udpListener().send(data);
// TODO(mattklein123): Increment stat on failure.
ASSERT(rc.ok());
if (!rc.ok()) {
parent_.config_->stats().downstream_sess_tx_errors_.inc();
} else {
parent_.config_->stats().downstream_sess_tx_bytes_.add(buffer_length);
parent_.config_->stats().downstream_sess_tx_datagrams_.inc();
}
}

} // namespace UdpProxy
Expand Down
56 changes: 51 additions & 5 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,53 @@ namespace Extensions {
namespace UdpFilters {
namespace UdpProxy {

/**
* All UDP proxy stats. @see stats_macros.h
*
* List macro naming every stat emitted by the filter: each COUNTER(...) entry names a counter and
* the single GAUGE(...) entry names the active session gauge. The caller supplies the generator
* macros that are applied to each entry.
*/
#define ALL_UDP_PROXY_STATS(COUNTER, GAUGE) \
COUNTER(downstream_sess_rx_bytes) \
COUNTER(downstream_sess_rx_datagrams) \
COUNTER(downstream_sess_rx_errors) \
COUNTER(downstream_sess_total) \
COUNTER(downstream_sess_tx_bytes) \
COUNTER(downstream_sess_tx_datagrams) \
COUNTER(downstream_sess_tx_errors) \
COUNTER(idle_timeout) \
GAUGE(downstream_sess_active, Accumulate)

/**
* Struct definition for all UDP proxy stats. @see stats_macros.h
*
* Expands ALL_UDP_PROXY_STATS with the struct generator macros, so the struct carries one member
* per listed stat (referenced by users with a trailing underscore, e.g. idle_timeout_).
*/
struct UdpProxyStats {
ALL_UDP_PROXY_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
};

class UdpProxyFilterConfig {
public:
UdpProxyFilterConfig(Upstream::ClusterManager& cluster_manager, TimeSource& time_source,
Stats::Scope& root_scope,
const envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig& config)
: cluster_manager_(cluster_manager), time_source_(time_source), config_(config) {}
: cluster_manager_(cluster_manager), time_source_(time_source), cluster_(config.cluster()),
session_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, idle_timeout, 60 * 1000)),
stats_(generateStats(config.stat_prefix(), root_scope)) {}

Upstream::ThreadLocalCluster* getCluster() const {
return cluster_manager_.get(config_.cluster());
}
Upstream::ThreadLocalCluster* getCluster() const { return cluster_manager_.get(cluster_); }
std::chrono::milliseconds sessionTimeout() const { return session_timeout_; }
UdpProxyStats& stats() const { return stats_; }
TimeSource& timeSource() const { return time_source_; }

private:
static UdpProxyStats generateStats(const std::string& stat_prefix, Stats::Scope& scope) {
const auto final_prefix = fmt::format("udp.{}", stat_prefix);
return {ALL_UDP_PROXY_STATS(POOL_COUNTER_PREFIX(scope, final_prefix),
POOL_GAUGE_PREFIX(scope, final_prefix))};
}

Upstream::ClusterManager& cluster_manager_;
TimeSource& time_source_;
const envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig config_;
const std::string cluster_;
const std::chrono::milliseconds session_timeout_;
mutable UdpProxyStats stats_;
};

using UdpProxyFilterConfigSharedPtr = std::shared_ptr<const UdpProxyFilterConfig>;
Expand All @@ -42,6 +74,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggable<L

// Network::UdpListenerReadFilter
void onData(Network::UdpRecvData& data) override;
void onReceiveError(Api::IoError::IoErrorCode error_code) override;

private:
/**
Expand All @@ -56,10 +89,12 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggable<L
public:
ActiveSession(UdpProxyFilter& parent, Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host);
~ActiveSession();
const Network::UdpRecvData::LocalPeerAddresses& addresses() { return addresses_; }
void write(const Buffer::Instance& buffer);

private:
void onIdleTimer();
void onReadReady();

// Network::UdpPacketProcessor
Expand All @@ -76,6 +111,12 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggable<L
UdpProxyFilter& parent_;
const Network::UdpRecvData::LocalPeerAddresses addresses_;
const Upstream::HostConstSharedPtr host_;
// TODO(mattklein123): Consider replacing an idle timer for each session with a last used
// time stamp and a periodic scan of all sessions to look for timeouts. This solution is simple,
// though it might not perform well for high volume traffic. Note that this is how TCP proxy
// idle timeouts work so we should consider unifying the implementation if we move to a time
// stamp and scan approach.
const Event::TimerPtr idle_timer_;
// The IO handle is used for writing packets to the selected upstream host as well as receiving
// packets from the upstream host. Note that a local ephemeral port is bound on the first
// write to the upstream host.
Expand Down Expand Up @@ -116,6 +157,11 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggable<L
}
};

/**
 * Creates the datagram IO handle used to talk to the given upstream host.
 * Declared virtual so that unit tests can override it.
 *
 * @param host supplies the upstream host whose address is used to create the socket.
 * @return the IO handle returned by the address's socket() call.
 */
virtual Network::IoHandlePtr createIoHandle(const Upstream::HostConstSharedPtr& host) {
  const auto upstream_address = host->address();
  return upstream_address->socket(Network::Address::SocketType::Datagram);
}

const UdpProxyFilterConfigSharedPtr config_;
absl::flat_hash_set<ActiveSessionPtr, HeterogeneousActiveSessionHash,
HeterogeneousActiveSessionEqual>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ class ActiveQuicListener : public Network::UdpListenerCallbacks,
// Network::UdpListenerCallbacks
void onData(Network::UdpRecvData& data) override;
void onWriteReady(const Network::Socket& socket) override;
void onReceiveError(const Network::UdpListenerCallbacks::ErrorCode& /*error_code*/,
Api::IoError::IoErrorCode /*err*/) override {
void onReceiveError(Api::IoError::IoErrorCode /*error_code*/) override {
// No-op. Quic can't do anything upon listener error.
}

Expand Down
7 changes: 2 additions & 5 deletions source/server/connection_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,11 +438,8 @@ void ActiveUdpListener::onWriteReady(const Network::Socket&) {
// data
}

void ActiveUdpListener::onReceiveError(const Network::UdpListenerCallbacks::ErrorCode&,
Api::IoError::IoErrorCode) {
// TODO(sumukhs): Determine what to do on receive error.
// Would the filters need to know on error? Can't foresee a scenario where they
// would take an action
// Forwards a receive-path error reported by the UDP listener to the read filter.
// NOTE(review): assumes read_filter_ has been installed before any error is delivered — confirm.
void ActiveUdpListener::onReceiveError(Api::IoError::IoErrorCode error_code) {
read_filter_->onReceiveError(error_code);
}

void ActiveUdpListener::addReadFilter(Network::UdpListenerReadFilterPtr&& filter) {
Expand Down
3 changes: 1 addition & 2 deletions source/server/connection_handler_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,7 @@ class ActiveUdpListener : public Network::UdpListenerCallbacks,
// Network::UdpListenerCallbacks
void onData(Network::UdpRecvData& data) override;
void onWriteReady(const Network::Socket& socket) override;
void onReceiveError(const Network::UdpListenerCallbacks::ErrorCode& error_code,
Api::IoError::IoErrorCode err) override;
void onReceiveError(Api::IoError::IoErrorCode error_code) override;

// ActiveListenerImplBase
Network::Listener* listener() override { return udp_listener_.get(); }
Expand Down
Loading

0 comments on commit 647c1ee

Please sign in to comment.