diff --git a/api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto b/api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto index 5f2db4a91814..68603f38d327 100644 --- a/api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto +++ b/api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto @@ -12,11 +12,19 @@ import "validate/validate.proto"; // TODO(mattklein123): docs +// Configuration for the UDP proxy filter. message UdpProxyConfig { - oneof cluster_specifier { + // The stat prefix used when emitting UDP proxy filter stats. + string stat_prefix = 1 [(validate.rules).string = {min_bytes: 1}]; + + oneof route_specifier { option (validate.required) = true; // The upstream cluster to connect to. - string cluster = 1 [(validate.rules).string = {min_bytes: 1}]; + string cluster = 2 [(validate.rules).string = {min_bytes: 1}]; } + + // The idle timeout for sessions. Idle is defined as no datagrams between received or sent by + // the session. The default if not specified is 1 minute. + google.protobuf.Duration idle_timeout = 3; } diff --git a/include/envoy/network/filter.h b/include/envoy/network/filter.h index 0019ccedcec3..42cd0a945779 100644 --- a/include/envoy/network/filter.h +++ b/include/envoy/network/filter.h @@ -378,6 +378,13 @@ class UdpListenerReadFilter { */ virtual void onData(UdpRecvData& data) PURE; + /** + * Called when there is an error event in the receive data path. + * + * @param error_code supplies the received error on the listener. + */ + virtual void onReceiveError(Api::IoError::IoErrorCode error_code) PURE; + protected: /** * @param callbacks supplies the read filter callbacks used to interact with the filter manager. diff --git a/include/envoy/network/listener.h b/include/envoy/network/listener.h index 44067f5ce202..162c79aae80f 100644 --- a/include/envoy/network/listener.h +++ b/include/envoy/network/listener.h @@ -201,8 +201,6 @@ struct UdpSendData { */ class UdpListenerCallbacks { public: - enum class ErrorCode { SyscallError, UnknownError }; - virtual ~UdpListenerCallbacks() = default; /** @@ -225,10 +223,9 @@ class UdpListenerCallbacks { * Called when there is an error event in the receive data path. * The send side error is a return type on the send method. * - * @param error_code ErrorCode for the error event. - * @param error_number System error number. + * @param error_code supplies the received error on the listener. */ - virtual void onReceiveError(const ErrorCode& error_code, Api::IoError::IoErrorCode err) PURE; + virtual void onReceiveError(Api::IoError::IoErrorCode error_code) PURE; }; /** diff --git a/source/common/network/udp_listener_impl.cc b/source/common/network/udp_listener_impl.cc index 66fd27ef978d..d864c85032a7 100644 --- a/source/common/network/udp_listener_impl.cc +++ b/source/common/network/udp_listener_impl.cc @@ -72,9 +72,11 @@ void UdpListenerImpl::handleReadCallback() { socket_->ioHandle(), *socket_->localAddress(), *this, time_source_, packets_dropped_); // TODO(mattklein123): Handle no error when we limit the number of packets read. if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) { - ENVOY_UDP_LOG(error, "recvmsg result {}: {}", static_cast(result->getErrorCode()), + // TODO(mattklein123): When rate limited logging is implemented log this at error level + // on a periodic basis. + ENVOY_UDP_LOG(debug, "recvmsg result {}: {}", static_cast(result->getErrorCode()), result->getErrorDetails()); - cb_.onReceiveError(UdpListenerCallbacks::ErrorCode::SyscallError, result->getErrorCode()); + cb_.onReceiveError(result->getErrorCode()); } } diff --git a/source/common/network/utility.cc b/source/common/network/utility.cc index 4392de6e9792..b288bf41a93d 100644 --- a/source/common/network/utility.cc +++ b/source/common/network/utility.cc @@ -514,7 +514,7 @@ Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, Buffer::RawSlic send_result.err_->getErrorCode() == Api::IoError::IoErrorCode::Interrupt); if (send_result.ok()) { - ENVOY_LOG_MISC(trace, "sendmsg sent:{} bytes", send_result.rc_); + ENVOY_LOG_MISC(trace, "sendmsg bytes {}", send_result.rc_); } else { ENVOY_LOG_MISC(debug, "sendmsg failed with error code {}: {}", static_cast(send_result.err_->getErrorCode()), diff --git a/source/extensions/filters/udp/udp_proxy/config.h b/source/extensions/filters/udp/udp_proxy/config.h index df69432eef07..2e6178c99b11 100644 --- a/source/extensions/filters/udp/udp_proxy/config.h +++ b/source/extensions/filters/udp/udp_proxy/config.h @@ -22,7 +22,7 @@ class UdpProxyFilterConfigFactory createFilterFactoryFromProto(const Protobuf::Message& config, Server::Configuration::ListenerFactoryContext& context) override { auto shared_config = std::make_shared( - context.clusterManager(), context.timeSource(), + context.clusterManager(), context.timeSource(), context.scope(), MessageUtil::downcastAndValidate< const envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig&>( config, context.messageValidationVisitor())); diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index 315277b7253d..ad2eef579090 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -7,9 +7,6 @@ namespace Extensions { namespace UdpFilters { namespace UdpProxy { -// TODO(mattklein123): Logging -// TODO(mattklein123): Stats - void UdpProxyFilter::onData(Network::UdpRecvData& data) { const auto active_session_it = sessions_.find(data.addresses_); ActiveSession* active_session; @@ -18,7 +15,7 @@ void UdpProxyFilter::onData(Network::UdpRecvData& data) { // TODO(mattklein123): Instead of looking up the cluster each time, keep track of it via // cluster manager callbacks. Upstream::ThreadLocalCluster* cluster = config_->getCluster(); - // TODO(mattklein123): Handle the case where the cluster does not exist. + // TODO(mattklein123): Handle the case where the cluster does not exist and add stat. ASSERT(cluster != nullptr); // TODO(mattklein123): Pass a context and support hash based routing. @@ -37,18 +34,26 @@ void UdpProxyFilter::onData(Network::UdpRecvData& data) { active_session->write(*data.buffer_); } +void UdpProxyFilter::onReceiveError(Api::IoError::IoErrorCode) { + config_->stats().downstream_sess_rx_errors_.inc(); +} + UdpProxyFilter::ActiveSession::ActiveSession(UdpProxyFilter& parent, Network::UdpRecvData::LocalPeerAddresses&& addresses, const Upstream::HostConstSharedPtr& host) : parent_(parent), addresses_(std::move(addresses)), host_(host), + idle_timer_(parent.read_callbacks_->udpListener().dispatcher().createTimer( + [this] { onIdleTimer(); })), // NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port // is bound until the first packet is sent to the upstream host. - io_handle_(host->address()->socket(Network::Address::SocketType::Datagram)), + io_handle_(parent.createIoHandle(host)), socket_event_(parent.read_callbacks_->udpListener().dispatcher().createFileEvent( io_handle_->fd(), [this](uint32_t) { onReadReady(); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read)) { ENVOY_LOG(debug, "creating new session: downstream={} local={}", addresses_.peer_->asStringView(), addresses_.local_->asStringView()); + parent_.config_->stats().downstream_sess_total_.inc(); + parent_.config_->stats().downstream_sess_active_.inc(); // TODO(mattklein123): Enable dropped packets socket option. In general the Socket abstraction // does not work well right now for client sockets. It's too heavy weight and is aimed at listener @@ -57,39 +62,71 @@ UdpProxyFilter::ActiveSession::ActiveSession(UdpProxyFilter& parent, // handle. } +UdpProxyFilter::ActiveSession::~ActiveSession() { + parent_.config_->stats().downstream_sess_active_.dec(); +} + +void UdpProxyFilter::ActiveSession::onIdleTimer() { + ENVOY_LOG(debug, "session idle timeout: downstream={} local={}", addresses_.peer_->asStringView(), + addresses_.local_->asStringView()); + parent_.config_->stats().idle_timeout_.inc(); + parent_.sessions_.erase(addresses_); +} + void UdpProxyFilter::ActiveSession::onReadReady() { - // TODO(mattklein123): Refresh idle timer. + idle_timer_->enableTimer(parent_.config_->sessionTimeout()); + + // TODO(mattklein123): We should not be passing *addresses_.local_ to this function as we are + // not trying to populate the local address for received packets. uint32_t packets_dropped = 0; const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket( *io_handle_, *addresses_.local_, *this, parent_.config_->timeSource(), packets_dropped); // TODO(mattklein123): Handle no error when we limit the number of packets read. - // TODO(mattklein123): Increment stat on failure. - ASSERT(result->getErrorCode() == Api::IoError::IoErrorCode::Again); + if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) { + // TODO(mattklein123): Upstream cluster RX error stat. + } } void UdpProxyFilter::ActiveSession::write(const Buffer::Instance& buffer) { - ENVOY_LOG(trace, "writing {} byte datagram: downstream={} local={} upstream={}", buffer.length(), - addresses_.peer_->asStringView(), addresses_.local_->asStringView(), + ENVOY_LOG(trace, "writing {} byte datagram upstream: downstream={} local={} upstream={}", + buffer.length(), addresses_.peer_->asStringView(), addresses_.local_->asStringView(), host_->address()->asStringView()); + parent_.config_->stats().downstream_sess_rx_bytes_.add(buffer.length()); + parent_.config_->stats().downstream_sess_rx_datagrams_.inc(); + + idle_timer_->enableTimer(parent_.config_->sessionTimeout()); - // TODO(mattklein123): Refresh idle timer. // NOTE: On the first write, a local ephemeral port is bound, and thus this write can fail due to // port exhaustion. // NOTE: We do not specify the local IP to use for the sendmsg call. We allow the OS to select // the right IP based on outbound routing rules. Api::IoCallUint64Result rc = Network::Utility::writeToSocket(*io_handle_, buffer, nullptr, *host_->address()); - // TODO(mattklein123): Increment stat on failure. - ASSERT(rc.ok()); + if (!rc.ok()) { + // TODO(mattklein123): Upstream cluster TX error stat. + } else { + // TODO(mattklein123): Upstream cluster TX byte/datagram stats. + } } void UdpProxyFilter::ActiveSession::processPacket(Network::Address::InstanceConstSharedPtr, Network::Address::InstanceConstSharedPtr, Buffer::InstancePtr buffer, MonotonicTime) { + ENVOY_LOG(trace, "writing {} byte datagram downstream: downstream={} local={} upstream={}", + buffer->length(), addresses_.peer_->asStringView(), addresses_.local_->asStringView(), + host_->address()->asStringView()); + const uint64_t buffer_length = buffer->length(); + + // TODO(mattklein123): Upstream cluster RX byte/datagram stats. + Network::UdpSendData data{addresses_.local_->ip(), *addresses_.peer_, *buffer}; const Api::IoCallUint64Result rc = parent_.read_callbacks_->udpListener().send(data); - // TODO(mattklein123): Increment stat on failure. - ASSERT(rc.ok()); + if (!rc.ok()) { + parent_.config_->stats().downstream_sess_tx_errors_.inc(); + } else { + parent_.config_->stats().downstream_sess_tx_bytes_.add(buffer_length); + parent_.config_->stats().downstream_sess_tx_datagrams_.inc(); + } } } // namespace UdpProxy diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h index 9ee9188bb460..e248dafb75cc 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h @@ -15,21 +15,53 @@ namespace Extensions { namespace UdpFilters { namespace UdpProxy { +/** + * All UDP proxy stats. @see stats_macros.h + */ +#define ALL_UDP_PROXY_STATS(COUNTER, GAUGE) \ + COUNTER(downstream_sess_rx_bytes) \ + COUNTER(downstream_sess_rx_datagrams) \ + COUNTER(downstream_sess_rx_errors) \ + COUNTER(downstream_sess_total) \ + COUNTER(downstream_sess_tx_bytes) \ + COUNTER(downstream_sess_tx_datagrams) \ + COUNTER(downstream_sess_tx_errors) \ + COUNTER(idle_timeout) \ + GAUGE(downstream_sess_active, Accumulate) + +/** + * Struct definition for all UDP proxy stats. @see stats_macros.h + */ +struct UdpProxyStats { + ALL_UDP_PROXY_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) +}; + class UdpProxyFilterConfig { public: UdpProxyFilterConfig(Upstream::ClusterManager& cluster_manager, TimeSource& time_source, + Stats::Scope& root_scope, const envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig& config) - : cluster_manager_(cluster_manager), time_source_(time_source), config_(config) {} + : cluster_manager_(cluster_manager), time_source_(time_source), cluster_(config.cluster()), + session_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, idle_timeout, 60 * 1000)), + stats_(generateStats(config.stat_prefix(), root_scope)) {} - Upstream::ThreadLocalCluster* getCluster() const { - return cluster_manager_.get(config_.cluster()); - } + Upstream::ThreadLocalCluster* getCluster() const { return cluster_manager_.get(cluster_); } + std::chrono::milliseconds sessionTimeout() const { return session_timeout_; } + UdpProxyStats& stats() const { return stats_; } TimeSource& timeSource() const { return time_source_; } private: + static UdpProxyStats generateStats(const std::string& stat_prefix, Stats::Scope& scope) { + const auto final_prefix = fmt::format("udp.{}", stat_prefix); + return {ALL_UDP_PROXY_STATS(POOL_COUNTER_PREFIX(scope, final_prefix), + POOL_GAUGE_PREFIX(scope, final_prefix))}; + } + Upstream::ClusterManager& cluster_manager_; TimeSource& time_source_; - const envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig config_; + const std::string cluster_; + const std::chrono::milliseconds session_timeout_; + mutable UdpProxyStats stats_; }; using UdpProxyFilterConfigSharedPtr = std::shared_ptr; @@ -42,6 +74,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggableaddress()->socket(Network::Address::SocketType::Datagram); + } + const UdpProxyFilterConfigSharedPtr config_; absl::flat_hash_set diff --git a/source/extensions/quic_listeners/quiche/active_quic_listener.h b/source/extensions/quic_listeners/quiche/active_quic_listener.h index fc14a490e242..c4e52ae7d319 100644 --- a/source/extensions/quic_listeners/quiche/active_quic_listener.h +++ b/source/extensions/quic_listeners/quiche/active_quic_listener.h @@ -32,8 +32,7 @@ class ActiveQuicListener : public Network::UdpListenerCallbacks, // Network::UdpListenerCallbacks void onData(Network::UdpRecvData& data) override; void onWriteReady(const Network::Socket& socket) override; - void onReceiveError(const Network::UdpListenerCallbacks::ErrorCode& /*error_code*/, - Api::IoError::IoErrorCode /*err*/) override { + void onReceiveError(Api::IoError::IoErrorCode /*error_code*/) override { // No-op. Quic can't do anything upon listener error. } diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index d4c272f95dc4..d47340ee1640 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -438,11 +438,8 @@ void ActiveUdpListener::onWriteReady(const Network::Socket&) { // data } -void ActiveUdpListener::onReceiveError(const Network::UdpListenerCallbacks::ErrorCode&, - Api::IoError::IoErrorCode) { - // TODO(sumukhs): Determine what to do on receive error. - // Would the filters need to know on error? Can't foresee a scenario where they - // would take an action +void ActiveUdpListener::onReceiveError(Api::IoError::IoErrorCode error_code) { + read_filter_->onReceiveError(error_code); } void ActiveUdpListener::addReadFilter(Network::UdpListenerReadFilterPtr&& filter) { diff --git a/source/server/connection_handler_impl.h b/source/server/connection_handler_impl.h index cc809fcf4ea3..35c52a849c1d 100644 --- a/source/server/connection_handler_impl.h +++ b/source/server/connection_handler_impl.h @@ -262,8 +262,7 @@ class ActiveUdpListener : public Network::UdpListenerCallbacks, // Network::UdpListenerCallbacks void onData(Network::UdpRecvData& data) override; void onWriteReady(const Network::Socket& socket) override; - void onReceiveError(const Network::UdpListenerCallbacks::ErrorCode& error_code, - Api::IoError::IoErrorCode err) override; + void onReceiveError(Api::IoError::IoErrorCode error_code) override; // ActiveListenerImplBase Network::Listener* listener() override { return udp_listener_.get(); } diff --git a/test/common/network/udp_listener_impl_test.cc b/test/common/network/udp_listener_impl_test.cc index 94408da5242e..485df4fef59d 100644 --- a/test/common/network/udp_listener_impl_test.cc +++ b/test/common/network/udp_listener_impl_test.cc @@ -146,7 +146,7 @@ TEST_P(UdpListenerImplTest, UseActualDstUdp) { *send_to_addr_); ASSERT_EQ(send_rc.rc_, second.length()); - EXPECT_CALL(listener_callbacks_, onData_(_)) + EXPECT_CALL(listener_callbacks_, onData(_)) .WillOnce(Invoke([&](const UdpRecvData& data) -> void { validateRecvCallbackParams(data); @@ -160,7 +160,7 @@ TEST_P(UdpListenerImplTest, UseActualDstUdp) { dispatcher_->exit(); })); - EXPECT_CALL(listener_callbacks_, onWriteReady_(_)) + EXPECT_CALL(listener_callbacks_, onWriteReady(_)) .WillRepeatedly(Invoke([&](const Socket& socket) { EXPECT_EQ(socket.ioHandle().fd(), server_socket_->ioHandle().fd()); })); @@ -196,7 +196,7 @@ TEST_P(UdpListenerImplTest, UdpEcho) { std::vector server_received_data; - EXPECT_CALL(listener_callbacks_, onData_(_)) + EXPECT_CALL(listener_callbacks_, onData(_)) .WillOnce(Invoke([&](const UdpRecvData& data) -> void { validateRecvCallbackParams(data); @@ -216,7 +216,7 @@ TEST_P(UdpListenerImplTest, UdpEcho) { server_received_data.push_back(data_str); })); - EXPECT_CALL(listener_callbacks_, onWriteReady_(_)).WillOnce(Invoke([&](const Socket& socket) { + EXPECT_CALL(listener_callbacks_, onWriteReady(_)).WillOnce(Invoke([&](const Socket& socket) { EXPECT_EQ(socket.ioHandle().fd(), server_socket_->ioHandle().fd()); ASSERT_NE(test_peer_address, nullptr); @@ -283,15 +283,15 @@ TEST_P(UdpListenerImplTest, UdpListenerEnableDisable) { *send_to_addr_); ASSERT_EQ(send_rc.rc_, second.length()); - EXPECT_CALL(listener_callbacks_, onData_(_)).Times(0); + EXPECT_CALL(listener_callbacks_, onData(_)).Times(0); - EXPECT_CALL(listener_callbacks_, onWriteReady_(_)).Times(0); + EXPECT_CALL(listener_callbacks_, onWriteReady(_)).Times(0); dispatcher_->run(Event::Dispatcher::RunType::Block); listener_->enable(); - EXPECT_CALL(listener_callbacks_, onData_(_)) + EXPECT_CALL(listener_callbacks_, onData(_)) .Times(2) .WillOnce(Return()) .WillOnce(Invoke([&](const UdpRecvData& data) -> void { @@ -302,7 +302,7 @@ TEST_P(UdpListenerImplTest, UdpListenerEnableDisable) { dispatcher_->exit(); })); - EXPECT_CALL(listener_callbacks_, onWriteReady_(_)) + EXPECT_CALL(listener_callbacks_, onWriteReady(_)) .WillRepeatedly(Invoke([&](const Socket& socket) { EXPECT_EQ(socket.ioHandle().fd(), server_socket_->ioHandle().fd()); })); @@ -329,19 +329,14 @@ TEST_P(UdpListenerImplTest, UdpListenerRecvMsgError) { nullptr, *send_to_addr_); ASSERT_EQ(send_rc.rc_, first.length()); - EXPECT_CALL(listener_callbacks_, onData_(_)).Times(0); + EXPECT_CALL(listener_callbacks_, onData(_)).Times(0); - EXPECT_CALL(listener_callbacks_, onWriteReady_(_)) - .Times(1) - .WillRepeatedly(Invoke([&](const Socket& socket) { - EXPECT_EQ(socket.ioHandle().fd(), server_socket_->ioHandle().fd()); - })); + EXPECT_CALL(listener_callbacks_, onWriteReady(_)).WillOnce(Invoke([&](const Socket& socket) { + EXPECT_EQ(socket.ioHandle().fd(), server_socket_->ioHandle().fd()); + })); - EXPECT_CALL(listener_callbacks_, onReceiveError_(_, _)) - .Times(1) - .WillOnce(Invoke([&](const UdpListenerCallbacks::ErrorCode& err_code, - Api::IoError::IoErrorCode err) -> void { - ASSERT_EQ(UdpListenerCallbacks::ErrorCode::SyscallError, err_code); + EXPECT_CALL(listener_callbacks_, onReceiveError(_)) + .WillOnce(Invoke([&](Api::IoError::IoErrorCode err) -> void { ASSERT_EQ(Api::IoError::IoErrorCode::NoSupport, err); dispatcher_->exit(); diff --git a/test/extensions/filters/udp/udp_proxy/BUILD b/test/extensions/filters/udp/udp_proxy/BUILD index 0a2c103376b9..f49ca2ff9a79 100644 --- a/test/extensions/filters/udp/udp_proxy/BUILD +++ b/test/extensions/filters/udp/udp_proxy/BUILD @@ -11,6 +11,18 @@ load( envoy_package() +envoy_extension_cc_test( + name = "udp_proxy_filter_test", + srcs = ["udp_proxy_filter_test.cc"], + extension_name = "envoy.filters.udp_listener.udp_proxy", + deps = [ + "//source/extensions/filters/udp/udp_proxy:udp_proxy_filter_lib", + "//test/mocks/network:io_handle_mocks", + "//test/mocks/upstream:upstream_mocks", + "@envoy_api//envoy/config/filter/udp/udp_proxy/v2alpha:pkg_cc_proto", + ], +) + envoy_extension_cc_test( name = "udp_proxy_integration_test", srcs = ["udp_proxy_integration_test.cc"], diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc new file mode 100644 index 000000000000..e82eee9931a0 --- /dev/null +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc @@ -0,0 +1,248 @@ +#include "envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.pb.validate.h" + +#include "extensions/filters/udp/udp_proxy/udp_proxy_filter.h" + +#include "test/mocks/network/io_handle.h" +#include "test/mocks/upstream/mocks.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::AtLeast; +using testing::ByMove; +using testing::InSequence; +using testing::Return; +using testing::SaveArg; + +namespace Envoy { +namespace Extensions { +namespace UdpFilters { +namespace UdpProxy { +namespace { + +class TestUdpProxyFilter : public UdpProxyFilter { +public: + using UdpProxyFilter::UdpProxyFilter; + + MOCK_METHOD1(createIoHandle, Network::IoHandlePtr(const Upstream::HostConstSharedPtr& host)); +}; + +Api::IoCallUint64Result makeNoError(uint64_t rc) { + auto no_error = Api::ioCallUint64ResultNoError(); + no_error.rc_ = rc; + return no_error; +} + +Api::IoCallUint64Result makeError(int sys_errno) { + return Api::IoCallUint64Result(0, Api::IoErrorPtr(new Network::IoSocketError(sys_errno), + Network::IoSocketError::deleteIoError)); +} + +class UdpProxyFilterTest : public testing::Test { +public: + struct TestSession { + TestSession(UdpProxyFilterTest& parent, + const Network::Address::InstanceConstSharedPtr& upstream_address) + : parent_(parent), upstream_address_(upstream_address), + io_handle_(new Network::MockIoHandle()) {} + + void expectUpstreamWrite(const std::string& data, int sys_errno = 0) { + EXPECT_CALL(*idle_timer_, enableTimer(parent_.config_->sessionTimeout(), nullptr)); + EXPECT_CALL(*io_handle_, sendmsg(_, 1, 0, nullptr, _)) + .WillOnce(Invoke( + [this, data, sys_errno]( + const Buffer::RawSlice* slices, uint64_t, int, const Network::Address::Ip*, + const Network::Address::Instance& peer_address) -> Api::IoCallUint64Result { + EXPECT_EQ(data, absl::string_view(static_cast(slices[0].mem_), + slices[0].len_)); + EXPECT_EQ(peer_address, *upstream_address_); + return sys_errno == 0 ? makeNoError(data.size()) : makeError(sys_errno); + })); + } + + void recvDataFromUpstream(const std::string& data, int send_sys_errno = 0) { + EXPECT_CALL(*idle_timer_, enableTimer(parent_.config_->sessionTimeout(), nullptr)); + + // Return the datagram. + EXPECT_CALL(*io_handle_, recvmsg(_, 1, _, _)) + .WillOnce(Invoke( + [this, data](Buffer::RawSlice* slices, const uint64_t, uint32_t, + Network::IoHandle::RecvMsgOutput& output) -> Api::IoCallUint64Result { + ASSERT(data.size() <= slices[0].len_); + memcpy(slices[0].mem_, data.data(), data.size()); + output.peer_address_ = upstream_address_; + return makeNoError(data.size()); + })); + // Send the datagram downstream. + EXPECT_CALL(parent_.callbacks_.udp_listener_, send(_)) + .WillOnce(Invoke([data, send_sys_errno]( + const Network::UdpSendData& send_data) -> Api::IoCallUint64Result { + // TODO(mattklein123): Verify peer/local address. + EXPECT_EQ(send_data.buffer_.toString(), data); + if (send_sys_errno == 0) { + send_data.buffer_.drain(send_data.buffer_.length()); + return makeNoError(data.size()); + } else { + return makeError(send_sys_errno); + } + })); + // Return an EAGAIN result. + EXPECT_CALL(*io_handle_, recvmsg(_, 1, _, _)) + .WillOnce(Return(ByMove(Api::IoCallUint64Result( + 0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), + Network::IoSocketError::deleteIoError))))); + // Kick off the receive. + file_event_cb_(Event::FileReadyType::Read); + } + + UdpProxyFilterTest& parent_; + const Network::Address::InstanceConstSharedPtr upstream_address_; + Event::MockTimer* idle_timer_{}; + Network::MockIoHandle* io_handle_; + Event::FileReadyCb file_event_cb_; + }; + + UdpProxyFilterTest() + : upstream_address_(Network::Utility::parseInternetAddressAndPort("20.0.0.1:443")) { + // Disable strict mock warnings. + EXPECT_CALL(callbacks_, udpListener()).Times(AtLeast(0)); + EXPECT_CALL(*cluster_manager_.thread_local_cluster_.lb_.host_, address()) + .WillRepeatedly(Return(upstream_address_)); + } + + ~UdpProxyFilterTest() { EXPECT_CALL(callbacks_.udp_listener_, onDestroy()); } + + void setup(const std::string& yaml) { + envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig config; + TestUtility::loadFromYamlAndValidate(yaml, config); + config_ = std::make_shared(cluster_manager_, time_system_, stats_store_, + config); + filter_ = std::make_unique(callbacks_, config_); + } + + void recvDataFromDownstream(const std::string& peer_address, const std::string& local_address, + const std::string& buffer) { + Network::UdpRecvData data; + data.addresses_.peer_ = Network::Utility::parseInternetAddressAndPort(peer_address); + data.addresses_.local_ = Network::Utility::parseInternetAddressAndPort(local_address); + data.buffer_ = std::make_unique(buffer); + data.receive_time_ = MonotonicTime(std::chrono::seconds(0)); + filter_->onData(data); + } + + void expectSessionCreate() { + test_sessions_.emplace_back(*this, upstream_address_); + TestSession& new_session = test_sessions_.back(); + EXPECT_CALL(cluster_manager_, get(_)); + new_session.idle_timer_ = new Event::MockTimer(&callbacks_.udp_listener_.dispatcher_); + EXPECT_CALL(*filter_, createIoHandle(_)) + .WillOnce(Return(ByMove(Network::IoHandlePtr{test_sessions_.back().io_handle_}))); + EXPECT_CALL(*new_session.io_handle_, fd()); + EXPECT_CALL(callbacks_.udp_listener_.dispatcher_, + createFileEvent_(_, _, Event::FileTriggerType::Edge, Event::FileReadyType::Read)) + .WillOnce(DoAll(SaveArg<1>(&new_session.file_event_cb_), Return(nullptr))); + } + + void checkTransferStats(uint64_t rx_bytes, uint64_t rx_datagrams, uint64_t tx_bytes, + uint64_t tx_datagrams) { + EXPECT_EQ(rx_bytes, config_->stats().downstream_sess_rx_bytes_.value()); + EXPECT_EQ(rx_datagrams, config_->stats().downstream_sess_rx_datagrams_.value()); + EXPECT_EQ(tx_bytes, config_->stats().downstream_sess_tx_bytes_.value()); + EXPECT_EQ(tx_datagrams, config_->stats().downstream_sess_tx_datagrams_.value()); + } + + Upstream::MockClusterManager cluster_manager_; + NiceMock time_system_; + Stats::IsolatedStoreImpl stats_store_; + UdpProxyFilterConfigSharedPtr config_; + Network::MockUdpReadFilterCallbacks callbacks_; + std::unique_ptr filter_; + std::vector test_sessions_; + // If a test ever support more than 1 upstream host this will need to move to the session/test. + const Network::Address::InstanceConstSharedPtr upstream_address_; +}; + +// Basic UDP proxy flow with a single session. +TEST_F(UdpProxyFilterTest, BasicFlow) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster + )EOF"); + + expectSessionCreate(); + test_sessions_[0].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + checkTransferStats(5 /*rx_bytes*/, 1 /*rx_datagrams*/, 0 /*tx_bytes*/, 0 /*tx_datagrams*/); + test_sessions_[0].recvDataFromUpstream("world"); + checkTransferStats(5 /*rx_bytes*/, 1 /*rx_datagrams*/, 5 /*tx_bytes*/, 1 /*tx_datagrams*/); + + test_sessions_[0].expectUpstreamWrite("hello2"); + test_sessions_[0].expectUpstreamWrite("hello3"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello2"); + checkTransferStats(11 /*rx_bytes*/, 2 /*rx_datagrams*/, 5 /*tx_bytes*/, 1 /*tx_datagrams*/); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello3"); + checkTransferStats(17 /*rx_bytes*/, 3 /*rx_datagrams*/, 5 /*tx_bytes*/, 1 /*tx_datagrams*/); + + test_sessions_[0].recvDataFromUpstream("world2"); + checkTransferStats(17 /*rx_bytes*/, 3 /*rx_datagrams*/, 11 /*tx_bytes*/, 2 /*tx_datagrams*/); + test_sessions_[0].recvDataFromUpstream("world3"); + checkTransferStats(17 /*rx_bytes*/, 3 /*rx_datagrams*/, 17 /*tx_bytes*/, 3 /*tx_datagrams*/); +} + +// Idle timeout flow. +TEST_F(UdpProxyFilterTest, IdleTimeout) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster + )EOF"); + + expectSessionCreate(); + test_sessions_[0].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + + test_sessions_[0].idle_timer_->invokeCallback(); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(0, config_->stats().downstream_sess_active_.value()); + + expectSessionCreate(); + test_sessions_[1].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(2, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); +} + +// Verify downstream send and receive error handling. +TEST_F(UdpProxyFilterTest, SendReceiveErrorHandling) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster + )EOF"); + + filter_->onReceiveError(Api::IoError::IoErrorCode::UnknownError); + EXPECT_EQ(1, config_->stats().downstream_sess_rx_errors_.value()); + + expectSessionCreate(); + test_sessions_[0].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + checkTransferStats(5 /*rx_bytes*/, 1 /*rx_datagrams*/, 0 /*tx_bytes*/, 0 /*tx_datagrams*/); + + test_sessions_[0].recvDataFromUpstream("world2", EMSGSIZE); + checkTransferStats(5 /*rx_bytes*/, 1 /*rx_datagrams*/, 0 /*tx_bytes*/, 0 /*tx_datagrams*/); + EXPECT_EQ(1, config_->stats().downstream_sess_tx_errors_.value()); +} + +} // namespace +} // namespace UdpProxy +} // namespace UdpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc index b644e3d26115..727b21ab0870 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc @@ -23,6 +23,7 @@ class UdpSyncClient { } void recv(Network::UdpRecvData& datagram) { + datagram = Network::UdpRecvData(); const auto rc = Network::Test::readFromSocket(socket_->ioHandle(), *socket_->localAddress(), datagram); ASSERT_TRUE(rc.ok()); @@ -43,6 +44,7 @@ class UdpProxyIntegrationTest : public testing::TestWithParamtoString()); // Respond from the upstream. - fake_upstreams_[0]->sendUdpDatagram("world", *request_datagram.addresses_.peer_); + fake_upstreams_[0]->sendUdpDatagram("world1", *request_datagram.addresses_.peer_); Network::UdpRecvData response_datagram; client.recv(response_datagram); - EXPECT_EQ("world", response_datagram.buffer_->toString()); + EXPECT_EQ("world1", response_datagram.buffer_->toString()); EXPECT_EQ(listener_address.asString(), response_datagram.addresses_.peer_->asString()); + + EXPECT_EQ(5, test_server_->counter("udp.foo.downstream_sess_rx_bytes")->value()); + EXPECT_EQ(1, test_server_->counter("udp.foo.downstream_sess_rx_datagrams")->value()); + EXPECT_EQ(6, test_server_->counter("udp.foo.downstream_sess_tx_bytes")->value()); + EXPECT_EQ(1, test_server_->counter("udp.foo.downstream_sess_tx_datagrams")->value()); + EXPECT_EQ(1, test_server_->counter("udp.foo.downstream_sess_total")->value()); + EXPECT_EQ(1, test_server_->gauge("udp.foo.downstream_sess_active")->value()); } }; diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 997977f5892c..9a7c0edf340a 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -627,6 +627,7 @@ class FakeUpstream : Logger::Loggable, // Network::UdpListenerReadFilter void onData(Network::UdpRecvData& data) override { parent_.onRecvDatagram(data); } + void onReceiveError(Api::IoError::IoErrorCode) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } private: FakeUpstream& parent_; diff --git a/test/mocks/network/BUILD b/test/mocks/network/BUILD index 6011484c1094..646a5ee1a172 100644 --- a/test/mocks/network/BUILD +++ b/test/mocks/network/BUILD @@ -20,6 +20,15 @@ envoy_cc_mock( ], ) +envoy_cc_mock( + name = "io_handle_mocks", + srcs = ["io_handle.cc"], + hdrs = ["io_handle.h"], + deps = [ + "//include/envoy/network:io_handle_interface", + ], +) + envoy_cc_mock( name = "network_mocks", srcs = ["mocks.cc"], diff --git a/test/mocks/network/io_handle.cc b/test/mocks/network/io_handle.cc new file mode 100644 index 000000000000..3f5871626651 --- /dev/null +++ b/test/mocks/network/io_handle.cc @@ -0,0 +1,12 @@ +#include "test/mocks/network/io_handle.h" + +#include "envoy/network/address.h" + +namespace Envoy { +namespace Network { + +MockIoHandle::MockIoHandle() = default; +MockIoHandle::~MockIoHandle() = default; + +} // namespace Network +} // namespace Envoy diff --git a/test/mocks/network/io_handle.h b/test/mocks/network/io_handle.h new file mode 100644 index 000000000000..b87bded3dc16 --- /dev/null +++ b/test/mocks/network/io_handle.h @@ -0,0 +1,29 @@ +#pragma once + +#include "envoy/network/io_handle.h" + +#include "gmock/gmock.h" + +namespace Envoy { +namespace Network { + +class MockIoHandle : public IoHandle { +public: + MockIoHandle(); + ~MockIoHandle(); + + MOCK_CONST_METHOD0(fd, int()); + MOCK_METHOD0(close, Api::IoCallUint64Result()); + MOCK_CONST_METHOD0(isOpen, bool()); + MOCK_METHOD3(readv, Api::IoCallUint64Result(uint64_t max_length, Buffer::RawSlice* slices, + uint64_t num_slice)); + MOCK_METHOD2(writev, Api::IoCallUint64Result(const Buffer::RawSlice* slices, uint64_t num_slice)); + MOCK_METHOD5(sendmsg, Api::IoCallUint64Result(const Buffer::RawSlice* slices, uint64_t num_slice, + int flags, const Address::Ip* self_ip, + const Address::Instance& peer_address)); + MOCK_METHOD4(recvmsg, Api::IoCallUint64Result(Buffer::RawSlice* slices, const uint64_t num_slice, + uint32_t self_port, RecvMsgOutput& output)); +}; + +} // namespace Network +} // namespace Envoy diff --git a/test/mocks/network/mocks.cc b/test/mocks/network/mocks.cc index 578e928483c5..fd20cb5e5b11 100644 --- a/test/mocks/network/mocks.cc +++ b/test/mocks/network/mocks.cc @@ -172,7 +172,10 @@ MockTransportSocketCallbacks::MockTransportSocketCallbacks() { } MockTransportSocketCallbacks::~MockTransportSocketCallbacks() = default; -MockUdpListener::MockUdpListener() = default; +MockUdpListener::MockUdpListener() { + ON_CALL(*this, dispatcher()).WillByDefault(ReturnRef(dispatcher_)); +} + MockUdpListener::~MockUdpListener() { onDestroy(); } MockUdpReadFilterCallbacks::MockUdpReadFilterCallbacks() { diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index bf85ab789c63..bc545d330634 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -136,19 +136,9 @@ class MockUdpListenerCallbacks : public UdpListenerCallbacks { MockUdpListenerCallbacks(); ~MockUdpListenerCallbacks() override; - void onData(UdpRecvData& data) override { onData_(data); } - - void onWriteReady(const Socket& socket) override { onWriteReady_(socket); } - - void onReceiveError(const ErrorCode& err_code, Api::IoError::IoErrorCode err) override { - onReceiveError_(err_code, err); - } - - MOCK_METHOD1(onData_, void(UdpRecvData& data)); - - MOCK_METHOD1(onWriteReady_, void(const Socket& socket)); - - MOCK_METHOD2(onReceiveError_, void(const ErrorCode& err_code, Api::IoError::IoErrorCode err)); + MOCK_METHOD1(onData, void(UdpRecvData& data)); + MOCK_METHOD1(onWriteReady, void(const Socket& socket)); + MOCK_METHOD1(onReceiveError, void(Api::IoError::IoErrorCode err)); }; class MockDrainDecision : public DrainDecision { @@ -428,6 +418,8 @@ class MockUdpListener : public UdpListener { MOCK_METHOD0(dispatcher, Event::Dispatcher&()); MOCK_CONST_METHOD0(localAddress, Address::InstanceConstSharedPtr&()); MOCK_METHOD1(send, Api::IoCallUint64Result(const UdpSendData&)); + + Event::MockDispatcher dispatcher_; }; class MockUdpReadFilterCallbacks : public UdpReadFilterCallbacks { diff --git a/test/test_common/network_utility.cc b/test/test_common/network_utility.cc index 6f50bb5465bb..207925f1ca16 100644 --- a/test/test_common/network_utility.cc +++ b/test/test_common/network_utility.cc @@ -191,7 +191,7 @@ const Network::FilterChainSharedPtr createEmptyFilterChainWithRawBufferSockets() namespace { struct SyncPacketProcessor : public Network::UdpPacketProcessor { - SyncPacketProcessor(Network::UdpRecvData& data) : data_(data) {} + SyncPacketProcessor(Network::UdpRecvData& data) : data_(data) { ASSERT(data.buffer_ == nullptr); } void processPacket(Network::Address::InstanceConstSharedPtr local_address, Network::Address::InstanceConstSharedPtr peer_address, diff --git a/test/test_common/network_utility.h b/test/test_common/network_utility.h index 9924a29e54e3..a3410bf7003c 100644 --- a/test/test_common/network_utility.h +++ b/test/test_common/network_utility.h @@ -162,7 +162,8 @@ const FilterChainSharedPtr createEmptyFilterChainWithRawBufferSockets(); /** * Wrapper for Utility::readFromSocket() which reads a single datagram into the supplied - * UdpRecvData without worrying about the packet processor interface. + * UdpRecvData without worrying about the packet processor interface. The function will + * instantiate the buffer returned in data. */ Api::IoCallUint64Result readFromSocket(IoHandle& handle, const Address::Instance& local_address, UdpRecvData& data);