diff --git a/api/docs/BUILD b/api/docs/BUILD
index 3292a1212c38..29cecec8cfa3 100644
--- a/api/docs/BUILD
+++ b/api/docs/BUILD
@@ -65,6 +65,7 @@ proto_library(
         "//envoy/config/filter/network/zookeeper_proxy/v1alpha1:pkg",
         "//envoy/config/filter/thrift/rate_limit/v2alpha1:pkg",
         "//envoy/config/filter/thrift/router/v2alpha1:pkg",
+        "//envoy/config/filter/udp/udp_proxy/v2alpha:pkg",
         "//envoy/config/grpc_credential/v2alpha:pkg",
         "//envoy/config/health_checker/redis/v2:pkg",
         "//envoy/config/listener/v2:pkg",
diff --git a/api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto b/api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto
index 68603f38d327..c91420b29b10 100644
--- a/api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto
+++ b/api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto
@@ -10,7 +10,9 @@
 import "google/protobuf/duration.proto";
 import "validate/validate.proto";
 
-// TODO(mattklein123): docs
+// [#protodoc-title: UDP proxy]
+// UDP proxy :ref:`configuration overview <config_udp_listener_filters_udp_proxy>`.
+// [#extension: envoy.filters.udp_listener.udp_proxy]
 
 // Configuration for the UDP proxy filter.
 message UdpProxyConfig {
diff --git a/docs/root/api-v2/config/filter/filter.rst b/docs/root/api-v2/config/filter/filter.rst
index 6ddd5e15abf3..e7b8d2ff6f8e 100644
--- a/docs/root/api-v2/config/filter/filter.rst
+++ b/docs/root/api-v2/config/filter/filter.rst
@@ -5,10 +5,11 @@ Filters
   :glob:
   :maxdepth: 2
 
+  listener/listener
   network/network
+  udp/udp
   http/http
-  thrift/thrift
   accesslog/v2/accesslog.proto
   fault/v2/fault.proto
-  listener/listener
   dubbo/dubbo
+  thrift/thrift
diff --git a/docs/root/api-v2/config/filter/udp/udp.rst b/docs/root/api-v2/config/filter/udp/udp.rst
new file mode 100644
index 000000000000..9728ddad1497
--- /dev/null
+++ b/docs/root/api-v2/config/filter/udp/udp.rst
@@ -0,0 +1,8 @@
+UDP listener filters
+====================
+
+.. toctree::
+  :glob:
+  :maxdepth: 2
+
+  */v2alpha/*
diff --git a/docs/root/configuration/listeners/listeners.rst b/docs/root/configuration/listeners/listeners.rst
index 73605a853658..9b3e2161ef0c 100644
--- a/docs/root/configuration/listeners/listeners.rst
+++ b/docs/root/configuration/listeners/listeners.rst
@@ -10,4 +10,5 @@ Listeners
   stats
   listener_filters/listener_filters
   network_filters/network_filters
+  udp_filters/udp_filters
   lds
diff --git a/docs/root/configuration/listeners/udp_filters/udp_filters.rst b/docs/root/configuration/listeners/udp_filters/udp_filters.rst
new file mode 100644
index 000000000000..1665052de2b6
--- /dev/null
+++ b/docs/root/configuration/listeners/udp_filters/udp_filters.rst
@@ -0,0 +1,11 @@
+.. _config_udp_listener_filters:
+
+UDP listener filters
+====================
+
+Envoy has the following built-in UDP listener filters.
+
+.. toctree::
+  :maxdepth: 2
+
+  udp_proxy
diff --git a/docs/root/configuration/listeners/udp_filters/udp_proxy.rst b/docs/root/configuration/listeners/udp_filters/udp_proxy.rst
new file mode 100644
index 000000000000..a651bb241a5a
--- /dev/null
+++ b/docs/root/configuration/listeners/udp_filters/udp_proxy.rst
@@ -0,0 +1,134 @@
+.. _config_udp_listener_filters_udp_proxy:
+
+UDP proxy
+=========
+
+.. attention::
+
+  UDP proxy support should be considered alpha and not production ready.
+
+* :ref:`v2 API reference <envoy_api_file_envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto>`
+* This filter should be configured with the name *envoy.filters.udp_listener.udp_proxy*.
+
+Overview
+--------
+
+The UDP proxy listener filter allows Envoy to operate as a *non-transparent* proxy between a
+UDP client and server. The lack of transparency means that the upstream server will see the
+source IP and port of the Envoy instance rather than those of the client. All datagrams flow from
+the client, to Envoy, to the upstream server, back to Envoy, and back to the client.
+
+Because UDP is not a connection-oriented protocol, Envoy must keep track of a client's *session*
+such that the response datagrams from an upstream server can be routed back to the correct client.
+Each session is indexed by the 4-tuple consisting of the source IP/port and local IP/port that the
+datagram is received on. Sessions last until the :ref:`idle timeout
+<envoy_api_field_config.filter.udp.udp_proxy.v2alpha.UdpProxyConfig.idle_timeout>` is reached.
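+
+For example, the following illustrative snippet raises the session idle timeout from its 60s
+default via the filter's *idle_timeout* field (the value shown is arbitrary):
+
+.. code-block:: yaml
+
+  typed_config:
+    '@type': type.googleapis.com/envoy.config.filter.udp.udp_proxy.v2alpha.UdpProxyConfig
+    stat_prefix: service
+    cluster: service_udp
+    idle_timeout: 120s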
+
+Load balancing and unhealthy host handling
+------------------------------------------
+
+Envoy will fully utilize the configured load balancer for the configured upstream cluster when
+load balancing UDP datagrams. When a new session is created, Envoy will associate the session
+with an upstream host selected using the configured load balancer. All future datagrams that
+belong to the session will be routed to the same upstream host.
+
+When an upstream host becomes unhealthy (due to :ref:`active health checking
+<arch_overview_health_checking>`), Envoy will attempt to create a new session to a healthy host
+when the next datagram is received.
+
+Circuit breaking
+----------------
+
+The number of sessions that can be created per upstream cluster is limited by the cluster's
+:ref:`maximum connection circuit breaker <arch_overview_circuit_break>`. By default this is 1024.
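+
+The limit can be adjusted with the cluster's standard circuit breaker configuration; the
+threshold value below is illustrative only:
+
+.. code-block:: yaml
+
+  circuit_breakers:
+    thresholds:
+    - priority: DEFAULT
+      max_connections: 4096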
+
+Example configuration
+---------------------
+
+The following example configuration will cause Envoy to listen on UDP port 1234 and proxy to a UDP
+server listening on port 1235.
+
+.. code-block:: yaml
+
+  admin:
+    access_log_path: /tmp/admin_access.log
+    address:
+      socket_address:
+        protocol: TCP
+        address: 127.0.0.1
+        port_value: 9901
+  static_resources:
+    listeners:
+    - name: listener_0
+      address:
+        socket_address:
+          protocol: UDP
+          address: 127.0.0.1
+          port_value: 1234
+      listener_filters:
+      - name: envoy.filters.udp_listener.udp_proxy
+        typed_config:
+          '@type': type.googleapis.com/envoy.config.filter.udp.udp_proxy.v2alpha.UdpProxyConfig
+          stat_prefix: service
+          cluster: service_udp
+    clusters:
+    - name: service_udp
+      connect_timeout: 0.25s
+      type: STATIC
+      lb_policy: ROUND_ROBIN
+      load_assignment:
+        cluster_name: service_udp
+        endpoints:
+        - lb_endpoints:
+          - endpoint:
+              address:
+                socket_address:
+                  address: 127.0.0.1
+                  port_value: 1235
+
+Statistics
+----------
+
+The UDP proxy filter emits both its own downstream statistics as well as many of the :ref:`cluster
+upstream statistics <config_cluster_manager_cluster_stats>` where applicable. The downstream
+statistics are rooted at *udp.<stat_prefix>.* with the following statistics:
+
+.. csv-table::
+  :header: Name, Type, Description
+  :widths: 1, 1, 2
+
+  downstream_sess_no_route, Counter, Number of datagrams not routed due to no cluster
+  downstream_sess_rx_bytes, Counter, Number of bytes received
+  downstream_sess_rx_datagrams, Counter, Number of datagrams received
+  downstream_sess_rx_errors, Counter, Number of datagram receive errors
+  downstream_sess_total, Counter, Number of sessions created in total
+  downstream_sess_tx_bytes, Counter, Number of bytes transmitted
+  downstream_sess_tx_datagrams, Counter, Number of datagrams transmitted
+  downstream_sess_tx_errors, Counter, Number of datagram transmission errors
+  idle_timeout, Counter, Number of sessions destroyed due to idle timeout
+  downstream_sess_active, Gauge, Number of sessions currently active
+
+The following standard :ref:`upstream cluster stats <config_cluster_manager_cluster_stats>` are
+used by the UDP proxy:
+
+.. csv-table::
+  :header: Name, Type, Description
+  :widths: 1, 1, 2
+
+  upstream_cx_none_healthy, Counter, Number of datagrams dropped due to no healthy hosts
+  upstream_cx_overflow, Counter, Number of datagrams dropped due to hitting the session circuit breaker
+  upstream_cx_rx_bytes_total, Counter, Number of bytes received
+  upstream_cx_tx_bytes_total, Counter, Number of bytes transmitted
+
+The UDP proxy filter also emits custom upstream cluster stats prefixed with
+*cluster.<cluster_name>.udp.*:
+
+.. csv-table::
+  :header: Name, Type, Description
+  :widths: 1, 1, 2
+
+  sess_rx_datagrams, Counter, Number of datagrams received
+  sess_rx_errors, Counter, Number of datagram receive errors
+  sess_tx_datagrams, Counter, Number of datagrams transmitted
+  sess_tx_errors, Counter, Number of datagram transmission errors
diff --git a/docs/root/intro/arch_overview/listeners/listeners.rst b/docs/root/intro/arch_overview/listeners/listeners.rst
index 2f841e220f4b..01dca7c998e4 100644
--- a/docs/root/intro/arch_overview/listeners/listeners.rst
+++ b/docs/root/intro/arch_overview/listeners/listeners.rst
@@ -5,9 +5,12 @@ Listeners
 
 The Envoy configuration supports any number of listeners within a single process. Generally we
 recommend running a single Envoy per machine regardless of the number of configured listeners. This
-allows for easier operation and a single source of statistics. Currently Envoy only supports TCP
+allows for easier operation and a single source of statistics. Envoy supports both TCP and UDP
 listeners.
 
+TCP
+---
+
 Each listener is independently configured with some number :ref:`filter chains
 <envoy_api_msg_listener.FilterChain>`, where an individual chain is selected based on its
 :ref:`match criteria <envoy_api_msg_listener.FilterChainMatch>`. An individual filter chain is
@@ -29,3 +32,14 @@ Listeners can also be fetched dynamically via the :ref:`listener discovery servi
 <config_listeners_lds>`.
 
 Listener :ref:`configuration <envoy_api_msg_Listener>`.
+
+UDP
+---
+
+Envoy also supports UDP listeners and specifically :ref:`UDP listener filters
+<config_udp_listener_filters>`. UDP listener filters are instantiated once per worker and are global
+to that worker. Each listener filter processes each UDP datagram that is received by the worker
+listening on the port. In practice, UDP listeners are configured with the SO_REUSEPORT kernel option
+which will cause the kernel to consistently hash each UDP 4-tuple to the same worker. This allows a
+UDP listener filter to be "session" oriented if it so desires. A built-in example of this
+functionality is the :ref:`UDP proxy <config_udp_listener_filters_udp_proxy>` listener filter.
diff --git a/docs/root/intro/arch_overview/listeners/listeners_toc.rst b/docs/root/intro/arch_overview/listeners/listeners_toc.rst
index 5b488ad2488f..922cb3e72447 100644
--- a/docs/root/intro/arch_overview/listeners/listeners_toc.rst
+++ b/docs/root/intro/arch_overview/listeners/listeners_toc.rst
@@ -8,3 +8,4 @@ Listeners
   listener_filters
   network_filters
   tcp_proxy
+  udp_proxy
diff --git a/docs/root/intro/arch_overview/listeners/udp_proxy.rst b/docs/root/intro/arch_overview/listeners/udp_proxy.rst
new file mode 100644
index 000000000000..ea886a59fb18
--- /dev/null
+++ b/docs/root/intro/arch_overview/listeners/udp_proxy.rst
@@ -0,0 +1,5 @@
+UDP proxy
+=========
+
+Envoy supports UDP proxying via the :ref:`UDP proxy listener filter
+<config_udp_listener_filters_udp_proxy>`.
diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst
index 3ecee6399aca..08f0f692385b 100644
--- a/docs/root/intro/version_history.rst
+++ b/docs/root/intro/version_history.rst
@@ -17,12 +17,13 @@
 * redis: performance improvement for larger split commands by avoiding string copies.
 * router: added support for REQ(header-name) :ref:`header formatter
   <config_http_conn_man_headers_custom_request_headers>`.
 * router: skip the Location header when the response code is not a 201 or a 3xx.
+* router: exposed DOWNSTREAM_REMOTE_ADDRESS as custom HTTP request/response headers.
 * server: fixed a bug in config validation for configs with runtime layers
 * tcp_proxy: added :ref:`ClusterWeight.metadata_match`
 * tcp_proxy: added :ref:`hash_policy`
 * thrift_proxy: added support for cluster header based routing.
 * tls: remove TLS 1.0 and 1.1 from client defaults
-* router: exposed DOWNSTREAM_REMOTE_ADDRESS as custom HTTP request/response headers.
+* udp: added initial support for :ref:`UDP proxy <config_udp_listener_filters_udp_proxy>`.
 
 1.12.0 (October 31, 2019)
 =========================
diff --git a/include/envoy/upstream/resource_manager.h b/include/envoy/upstream/resource_manager.h
index 4fe7681aaaa3..3571b64e4111 100644
--- a/include/envoy/upstream/resource_manager.h
+++ b/include/envoy/upstream/resource_manager.h
@@ -73,7 +73,7 @@ class ResourceManager {
   virtual ~ResourceManager() = default;
 
   /**
-   * @return Resource& active TCP connections.
+   * @return Resource& active TCP connections and UDP sessions.
    */
   virtual Resource& connections() PURE;
diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl
index 9d7c18c07f8b..5f658433258b 100644
--- a/source/extensions/extensions_build_config.bzl
+++ b/source/extensions/extensions_build_config.bzl
@@ -97,7 +97,6 @@ EXTENSIONS = {
 
     # UDP filters
     #
-    # WiP
     "envoy.filters.udp_listener.udp_proxy": "//source/extensions/filters/udp/udp_proxy:config",
 
     #
diff --git a/source/extensions/filters/udp/udp_proxy/BUILD b/source/extensions/filters/udp/udp_proxy/BUILD
index ca7466ea964b..0704d744ad64 100644
--- a/source/extensions/filters/udp/udp_proxy/BUILD
+++ b/source/extensions/filters/udp/udp_proxy/BUILD
@@ -29,7 +29,7 @@ envoy_cc_extension(
     srcs = ["config.cc"],
     hdrs = ["config.h"],
     security_posture = "robust_to_untrusted_downstream",
-    status = "wip",
+    status = "alpha",
    deps = [
         ":udp_proxy_filter_lib",
         "//include/envoy/registry",
diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
index ad2eef579090..fa4aee00e3e1 100644
--- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
+++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
@@ -7,53 +7,168 @@ namespace Extensions {
 namespace UdpFilters {
 namespace UdpProxy {
 
+UdpProxyFilter::UdpProxyFilter(Network::UdpReadFilterCallbacks& callbacks,
+                               const UdpProxyFilterConfigSharedPtr& config)
+    : UdpListenerReadFilter(callbacks), config_(config),
+      cluster_update_callbacks_(
+          config->clusterManager().addThreadLocalClusterUpdateCallbacks(*this)) {
+  Upstream::ThreadLocalCluster* cluster = config->clusterManager().get(config->cluster());
+  if (cluster != nullptr) {
+    onClusterAddOrUpdate(*cluster);
+  }
+}
+
+void UdpProxyFilter::onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) {
+  if (cluster.info()->name() != config_->cluster()) {
+    return;
+  }
+
+  ENVOY_LOG(debug, "udp proxy: attaching to cluster {}", cluster.info()->name());
+  cluster_info_.emplace(*this, cluster);
+}
+
+void UdpProxyFilter::onClusterRemoval(const std::string& cluster) {
+  if (cluster != config_->cluster()) {
+    return;
+  }
+
+  ENVOY_LOG(debug, "udp proxy: detaching from cluster {}", cluster);
+  cluster_info_.reset();
+}
+
 void UdpProxyFilter::onData(Network::UdpRecvData& data) {
+  if (!cluster_info_.has_value()) {
+    config_->stats().downstream_sess_no_route_.inc();
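+    // No cluster is currently attached (it may not exist yet, or it may have been removed via
+    // CDS); account for the datagram and drop it.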
+ return; + } + + cluster_info_.value().onData(data); +} + +void UdpProxyFilter::onReceiveError(Api::IoError::IoErrorCode) { + config_->stats().downstream_sess_rx_errors_.inc(); +} + +UdpProxyFilter::ClusterInfo::ClusterInfo(UdpProxyFilter& filter, + Upstream::ThreadLocalCluster& cluster) + : filter_(filter), cluster_(cluster), + cluster_stats_(generateStats(cluster.info()->statsScope())), + member_update_cb_handle_(cluster.prioritySet().addMemberUpdateCb( + [this](const Upstream::HostVector&, const Upstream::HostVector& hosts_removed) { + for (const auto& host : hosts_removed) { + // This is similar to removeSession() but slightly different due to removeSession() + // also handling deletion of the host to session map entry if there are no sessions + // left. It would be nice to unify the logic but that can be cleaned up later. + auto host_sessions_it = host_to_sessions_.find(host.get()); + if (host_sessions_it != host_to_sessions_.end()) { + for (const auto& session : host_sessions_it->second) { + ASSERT(sessions_.count(session) == 1); + sessions_.erase(session); + } + host_to_sessions_.erase(host_sessions_it); + } + } + })) {} + +UdpProxyFilter::ClusterInfo::~ClusterInfo() { + member_update_cb_handle_->remove(); + // Sanity check the session accounting. This is not as fast as a straight teardown, but this is + // not a performance critical path. + while (!sessions_.empty()) { + removeSession(sessions_.begin()->get()); + } + ASSERT(host_to_sessions_.empty()); +} + +void UdpProxyFilter::ClusterInfo::onData(Network::UdpRecvData& data) { const auto active_session_it = sessions_.find(data.addresses_); ActiveSession* active_session; if (active_session_it == sessions_.end()) { - // TODO(mattklein123): Session circuit breaker. - // TODO(mattklein123): Instead of looking up the cluster each time, keep track of it via - // cluster manager callbacks. - Upstream::ThreadLocalCluster* cluster = config_->getCluster(); - // TODO(mattklein123): Handle the case where the cluster does not exist and add stat. - ASSERT(cluster != nullptr); + if (!cluster_.info() + ->resourceManager(Upstream::ResourcePriority::Default) + .connections() + .canCreate()) { + cluster_.info()->stats().upstream_cx_overflow_.inc(); + return; + } // TODO(mattklein123): Pass a context and support hash based routing. - Upstream::HostConstSharedPtr host = cluster->loadBalancer().chooseHost(nullptr); - // TODO(mattklein123): Handle the case where the host does not exist. - ASSERT(host != nullptr); + Upstream::HostConstSharedPtr host = cluster_.loadBalancer().chooseHost(nullptr); + if (host == nullptr) { + cluster_.info()->stats().upstream_cx_none_healthy_.inc(); + return; + } - auto new_session = std::make_unique(*this, std::move(data.addresses_), host); - active_session = new_session.get(); - sessions_.emplace(std::move(new_session)); + active_session = createSession(std::move(data.addresses_), host); } else { - // TODO(mattklein123): Handle the host going away going away or failing health checks. active_session = active_session_it->get(); + if (active_session->host().health() == Upstream::Host::Health::Unhealthy) { + // If a host becomes unhealthy, we optimally would like to replace it with a new session + // to a healthy host. We may eventually want to make this behavior configurable, but for now + // this will be the universal behavior. + + // TODO(mattklein123): Pass a context and support hash based routing. 
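+        // With a null context the load balancer falls back to the cluster's configured policy;
+        // if it returns a different healthy host, the session is recreated against that host.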
+ Upstream::HostConstSharedPtr host = cluster_.loadBalancer().chooseHost(nullptr); + if (host != nullptr && host->health() != Upstream::Host::Health::Unhealthy && + host.get() != &active_session->host()) { + ENVOY_LOG(debug, "upstream session unhealthy, recreating the session"); + removeSession(active_session); + active_session = createSession(std::move(data.addresses_), host); + } else { + // In this case we could not get a better host, so just keep using the current session. + ENVOY_LOG(trace, "upstream session unhealthy, but unable to get a better host"); + } + } } active_session->write(*data.buffer_); } -void UdpProxyFilter::onReceiveError(Api::IoError::IoErrorCode) { - config_->stats().downstream_sess_rx_errors_.inc(); +UdpProxyFilter::ActiveSession* +UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddresses&& addresses, + const Upstream::HostConstSharedPtr& host) { + auto new_session = std::make_unique(*this, std::move(addresses), host); + auto new_session_ptr = new_session.get(); + sessions_.emplace(std::move(new_session)); + host_to_sessions_[host.get()].emplace(new_session_ptr); + return new_session_ptr; } -UdpProxyFilter::ActiveSession::ActiveSession(UdpProxyFilter& parent, +void UdpProxyFilter::ClusterInfo::removeSession(const ActiveSession* session) { + // First remove from the host to sessions map. + ASSERT(host_to_sessions_[&session->host()].count(session) == 1); + auto host_sessions_it = host_to_sessions_.find(&session->host()); + host_sessions_it->second.erase(session); + if (host_sessions_it->second.empty()) { + host_to_sessions_.erase(host_sessions_it); + } + + // Now remove it from the primary map. + ASSERT(sessions_.count(session) == 1); + sessions_.erase(session); +} + +UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster, Network::UdpRecvData::LocalPeerAddresses&& addresses, const Upstream::HostConstSharedPtr& host) - : parent_(parent), addresses_(std::move(addresses)), host_(host), - idle_timer_(parent.read_callbacks_->udpListener().dispatcher().createTimer( + : cluster_(cluster), addresses_(std::move(addresses)), host_(host), + idle_timer_(cluster.filter_.read_callbacks_->udpListener().dispatcher().createTimer( [this] { onIdleTimer(); })), // NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port // is bound until the first packet is sent to the upstream host. - io_handle_(parent.createIoHandle(host)), - socket_event_(parent.read_callbacks_->udpListener().dispatcher().createFileEvent( + io_handle_(cluster.filter_.createIoHandle(host)), + socket_event_(cluster.filter_.read_callbacks_->udpListener().dispatcher().createFileEvent( io_handle_->fd(), [this](uint32_t) { onReadReady(); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read)) { - ENVOY_LOG(debug, "creating new session: downstream={} local={}", addresses_.peer_->asStringView(), - addresses_.local_->asStringView()); - parent_.config_->stats().downstream_sess_total_.inc(); - parent_.config_->stats().downstream_sess_active_.inc(); + ENVOY_LOG(debug, "creating new session: downstream={} local={} upstream={}", + addresses_.peer_->asStringView(), addresses_.local_->asStringView(), + host->address()->asStringView()); + cluster_.filter_.config_->stats().downstream_sess_total_.inc(); + cluster_.filter_.config_->stats().downstream_sess_active_.inc(); + cluster_.cluster_.info() + ->resourceManager(Upstream::ResourcePriority::Default) + .connections() + .inc(); // TODO(mattklein123): Enable dropped packets socket option. 
In general the Socket abstraction // does not work well right now for client sockets. It's too heavy weight and is aimed at listener @@ -63,27 +178,32 @@ UdpProxyFilter::ActiveSession::ActiveSession(UdpProxyFilter& parent, } UdpProxyFilter::ActiveSession::~ActiveSession() { - parent_.config_->stats().downstream_sess_active_.dec(); + cluster_.filter_.config_->stats().downstream_sess_active_.dec(); + cluster_.cluster_.info() + ->resourceManager(Upstream::ResourcePriority::Default) + .connections() + .dec(); } void UdpProxyFilter::ActiveSession::onIdleTimer() { ENVOY_LOG(debug, "session idle timeout: downstream={} local={}", addresses_.peer_->asStringView(), addresses_.local_->asStringView()); - parent_.config_->stats().idle_timeout_.inc(); - parent_.sessions_.erase(addresses_); + cluster_.filter_.config_->stats().idle_timeout_.inc(); + cluster_.removeSession(this); } void UdpProxyFilter::ActiveSession::onReadReady() { - idle_timer_->enableTimer(parent_.config_->sessionTimeout()); + idle_timer_->enableTimer(cluster_.filter_.config_->sessionTimeout()); // TODO(mattklein123): We should not be passing *addresses_.local_ to this function as we are // not trying to populate the local address for received packets. uint32_t packets_dropped = 0; const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket( - *io_handle_, *addresses_.local_, *this, parent_.config_->timeSource(), packets_dropped); + *io_handle_, *addresses_.local_, *this, cluster_.filter_.config_->timeSource(), + packets_dropped); // TODO(mattklein123): Handle no error when we limit the number of packets read. if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) { - // TODO(mattklein123): Upstream cluster RX error stat. + cluster_.cluster_stats_.sess_rx_errors_.inc(); } } @@ -91,10 +211,11 @@ void UdpProxyFilter::ActiveSession::write(const Buffer::Instance& buffer) { ENVOY_LOG(trace, "writing {} byte datagram upstream: downstream={} local={} upstream={}", buffer.length(), addresses_.peer_->asStringView(), addresses_.local_->asStringView(), host_->address()->asStringView()); - parent_.config_->stats().downstream_sess_rx_bytes_.add(buffer.length()); - parent_.config_->stats().downstream_sess_rx_datagrams_.inc(); + const uint64_t buffer_length = buffer.length(); + cluster_.filter_.config_->stats().downstream_sess_rx_bytes_.add(buffer_length); + cluster_.filter_.config_->stats().downstream_sess_rx_datagrams_.inc(); - idle_timer_->enableTimer(parent_.config_->sessionTimeout()); + idle_timer_->enableTimer(cluster_.filter_.config_->sessionTimeout()); // NOTE: On the first write, a local ephemeral port is bound, and thus this write can fail due to // port exhaustion. @@ -103,9 +224,10 @@ void UdpProxyFilter::ActiveSession::write(const Buffer::Instance& buffer) { Api::IoCallUint64Result rc = Network::Utility::writeToSocket(*io_handle_, buffer, nullptr, *host_->address()); if (!rc.ok()) { - // TODO(mattklein123): Upstream cluster TX error stat. + cluster_.cluster_stats_.sess_tx_errors_.inc(); } else { - // TODO(mattklein123): Upstream cluster TX byte/datagram stats. + cluster_.cluster_stats_.sess_tx_datagrams_.inc(); + cluster_.cluster_.info()->stats().upstream_cx_tx_bytes_total_.add(buffer_length); } } @@ -117,15 +239,16 @@ void UdpProxyFilter::ActiveSession::processPacket(Network::Address::InstanceCons host_->address()->asStringView()); const uint64_t buffer_length = buffer->length(); - // TODO(mattklein123): Upstream cluster RX byte/datagram stats. 
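+  // Received upstream datagrams count against both the UDP-proxy-specific cluster stats and the
+  // generic upstream cluster byte counters.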
+  cluster_.cluster_stats_.sess_rx_datagrams_.inc();
+  cluster_.cluster_.info()->stats().upstream_cx_rx_bytes_total_.add(buffer_length);
 
   Network::UdpSendData data{addresses_.local_->ip(), *addresses_.peer_, *buffer};
-  const Api::IoCallUint64Result rc = parent_.read_callbacks_->udpListener().send(data);
+  const Api::IoCallUint64Result rc = cluster_.filter_.read_callbacks_->udpListener().send(data);
   if (!rc.ok()) {
-    parent_.config_->stats().downstream_sess_tx_errors_.inc();
+    cluster_.filter_.config_->stats().downstream_sess_tx_errors_.inc();
   } else {
-    parent_.config_->stats().downstream_sess_tx_bytes_.add(buffer_length);
-    parent_.config_->stats().downstream_sess_tx_datagrams_.inc();
+    cluster_.filter_.config_->stats().downstream_sess_tx_bytes_.add(buffer_length);
+    cluster_.filter_.config_->stats().downstream_sess_tx_datagrams_.inc();
   }
 }
 
diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h
index e248dafb75cc..fdf970bf4193 100644
--- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h
+++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h
@@ -10,15 +10,18 @@
 
 #include "absl/container/flat_hash_set.h"
 
+// TODO(mattklein123): UDP session access logging.
+
 namespace Envoy {
 namespace Extensions {
 namespace UdpFilters {
 namespace UdpProxy {
 
 /**
- * All UDP proxy stats. @see stats_macros.h
+ * All UDP proxy downstream stats. @see stats_macros.h
  */
-#define ALL_UDP_PROXY_STATS(COUNTER, GAUGE)                                                        \
+#define ALL_UDP_PROXY_DOWNSTREAM_STATS(COUNTER, GAUGE)                                             \
+  COUNTER(downstream_sess_no_route)                                                                \
   COUNTER(downstream_sess_rx_bytes)                                                                \
   COUNTER(downstream_sess_rx_datagrams)                                                            \
   COUNTER(downstream_sess_rx_errors)                                                               \
@@ -30,10 +33,26 @@ namespace UdpProxy {
   GAUGE(downstream_sess_active, Accumulate)
 
 /**
- * Struct definition for all UDP proxy stats. @see stats_macros.h
+ * Struct definition for all UDP proxy downstream stats. @see stats_macros.h
+ */
+struct UdpProxyDownstreamStats {
+  ALL_UDP_PROXY_DOWNSTREAM_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
+};
+
+/**
+ * All UDP proxy upstream cluster stats. @see stats_macros.h
+ */
+#define ALL_UDP_PROXY_UPSTREAM_STATS(COUNTER)                                                      \
+  COUNTER(sess_rx_datagrams)                                                                       \
+  COUNTER(sess_rx_errors)                                                                          \
+  COUNTER(sess_tx_datagrams)                                                                       \
+  COUNTER(sess_tx_errors)
+
+/**
+ * Struct definition for all UDP proxy upstream stats.
@see stats_macros.h */ -struct UdpProxyStats { - ALL_UDP_PROXY_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) +struct UdpProxyUpstreamStats { + ALL_UDP_PROXY_UPSTREAM_STATS(GENERATE_COUNTER_STRUCT) }; class UdpProxyFilterConfig { @@ -45,38 +64,43 @@ class UdpProxyFilterConfig { session_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, idle_timeout, 60 * 1000)), stats_(generateStats(config.stat_prefix(), root_scope)) {} - Upstream::ThreadLocalCluster* getCluster() const { return cluster_manager_.get(cluster_); } + const std::string& cluster() const { return cluster_; } + Upstream::ClusterManager& clusterManager() const { return cluster_manager_; } std::chrono::milliseconds sessionTimeout() const { return session_timeout_; } - UdpProxyStats& stats() const { return stats_; } + UdpProxyDownstreamStats& stats() const { return stats_; } TimeSource& timeSource() const { return time_source_; } private: - static UdpProxyStats generateStats(const std::string& stat_prefix, Stats::Scope& scope) { + static UdpProxyDownstreamStats generateStats(const std::string& stat_prefix, + Stats::Scope& scope) { const auto final_prefix = fmt::format("udp.{}", stat_prefix); - return {ALL_UDP_PROXY_STATS(POOL_COUNTER_PREFIX(scope, final_prefix), - POOL_GAUGE_PREFIX(scope, final_prefix))}; + return {ALL_UDP_PROXY_DOWNSTREAM_STATS(POOL_COUNTER_PREFIX(scope, final_prefix), + POOL_GAUGE_PREFIX(scope, final_prefix))}; } Upstream::ClusterManager& cluster_manager_; TimeSource& time_source_; const std::string cluster_; const std::chrono::milliseconds session_timeout_; - mutable UdpProxyStats stats_; + mutable UdpProxyDownstreamStats stats_; }; using UdpProxyFilterConfigSharedPtr = std::shared_ptr; -class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggable { +class UdpProxyFilter : public Network::UdpListenerReadFilter, + public Upstream::ClusterUpdateCallbacks, + Logger::Loggable { public: UdpProxyFilter(Network::UdpReadFilterCallbacks& callbacks, - const UdpProxyFilterConfigSharedPtr& config) - : UdpListenerReadFilter(callbacks), config_(config) {} + const UdpProxyFilterConfigSharedPtr& config); // Network::UdpListenerReadFilter void onData(Network::UdpRecvData& data) override; void onReceiveError(Api::IoError::IoErrorCode error_code) override; private: + class ClusterInfo; + /** * An active session is similar to a TCP connection. It binds a 4-tuple (downstream IP/port, local * IP/port) to a selected upstream host for the purpose of packet forwarding. Unlike a TCP @@ -87,10 +111,11 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggable()(value->addresses()); } + size_t operator()(const ActiveSession* value) const { + return absl::Hash()(value->addresses()); + } }; struct HeterogeneousActiveSessionEqual { @@ -155,6 +183,40 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggableaddresses() == rhs->addresses(); } + bool operator()(const ActiveSessionPtr& lhs, const ActiveSession* rhs) const { + return lhs->addresses() == rhs->addresses(); + } + }; + + /** + * Wraps all cluster specific UDP processing including session tracking, stats, etc. In the future + * we will very likely support different types of routing to multiple upstream clusters. 
+ */ + class ClusterInfo { + public: + ClusterInfo(UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster); + ~ClusterInfo(); + void onData(Network::UdpRecvData& data); + void removeSession(const ActiveSession* session); + + UdpProxyFilter& filter_; + Upstream::ThreadLocalCluster& cluster_; + UdpProxyUpstreamStats cluster_stats_; + + private: + ActiveSession* createSession(Network::UdpRecvData::LocalPeerAddresses&& addresses, + const Upstream::HostConstSharedPtr& host); + static UdpProxyUpstreamStats generateStats(Stats::Scope& scope) { + const auto final_prefix = "udp"; + return {ALL_UDP_PROXY_UPSTREAM_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))}; + } + + Common::CallbackHandle* member_update_cb_handle_; + absl::flat_hash_set + sessions_; + absl::flat_hash_map> + host_to_sessions_; }; virtual Network::IoHandlePtr createIoHandle(const Upstream::HostConstSharedPtr& host) { @@ -162,10 +224,16 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggableaddress()->socket(Network::Address::SocketType::Datagram); } + // Upstream::ClusterUpdateCallbacks + void onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) override; + void onClusterRemoval(const std::string& cluster_name) override; + const UdpProxyFilterConfigSharedPtr config_; - absl::flat_hash_set - sessions_; + const Upstream::ClusterUpdateCallbacksHandlePtr cluster_update_callbacks_; + // Right now we support a single cluster to route to. It is highly likely in the future that + // we will support additional routing options either using filter chain matching, weighting, + // etc. + absl::optional cluster_info_; }; } // namespace UdpProxy diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc index e82eee9931a0..b9dc05a694a2 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc @@ -12,6 +12,7 @@ using testing::AtLeast; using testing::ByMove; using testing::InSequence; using testing::Return; +using testing::ReturnNew; using testing::SaveArg; namespace Envoy { @@ -60,37 +61,47 @@ class UdpProxyFilterTest : public testing::Test { })); } - void recvDataFromUpstream(const std::string& data, int send_sys_errno = 0) { + void recvDataFromUpstream(const std::string& data, int recv_sys_errno = 0, + int send_sys_errno = 0) { EXPECT_CALL(*idle_timer_, enableTimer(parent_.config_->sessionTimeout(), nullptr)); // Return the datagram. EXPECT_CALL(*io_handle_, recvmsg(_, 1, _, _)) - .WillOnce(Invoke( - [this, data](Buffer::RawSlice* slices, const uint64_t, uint32_t, - Network::IoHandle::RecvMsgOutput& output) -> Api::IoCallUint64Result { - ASSERT(data.size() <= slices[0].len_); - memcpy(slices[0].mem_, data.data(), data.size()); - output.peer_address_ = upstream_address_; - return makeNoError(data.size()); + .WillOnce( + Invoke([this, data, recv_sys_errno]( + Buffer::RawSlice* slices, const uint64_t, uint32_t, + Network::IoHandle::RecvMsgOutput& output) -> Api::IoCallUint64Result { + if (recv_sys_errno != 0) { + return makeError(recv_sys_errno); + } else { + ASSERT(data.size() <= slices[0].len_); + memcpy(slices[0].mem_, data.data(), data.size()); + output.peer_address_ = upstream_address_; + return makeNoError(data.size()); + } })); - // Send the datagram downstream. 
- EXPECT_CALL(parent_.callbacks_.udp_listener_, send(_)) - .WillOnce(Invoke([data, send_sys_errno]( - const Network::UdpSendData& send_data) -> Api::IoCallUint64Result { - // TODO(mattklein123): Verify peer/local address. - EXPECT_EQ(send_data.buffer_.toString(), data); - if (send_sys_errno == 0) { - send_data.buffer_.drain(send_data.buffer_.length()); - return makeNoError(data.size()); - } else { - return makeError(send_sys_errno); - } - })); - // Return an EAGAIN result. - EXPECT_CALL(*io_handle_, recvmsg(_, 1, _, _)) - .WillOnce(Return(ByMove(Api::IoCallUint64Result( - 0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), - Network::IoSocketError::deleteIoError))))); + + if (recv_sys_errno == 0) { + // Send the datagram downstream. + EXPECT_CALL(parent_.callbacks_.udp_listener_, send(_)) + .WillOnce(Invoke([data, send_sys_errno]( + const Network::UdpSendData& send_data) -> Api::IoCallUint64Result { + // TODO(mattklein123): Verify peer/local address. + EXPECT_EQ(send_data.buffer_.toString(), data); + if (send_sys_errno == 0) { + send_data.buffer_.drain(send_data.buffer_.length()); + return makeNoError(data.size()); + } else { + return makeError(send_sys_errno); + } + })); + // Return an EAGAIN result. + EXPECT_CALL(*io_handle_, recvmsg(_, 1, _, _)) + .WillOnce(Return(ByMove(Api::IoCallUint64Result( + 0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), + Network::IoSocketError::deleteIoError))))); + } + // Kick off the receive. file_event_cb_(Event::FileReadyType::Read); } @@ -108,15 +119,25 @@ class UdpProxyFilterTest : public testing::Test { EXPECT_CALL(callbacks_, udpListener()).Times(AtLeast(0)); EXPECT_CALL(*cluster_manager_.thread_local_cluster_.lb_.host_, address()) .WillRepeatedly(Return(upstream_address_)); + EXPECT_CALL(*cluster_manager_.thread_local_cluster_.lb_.host_, health()) + .WillRepeatedly(Return(Upstream::Host::Health::Healthy)); } ~UdpProxyFilterTest() { EXPECT_CALL(callbacks_.udp_listener_, onDestroy()); } - void setup(const std::string& yaml) { + void setup(const std::string& yaml, bool has_cluster = true) { envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig config; TestUtility::loadFromYamlAndValidate(yaml, config); config_ = std::make_shared(cluster_manager_, time_system_, stats_store_, config); + EXPECT_CALL(cluster_manager_, addThreadLocalClusterUpdateCallbacks_(_)) + .WillOnce(DoAll(SaveArgAddress(&cluster_update_callbacks_), + ReturnNew())); + if (has_cluster) { + EXPECT_CALL(cluster_manager_, get(_)); + } else { + EXPECT_CALL(cluster_manager_, get(_)).WillOnce(Return(nullptr)); + } filter_ = std::make_unique(callbacks_, config_); } @@ -130,10 +151,9 @@ class UdpProxyFilterTest : public testing::Test { filter_->onData(data); } - void expectSessionCreate() { - test_sessions_.emplace_back(*this, upstream_address_); + void expectSessionCreate(const Network::Address::InstanceConstSharedPtr& address) { + test_sessions_.emplace_back(*this, address); TestSession& new_session = test_sessions_.back(); - EXPECT_CALL(cluster_manager_, get(_)); new_session.idle_timer_ = new Event::MockTimer(&callbacks_.udp_listener_.dispatcher_); EXPECT_CALL(*filter_, createIoHandle(_)) .WillOnce(Return(ByMove(Network::IoHandlePtr{test_sessions_.back().io_handle_}))); @@ -156,9 +176,9 @@ class UdpProxyFilterTest : public testing::Test { Stats::IsolatedStoreImpl stats_store_; UdpProxyFilterConfigSharedPtr config_; Network::MockUdpReadFilterCallbacks callbacks_; + Upstream::ClusterUpdateCallbacks* cluster_update_callbacks_{}; std::unique_ptr 
filter_; std::vector test_sessions_; - // If a test ever support more than 1 upstream host this will need to move to the session/test. const Network::Address::InstanceConstSharedPtr upstream_address_; }; @@ -171,7 +191,7 @@ stat_prefix: foo cluster: fake_cluster )EOF"); - expectSessionCreate(); + expectSessionCreate(upstream_address_); test_sessions_[0].expectUpstreamWrite("hello"); recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); @@ -202,7 +222,7 @@ stat_prefix: foo cluster: fake_cluster )EOF"); - expectSessionCreate(); + expectSessionCreate(upstream_address_); test_sessions_[0].expectUpstreamWrite("hello"); recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); @@ -212,7 +232,7 @@ cluster: fake_cluster EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); EXPECT_EQ(0, config_->stats().downstream_sess_active_.value()); - expectSessionCreate(); + expectSessionCreate(upstream_address_); test_sessions_[1].expectUpstreamWrite("hello"); recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); EXPECT_EQ(2, config_->stats().downstream_sess_total_.value()); @@ -231,14 +251,222 @@ cluster: fake_cluster filter_->onReceiveError(Api::IoError::IoErrorCode::UnknownError); EXPECT_EQ(1, config_->stats().downstream_sess_rx_errors_.value()); - expectSessionCreate(); + expectSessionCreate(upstream_address_); test_sessions_[0].expectUpstreamWrite("hello"); recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); checkTransferStats(5 /*rx_bytes*/, 1 /*rx_datagrams*/, 0 /*tx_bytes*/, 0 /*tx_datagrams*/); + EXPECT_EQ(5, cluster_manager_.thread_local_cluster_.cluster_.info_->stats_ + .upstream_cx_tx_bytes_total_.value()); - test_sessions_[0].recvDataFromUpstream("world2", EMSGSIZE); + test_sessions_[0].recvDataFromUpstream("world2", 0, EMSGSIZE); checkTransferStats(5 /*rx_bytes*/, 1 /*rx_datagrams*/, 0 /*tx_bytes*/, 0 /*tx_datagrams*/); + EXPECT_EQ(6, cluster_manager_.thread_local_cluster_.cluster_.info_->stats_ + .upstream_cx_rx_bytes_total_.value()); EXPECT_EQ(1, config_->stats().downstream_sess_tx_errors_.value()); + + test_sessions_[0].recvDataFromUpstream("world2", EMSGSIZE, 0); + checkTransferStats(5 /*rx_bytes*/, 1 /*rx_datagrams*/, 0 /*tx_bytes*/, 0 /*tx_datagrams*/); + EXPECT_EQ(6, cluster_manager_.thread_local_cluster_.cluster_.info_->stats_ + .upstream_cx_rx_bytes_total_.value()); + EXPECT_EQ(1, TestUtility::findCounter( + cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_, + "udp.sess_rx_errors") + ->value()); + + test_sessions_[0].expectUpstreamWrite("hello", EMSGSIZE); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + checkTransferStats(10 /*rx_bytes*/, 2 /*rx_datagrams*/, 0 /*tx_bytes*/, 0 /*tx_datagrams*/); + EXPECT_EQ(5, cluster_manager_.thread_local_cluster_.cluster_.info_->stats_ + .upstream_cx_tx_bytes_total_.value()); + EXPECT_EQ(1, TestUtility::findCounter( + cluster_manager_.thread_local_cluster_.cluster_.info_->stats_store_, + "udp.sess_tx_errors") + ->value()); +} + +// No upstream host handling. 
+TEST_F(UdpProxyFilterTest, NoUpstreamHost) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster + )EOF"); + + EXPECT_CALL(cluster_manager_.thread_local_cluster_.lb_, chooseHost(_)).WillOnce(Return(nullptr)); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, cluster_manager_.thread_local_cluster_.cluster_.info_->stats_ + .upstream_cx_none_healthy_.value()); +} + +// No cluster at filter creation. +TEST_F(UdpProxyFilterTest, NoUpstreamClusterAtCreation) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster + )EOF", + false); + + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_no_route_.value()); +} + +// Dynamic cluster addition and removal handling. +TEST_F(UdpProxyFilterTest, ClusterDynamicAddAndRemoval) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster + )EOF", + false); + + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_no_route_.value()); + EXPECT_EQ(0, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(0, config_->stats().downstream_sess_active_.value()); + + // Add a cluster that we don't care about. + NiceMock other_thread_local_cluster; + other_thread_local_cluster.cluster_.info_->name_ = "other_cluster"; + cluster_update_callbacks_->onClusterAddOrUpdate(other_thread_local_cluster); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(2, config_->stats().downstream_sess_no_route_.value()); + EXPECT_EQ(0, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(0, config_->stats().downstream_sess_active_.value()); + + // Now add the cluster we care about. + cluster_update_callbacks_->onClusterAddOrUpdate(cluster_manager_.thread_local_cluster_); + expectSessionCreate(upstream_address_); + test_sessions_[0].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + + // Remove a cluster we don't care about. + cluster_update_callbacks_->onClusterRemoval("other_cluster"); + + // Remove the cluster we do care about. This should purge all sessions. + cluster_update_callbacks_->onClusterRemoval("fake_cluster"); + EXPECT_EQ(0, config_->stats().downstream_sess_active_.value()); +} + +// Hitting the maximum per-cluster connection/session circuit breaker. +TEST_F(UdpProxyFilterTest, MaxSessionsCircuitBreaker) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster + )EOF"); + + // Allow only a single session. + cluster_manager_.thread_local_cluster_.cluster_.info_->resetResourceManager(1, 0, 0, 0, 0); + + expectSessionCreate(upstream_address_); + test_sessions_[0].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + + // This should hit the session circuit breaker. + recvDataFromDownstream("10.0.0.2:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ( + 1, + cluster_manager_.thread_local_cluster_.cluster_.info_->stats_.upstream_cx_overflow_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + + // Timing out the 1st session should allow us to create another. 
+ test_sessions_[0].idle_timer_->invokeCallback(); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(0, config_->stats().downstream_sess_active_.value()); + expectSessionCreate(upstream_address_); + test_sessions_[1].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.2:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(2, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); +} + +// Verify that all sessions for a host are removed when a host is removed. +TEST_F(UdpProxyFilterTest, RemoveHostSessions) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster + )EOF"); + + expectSessionCreate(upstream_address_); + test_sessions_[0].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + + cluster_manager_.thread_local_cluster_.cluster_.priority_set_.runUpdateCallbacks( + 0, {}, {cluster_manager_.thread_local_cluster_.lb_.host_}); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(0, config_->stats().downstream_sess_active_.value()); + + expectSessionCreate(upstream_address_); + test_sessions_[1].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(2, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); +} + +// In this case the host becomes unhealthy, but we get the same host back, so just keep using the +// current session. +TEST_F(UdpProxyFilterTest, HostUnhealthyPickSameHost) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster + )EOF"); + + expectSessionCreate(upstream_address_); + test_sessions_[0].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + + EXPECT_CALL(*cluster_manager_.thread_local_cluster_.lb_.host_, health()) + .WillRepeatedly(Return(Upstream::Host::Health::Unhealthy)); + test_sessions_[0].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); +} + +// Make sure that we are able to create a new session if there is an available healthy host and +// our current host is unhealthy. 
+TEST_F(UdpProxyFilterTest, HostUnhealthyPickDifferentHost) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster + )EOF"); + + expectSessionCreate(upstream_address_); + test_sessions_[0].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + + EXPECT_CALL(*cluster_manager_.thread_local_cluster_.lb_.host_, health()) + .WillRepeatedly(Return(Upstream::Host::Health::Unhealthy)); + auto new_host = std::make_shared>(); + auto new_host_address = Network::Utility::parseInternetAddressAndPort("20.0.0.2:443"); + ON_CALL(*new_host, address()).WillByDefault(Return(new_host_address)); + ON_CALL(*new_host, health()).WillByDefault(Return(Upstream::Host::Health::Healthy)); + EXPECT_CALL(cluster_manager_.thread_local_cluster_.lb_, chooseHost(_)).WillOnce(Return(new_host)); + expectSessionCreate(new_host_address); + test_sessions_[1].expectUpstreamWrite("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(2, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); } } // namespace diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc index 727b21ab0870..e70ee8a83114 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc @@ -92,8 +92,14 @@ class UdpProxyIntegrationTest : public testing::TestWithParamcounter("udp.foo.downstream_sess_rx_bytes")->value()); EXPECT_EQ(1, test_server_->counter("udp.foo.downstream_sess_rx_datagrams")->value()); + EXPECT_EQ(5, test_server_->counter("cluster.cluster_0.upstream_cx_tx_bytes_total")->value()); + EXPECT_EQ(1, test_server_->counter("cluster.cluster_0.udp.sess_tx_datagrams")->value()); + + EXPECT_EQ(6, test_server_->counter("cluster.cluster_0.upstream_cx_rx_bytes_total")->value()); + EXPECT_EQ(1, test_server_->counter("cluster.cluster_0.udp.sess_rx_datagrams")->value()); EXPECT_EQ(6, test_server_->counter("udp.foo.downstream_sess_tx_bytes")->value()); EXPECT_EQ(1, test_server_->counter("udp.foo.downstream_sess_tx_datagrams")->value()); + EXPECT_EQ(1, test_server_->counter("udp.foo.downstream_sess_total")->value()); EXPECT_EQ(1, test_server_->gauge("udp.foo.downstream_sess_active")->value()); }