diff --git a/Cargo.lock b/Cargo.lock index 85f955b..188259c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -107,6 +107,15 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "cadence" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab51a759f502097abe855100b81b421d3a104b62a2c3209f751d90ce6dd2ea1" +dependencies = [ + "crossbeam-channel", +] + [[package]] name = "cc" version = "1.0.83" @@ -168,6 +177,25 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +dependencies = [ + "cfg-if", +] + [[package]] name = "env_logger" version = "0.10.0" @@ -577,6 +605,7 @@ dependencies = [ name = "udp-over-tcp" version = "0.3.1" dependencies = [ + "cadence", "clap", "env_logger", "err-context", diff --git a/Cargo.toml b/Cargo.toml index 3b63a22..6f7af5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,10 @@ opt-level = 3 lto = true codegen-units = 1 +[features] +# Enable this feature to make it possible to have tcp2udp report metrics over statsd +statsd = ["cadence"] + [dependencies] tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "net", "time", "io-util"] } err-context = "0.1.0" @@ -32,6 +36,7 @@ lazy_static = "1.4.0" # Only used by the binaries in src/bin/ and is optional so it's not # pulled in when built as a 
library. env_logger = { version = "0.10.0", optional = true } +cadence = { version = "1.0.0", optional = true } [target.'cfg(target_os = "linux")'.dependencies] nix = { version = "0.27.1", features = ["socket"] } diff --git a/src/bin/tcp2udp.rs b/src/bin/tcp2udp.rs index 5ff1819..37ac308 100644 --- a/src/bin/tcp2udp.rs +++ b/src/bin/tcp2udp.rs @@ -20,6 +20,11 @@ pub struct Options { #[clap(flatten)] tcp2udp_options: tcp2udp::Options, + + #[cfg(feature = "statsd")] + /// Host to send statsd metrics to. + #[clap(long)] + statsd_host: Option<std::net::SocketAddr>, } fn main() { @@ -28,10 +33,15 @@ fn main() { let options = Options::parse(); + #[cfg(feature = "statsd")] + let statsd_host = options.statsd_host; + #[cfg(not(feature = "statsd"))] + let statsd_host = None; + let runtime = create_runtime(options.threads); let error = runtime - .block_on(tcp2udp::run(options.tcp2udp_options)) + .block_on(tcp2udp::run(options.tcp2udp_options, statsd_host)) .into_error(); log::error!("Error: {}", error.display("\nCaused by: ")); std::process::exit(1); diff --git a/src/statsd.rs b/src/statsd.rs new file mode 100644 index 0000000..3edb7ca --- /dev/null +++ b/src/statsd.rs @@ -0,0 +1,108 @@ +#[cfg(feature = "statsd")] +use cadence::{CountedExt, Gauged, StatsdClient}; +#[cfg(feature = "statsd")] +use std::sync::atomic::{AtomicU64, Ordering}; + +/// Queue with a maximum capacity of 8K events. +/// This program is extremely unlikely to ever reach that upper bound. +/// The bound is still here so that if it ever were to happen, we drop events +/// instead of indefinitely filling the memory with unsent events. +const QUEUE_SIZE: usize = 8 * 1024; + +const PREFIX: &str = "tcp2udp"; + +#[cfg(feature = "statsd")] +pub struct StatsdMetrics(Option<StatsdMetricsImpl>); +#[cfg(not(feature = "statsd"))] +pub struct StatsdMetrics(Option<()>); + +impl StatsdMetrics { + /// Creates a dummy statsd metrics instance. Does not actually connect to any statsd + /// server, nor emits any events. 
Used as an API compatible drop in when metrics + /// should not be emitted. + pub fn dummy() -> Self { + Self(None) + } + + /// Creates a statsd metric reporting instance connecting to the given host addr. + #[cfg(feature = "statsd")] + pub fn real(host: std::net::SocketAddr) -> Self { + Self(Some(StatsdMetricsImpl::new(host))) + } + + /// Emit a metric saying we failed to accept an incoming TCP connection (probably ran out of file descriptors) + pub fn accept_error(&self) { + #[cfg(feature = "statsd")] + if let Some(statsd) = &self.0 { + statsd.accept_error() + } + } + + /// Increment the connection counter inside this metrics instance and emit that new gauge value + pub fn incr_connections(&self) { + #[cfg(feature = "statsd")] + if let Some(statsd) = &self.0 { + statsd.incr_connections() + } + } + + /// Decrement the connection counter inside this metrics instance and emit that new gauge value + pub fn decr_connections(&self) { + #[cfg(feature = "statsd")] + if let Some(statsd) = &self.0 { + statsd.decr_connections() + } + } +} + +#[cfg(feature = "statsd")] +struct StatsdMetricsImpl { + client: StatsdClient, + num_connections: AtomicU64, +} + +#[cfg(feature = "statsd")] +impl StatsdMetricsImpl { + pub fn new(host: std::net::SocketAddr) -> Self { + use cadence::{UdpMetricSink, QueuingMetricSink}; + + let socket = std::net::UdpSocket::bind("0.0.0.0:0").unwrap(); + log::debug!("Statsd socket bound to {}", socket.local_addr().unwrap()); + + // Create a non-buffered blocking metrics sink. It is important that it's not buffered, + // so events are emitted instantly when they happen (this program does not emit a lot of + // events, nor does it attach timestamps to the events). + // The fact that it's blocking does not matter, since the `QueuingMetricSink` will make sure + // the `UdpMetricSink` runs in its own thread anyway. 
+ let udp_sink = UdpMetricSink::from(host, socket).unwrap(); + let queuing_sink = QueuingMetricSink::with_capacity(udp_sink, QUEUE_SIZE); + let statds_client = StatsdClient::from_sink(PREFIX, queuing_sink); + Self { + client: statds_client, + num_connections: AtomicU64::new(0), + } + } + + pub fn accept_error(&self) { + log::debug!("Sending statsd tcp_accept_errors"); + if let Err(e) = self.client.incr("tcp_accept_errors") { + log::error!("Failed to emit statsd tcp_accept_errors: {e}"); + } + } + + pub fn incr_connections(&self) { + let num_connections = self.num_connections.fetch_add(1, Ordering::SeqCst) + 1; + log::debug!("Sending statsd num_connections = {num_connections}"); + if let Err(e) = self.client.gauge("num_connections", num_connections) { + log::error!("Failed to emit statsd num_connections: {e}"); + } + } + + pub fn decr_connections(&self) { + let num_connections = self.num_connections.fetch_sub(1, Ordering::SeqCst) - 1; + log::debug!("Sending statsd num_connections = {num_connections}"); + if let Err(e) = self.client.gauge("num_connections", num_connections) { + log::error!("Failed to emit statsd num_connections: {e}"); + } + } +} diff --git a/src/tcp2udp.rs b/src/tcp2udp.rs index 49dac39..498188c 100644 --- a/src/tcp2udp.rs +++ b/src/tcp2udp.rs @@ -8,10 +8,14 @@ use std::convert::Infallible; use std::fmt; use std::io; use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; use std::time::Duration; use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket}; use tokio::time::sleep; +#[path = "statsd.rs"] +mod statsd; + #[derive(Debug)] #[cfg_attr(feature = "clap", derive(clap::Parser))] #[cfg_attr(feature = "clap", group(skip))] @@ -85,7 +89,14 @@ impl std::error::Error for Tcp2UdpError { /// If binding a listening socket fails this returns an error. Otherwise the function /// will continue indefinitely to accept incoming connections and forward to UDP. /// Errors are just logged. 
-pub async fn run(options: Options) -> Result<Infallible, Tcp2UdpError> { +/// +/// If the `statsd` feature is enabled, and a socket address is passed in `statsd_host`, +/// metrics will be sent to it. If this argument is not `None` but the feature is disabled, +/// it will just be ignored. +pub async fn run( + options: Options, + statsd_host: Option<SocketAddr>, +) -> Result<Infallible, Tcp2UdpError> { if options.tcp_listen_addrs.is_empty() { return Err(Tcp2UdpError::NoTcpListenAddrs); } @@ -98,6 +109,17 @@ pub async fn run(options: Options) -> Result<Infallible, Tcp2UdpError> { } }); + let statsd = Arc::new(match statsd_host { + None => statsd::StatsdMetrics::dummy(), + #[cfg(feature = "statsd")] + Some(statsd_host) => statsd::StatsdMetrics::real(statsd_host), + #[cfg(not(feature = "statsd"))] + Some(_) => { + log::warn!("Compiled without statsd support, ignoring statsd argument"); + statsd::StatsdMetrics::dummy() + }, + }); + let mut join_handles = Vec::with_capacity(options.tcp_listen_addrs.len()); for tcp_listen_addr in options.tcp_listen_addrs { let tcp_listener = create_listening_socket(tcp_listen_addr, &options.tcp_options)?; @@ -106,6 +128,7 @@ pub async fn run(options: Options) -> Result<Infallible, Tcp2UdpError> { let udp_forward_addr = options.udp_forward_addr; let tcp_recv_timeout = options.tcp_options.recv_timeout; let tcp_nodelay = options.tcp_options.nodelay; + let statsd = Arc::clone(&statsd); join_handles.push(tokio::spawn(async move { process_tcp_listener( tcp_listener, @@ -113,6 +136,7 @@ pub async fn run(options: Options) -> Result<Infallible, Tcp2UdpError> { udp_forward_addr, tcp_recv_timeout, tcp_nodelay, + statsd, ) .await; })); @@ -150,6 +174,7 @@ async fn process_tcp_listener( udp_forward_addr: SocketAddr, tcp_recv_timeout: Option<Duration>, tcp_nodelay: bool, + statsd: Arc<statsd::StatsdMetrics>, ) -> ! 
{ let mut cooldown = ExponentialBackoff::new(Duration::from_millis(50), Duration::from_millis(5000)); @@ -160,7 +185,9 @@ async fn process_tcp_listener( if let Err(error) = crate::tcp_options::set_nodelay(&tcp_stream, tcp_nodelay) { log::error!("Error: {}", error.display("\nCaused by: ")); } + let statsd = statsd.clone(); tokio::spawn(async move { + statsd.incr_connections(); if let Err(error) = process_socket( tcp_stream, tcp_peer_addr, @@ -172,12 +199,15 @@ async fn process_tcp_listener( { log::error!("Error: {}", error.display("\nCaused by: ")); } + statsd.decr_connections(); }); cooldown.reset(); } Err(error) => { log::error!("Error when accepting incoming TCP connection: {}", error); + statsd.accept_error(); + // If the process runs out of file descriptors, it will fail to accept a socket. // But that socket will also remain in the queue, so it will fail again immediately. // This will busy loop consuming the CPU and filling any logs. To prevent this,