Skip to content

Commit

Permalink
Add initial statsd reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
faern committed Dec 13, 2023
1 parent 104ccaf commit bdfa6ca
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 2 deletions.
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ opt-level = 3
lto = true
codegen-units = 1

[features]
# Enable this feature to make it possible to have tcp2udp report metrics over statsd
statsd = ["cadence"]

[dependencies]
tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "net", "time", "io-util"] }
err-context = "0.1.0"
Expand All @@ -32,6 +36,7 @@ lazy_static = "1.4.0"
# Only used by the binaries in src/bin/ and is optional so it's not
# pulled in when built as a library.
env_logger = { version = "0.10.0", optional = true }
cadence = { version = "1.0.0", optional = true }

[target.'cfg(target_os = "linux")'.dependencies]
nix = { version = "0.27.1", features = ["socket"] }
12 changes: 11 additions & 1 deletion src/bin/tcp2udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ pub struct Options {

#[clap(flatten)]
tcp2udp_options: tcp2udp::Options,

#[cfg(feature = "statsd")]
/// Host to send statsd metrics to.
#[clap(long)]
statsd_host: Option<std::net::SocketAddr>,
}

fn main() {
Expand All @@ -28,10 +33,15 @@ fn main() {

let options = Options::parse();

#[cfg(feature = "statsd")]
let statsd_host = options.statsd_host;
#[cfg(not(feature = "statsd"))]
let statsd_host = None;

let runtime = create_runtime(options.threads);

let error = runtime
.block_on(tcp2udp::run(options.tcp2udp_options))
.block_on(tcp2udp::run(options.tcp2udp_options, statsd_host))
.into_error();
log::error!("Error: {}", error.display("\nCaused by: "));
std::process::exit(1);
Expand Down
108 changes: 108 additions & 0 deletions src/statsd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#[cfg(feature = "statsd")]
use cadence::{CountedExt, Gauged, StatsdClient};
#[cfg(feature = "statsd")]
use std::sync::atomic::{AtomicU64, Ordering};

/// Queue with a maximum capacity of 8K events.
/// This program is extremely unlikely to ever reach that upper bound.
/// The bound is still here so that if it ever were to happen, we drop events
/// instead of indefinitely filling the memory with unsent events.
const QUEUE_SIZE: usize = 8 * 1024;

Check failure on line 10 in src/statsd.rs

View workflow job for this annotation

GitHub Actions / clippy

constant `QUEUE_SIZE` is never used

error: constant `QUEUE_SIZE` is never used --> src/statsd.rs:10:7 | 10 | const QUEUE_SIZE: usize = 8 * 1024; | ^^^^^^^^^^ | = note: `-D dead-code` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(dead_code)]`

const PREFIX: &str = "tcp2udp";

Check failure on line 12 in src/statsd.rs

View workflow job for this annotation

GitHub Actions / clippy

constant `PREFIX` is never used

error: constant `PREFIX` is never used --> src/statsd.rs:12:7 | 12 | const PREFIX: &str = "tcp2udp"; | ^^^^^^

#[cfg(feature = "statsd")]
pub struct StatsdMetrics(Option<StatsdMetricsImpl>);
#[cfg(not(feature = "statsd"))]
pub struct StatsdMetrics(Option<()>);

impl StatsdMetrics {
/// Creates a dummy statsd metrics instance. Does not actually connect to any statds
/// server, nor emits any events. Used as an API compatible drop in when metrics
/// should not be emitted.
pub fn dummy() -> Self {
Self(None)
}

/// Creates a statsd metric reporting instance connecting to the given host addr.
#[cfg(feature = "statsd")]
pub fn real(host: std::net::SocketAddr) -> Self {
Self(Some(StatsdMetricsImpl::new(host)))
}

/// Emit a metric saying we failed to accept an incoming TCP connection (probably ran out of file descriptors)
pub fn accept_error(&self) {
#[cfg(feature = "statsd")]
if let Some(statsd) = &self.0 {
statsd.accept_error()
}
}

/// Increment the connection counter inside this metrics instance and emit that new gauge value
pub fn incr_connections(&self) {
#[cfg(feature = "statsd")]
if let Some(statsd) = &self.0 {
statsd.incr_connections()
}
}

/// Decrement the connection counter inside this metrics instance and emit that new gauge value
pub fn decr_connections(&self) {
#[cfg(feature = "statsd")]
if let Some(statsd) = &self.0 {
statsd.decr_connections()
}
}
}

#[cfg(feature = "statsd")]
struct StatsdMetricsImpl {
client: StatsdClient,
num_connections: AtomicU64,
}

#[cfg(feature = "statsd")]
impl StatsdMetricsImpl {
pub fn new(host: std::net::SocketAddr) -> Self {
use cadence::{UdpMetricSink, QueuingMetricSink};

let socket = std::net::UdpSocket::bind("0.0.0.0:0").unwrap();
log::debug!("Statsd socket bound to {}", socket.local_addr().unwrap());

// Create a non-buffered blocking metrics sink. It is important that it's not buffered,
// so events are emitted instantly when they happen (this program does not emit a lot of
// events, nor does it attach timestamps to the events.
// The fact that it's blocking does not matter, since the `QueuingMetricSink` will make sure
// the `UdpMetricSink` runs in its own thread anyway.
let udp_sink = UdpMetricSink::from(host, socket).unwrap();
let queuing_sink = QueuingMetricSink::with_capacity(udp_sink, QUEUE_SIZE);
let statds_client = StatsdClient::from_sink(PREFIX, queuing_sink);
Self {
client: statds_client,
num_connections: AtomicU64::new(0),
}
}

pub fn accept_error(&self) {
log::debug!("Sending statsd tcp_accept_errors");
if let Err(e) = self.client.incr("tcp_accept_errors") {
log::error!("Failed to emit statsd tcp_accept_errors: {e}");
}
}

pub fn incr_connections(&self) {
let num_connections = self.num_connections.fetch_add(1, Ordering::SeqCst) + 1;
log::debug!("Sending statsd num_connections = {num_connections}");
if let Err(e) = self.client.gauge("num_connections", num_connections) {
log::error!("Failed to emit statsd num_connections: {e}");
}
}

pub fn decr_connections(&self) {
let num_connections = self.num_connections.fetch_sub(1, Ordering::SeqCst) - 1;
log::debug!("Sending statsd num_connections = {num_connections}");
if let Err(e) = self.client.gauge("num_connections", num_connections) {
log::error!("Failed to emit statsd num_connections: {e}");
}
}
}
32 changes: 31 additions & 1 deletion src/tcp2udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ use std::convert::Infallible;
use std::fmt;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket};
use tokio::time::sleep;

#[path = "statsd.rs"]
mod statsd;

#[derive(Debug)]
#[cfg_attr(feature = "clap", derive(clap::Parser))]
#[cfg_attr(feature = "clap", group(skip))]
Expand Down Expand Up @@ -85,7 +89,14 @@ impl std::error::Error for Tcp2UdpError {
/// If binding a listening socket fails this returns an error. Otherwise the function
/// will continue indefinitely to accept incoming connections and forward to UDP.
/// Errors are just logged.
pub async fn run(options: Options) -> Result<Infallible, Tcp2UdpError> {
///
/// If the `statsd` feature is enabled, and a socket address is passed in `statsd_host`,
/// metrics will be sent to it. If this argument is not `None` but the feature is disabled,
/// it will just be ignored.
pub async fn run(
options: Options,
statsd_host: Option<SocketAddr>,
) -> Result<Infallible, Tcp2UdpError> {
if options.tcp_listen_addrs.is_empty() {
return Err(Tcp2UdpError::NoTcpListenAddrs);
}
Expand All @@ -98,6 +109,17 @@ pub async fn run(options: Options) -> Result<Infallible, Tcp2UdpError> {
}
});

let statsd = Arc::new(match statsd_host {
None => statsd::StatsdMetrics::dummy(),
#[cfg(feature = "statsd")]
Some(statsd_host) => statsd::StatsdMetrics::real(statsd_host),
#[cfg(not(feature = "statsd"))]
Some(_) => {
log::warn!("Compiled without statsd support, ignoring statsd argument");
statsd::StatsdMetrics::dummy()
},
});

let mut join_handles = Vec::with_capacity(options.tcp_listen_addrs.len());
for tcp_listen_addr in options.tcp_listen_addrs {
let tcp_listener = create_listening_socket(tcp_listen_addr, &options.tcp_options)?;
Expand All @@ -106,13 +128,15 @@ pub async fn run(options: Options) -> Result<Infallible, Tcp2UdpError> {
let udp_forward_addr = options.udp_forward_addr;
let tcp_recv_timeout = options.tcp_options.recv_timeout;
let tcp_nodelay = options.tcp_options.nodelay;
let statsd = Arc::clone(&statsd);
join_handles.push(tokio::spawn(async move {
process_tcp_listener(
tcp_listener,
udp_bind_ip,
udp_forward_addr,
tcp_recv_timeout,
tcp_nodelay,
statsd,
)
.await;
}));
Expand Down Expand Up @@ -150,6 +174,7 @@ async fn process_tcp_listener(
udp_forward_addr: SocketAddr,
tcp_recv_timeout: Option<Duration>,
tcp_nodelay: bool,
statsd: Arc<statsd::StatsdMetrics>,
) -> ! {
let mut cooldown =
ExponentialBackoff::new(Duration::from_millis(50), Duration::from_millis(5000));
Expand All @@ -160,7 +185,9 @@ async fn process_tcp_listener(
if let Err(error) = crate::tcp_options::set_nodelay(&tcp_stream, tcp_nodelay) {
log::error!("Error: {}", error.display("\nCaused by: "));
}
let statsd = statsd.clone();
tokio::spawn(async move {
statsd.incr_connections();
if let Err(error) = process_socket(
tcp_stream,
tcp_peer_addr,
Expand All @@ -172,12 +199,15 @@ async fn process_tcp_listener(
{
log::error!("Error: {}", error.display("\nCaused by: "));
}
statsd.decr_connections();
});
cooldown.reset();
}
Err(error) => {
log::error!("Error when accepting incoming TCP connection: {}", error);

statsd.accept_error();

// If the process runs out of file descriptors, it will fail to accept a socket.
// But that socket will also remain in the queue, so it will fail again immediately.
// This will busy loop consuming the CPU and filling any logs. To prevent this,
Expand Down

0 comments on commit bdfa6ca

Please sign in to comment.