From bcdde40ff8aaaba6b3a0c80bc820eb060c1e75b7 Mon Sep 17 00:00:00 2001 From: FujiApple Date: Wed, 24 Jul 2024 21:54:20 +0800 Subject: [PATCH] feat(core): do not crash the tracer for some socket errors (#1255) --- crates/trippy-core/src/error.rs | 4 ++ crates/trippy-core/src/net/common.rs | 19 ++++++++- crates/trippy-core/src/net/ipv4.rs | 25 ++++++++--- crates/trippy-core/src/net/platform/unix.rs | 6 +++ .../trippy-core/src/net/platform/windows.rs | 18 +++++++- crates/trippy-core/src/probe.rs | 41 +++++++++++++++++++ crates/trippy-core/src/state.rs | 25 +++++++++++ crates/trippy-core/src/strategy.rs | 40 ++++++++++++++++-- 8 files changed, 166 insertions(+), 12 deletions(-) diff --git a/crates/trippy-core/src/error.rs b/crates/trippy-core/src/error.rs index 43d55574..7d80353f 100644 --- a/crates/trippy-core/src/error.rs +++ b/crates/trippy-core/src/error.rs @@ -19,6 +19,8 @@ pub enum Error { BadConfig(String), #[error("IO error: {0}")] IoError(#[from] IoError), + #[error("Probe failed to send: {0}")] + ProbeFailed(IoError), #[error("insufficient buffer capacity")] InsufficientCapacity, #[error("address {0} in use")] @@ -66,6 +68,8 @@ impl IoError { #[derive(Debug, Eq, PartialEq)] pub enum ErrorKind { InProgress, + HostUnreachable, + NetUnreachable, Std(io::ErrorKind), } diff --git a/crates/trippy-core/src/net/common.rs b/crates/trippy-core/src/net/common.rs index 39793998..3ca0a6cf 100644 --- a/crates/trippy-core/src/net/common.rs +++ b/crates/trippy-core/src/net/common.rs @@ -11,7 +11,7 @@ impl ErrorMapper { match err { Error::IoError(io_err) => match io_err.kind() { ErrorKind::InProgress => Ok(()), - ErrorKind::Std(_) => Err(Error::IoError(io_err)), + _ => Err(Error::IoError(io_err)), }, err => Err(err), } @@ -28,6 +28,15 @@ impl ErrorMapper { err => err, } } + + /// Convert a given [`ErrorKind`] to [`Error::ProbeFailed`]. 
+ #[allow(clippy::needless_pass_by_value)] + pub fn probe_failed(err: Error, kind: ErrorKind) -> Error { + match err { + Error::IoError(io_err) if io_err.kind() == kind => Error::ProbeFailed(io_err), + _ => err, + } + } } #[cfg(test)] @@ -68,4 +77,12 @@ mod tests { let addr_in_use_err = ErrorMapper::addr_in_use(err, ADDR); assert!(matches!(addr_in_use_err, Error::IoError(_))); } + + #[test] + fn test_probe_failed() { + let io_err = io::Error::from(ErrorKind::HostUnreachable); + let err = Error::IoError(IoError::Bind(io_err, ADDR)); + let probe_err = ErrorMapper::probe_failed(err, ErrorKind::HostUnreachable); + assert!(matches!(probe_err, Error::ProbeFailed(_))); + } } diff --git a/crates/trippy-core/src/net/ipv4.rs b/crates/trippy-core/src/net/ipv4.rs index cf26e17f..8ddc4d7a 100644 --- a/crates/trippy-core/src/net/ipv4.rs +++ b/crates/trippy-core/src/net/ipv4.rs @@ -108,7 +108,12 @@ impl Ipv4 { echo_request.packet(), )?; let remote_addr = SocketAddr::new(IpAddr::V4(self.dest_addr), 0); - icmp_send_socket.send_to(ipv4.packet(), remote_addr)?; + icmp_send_socket + .send_to(ipv4.packet(), remote_addr) + .map_err(Error::IoError) + .map_err(|err| ErrorMapper::probe_failed(err, ErrorKind::HostUnreachable)) + .map_err(|err| ErrorMapper::probe_failed(err, ErrorKind::NetUnreachable)) + .map_err(|err| ErrorMapper::probe_failed(err, INVALID_INPUT_KIND))?; Ok(()) } @@ -168,7 +173,11 @@ impl Ipv4 { udp.packet(), )?; let remote_addr = SocketAddr::new(IpAddr::V4(self.dest_addr), probe.dest_port.0); - raw_send_socket.send_to(ipv4.packet(), remote_addr)?; + raw_send_socket + .send_to(ipv4.packet(), remote_addr) + .map_err(Error::IoError) + .map_err(|err| ErrorMapper::probe_failed(err, ErrorKind::HostUnreachable)) + .map_err(|err| ErrorMapper::probe_failed(err, ErrorKind::NetUnreachable))?; Ok(()) } @@ -182,7 +191,8 @@ impl Ipv4 { .bind(local_addr) .map_err(Error::IoError) .or_else(ErrorMapper::in_progress) - .map_err(|err| ErrorMapper::addr_in_use(err, local_addr))?; + 
.map_err(|err| ErrorMapper::addr_in_use(err, local_addr)) + .map_err(|err| ErrorMapper::probe_failed(err, ADDR_NOT_AVAILABLE_KIND))?; socket.set_ttl(u32::from(probe.ttl.0))?; socket.send_to(payload, remote_addr)?; Ok(()) @@ -197,7 +207,8 @@ impl Ipv4 { .bind(local_addr) .map_err(Error::IoError) .or_else(ErrorMapper::in_progress) - .map_err(|err| ErrorMapper::addr_in_use(err, local_addr))?; + .map_err(|err| ErrorMapper::addr_in_use(err, local_addr)) + .map_err(|err| ErrorMapper::probe_failed(err, ADDR_NOT_AVAILABLE_KIND))?; socket.set_ttl(u32::from(probe.ttl.0))?; socket.set_tos(u32::from(self.tos.0))?; let remote_addr = SocketAddr::new(IpAddr::V4(self.dest_addr), probe.dest_port.0); @@ -205,7 +216,8 @@ impl Ipv4 { .connect(remote_addr) .map_err(Error::IoError) .or_else(ErrorMapper::in_progress) - .map_err(|err| ErrorMapper::addr_in_use(err, remote_addr))?; + .map_err(|err| ErrorMapper::addr_in_use(err, remote_addr)) + .map_err(|err| ErrorMapper::probe_failed(err, ErrorKind::NetUnreachable))?; Ok(socket) } @@ -458,6 +470,9 @@ impl Ipv4 { } } +const ADDR_NOT_AVAILABLE_KIND: ErrorKind = ErrorKind::Std(io::ErrorKind::AddrNotAvailable); +const INVALID_INPUT_KIND: ErrorKind = ErrorKind::Std(io::ErrorKind::InvalidInput); + const fn icmp_payload_size(packet_size: usize) -> usize { let ip_header_size = Ipv4Packet::minimum_packet_size(); let icmp_header_size = IcmpPacket::minimum_packet_size(); diff --git a/crates/trippy-core/src/net/platform/unix.rs b/crates/trippy-core/src/net/platform/unix.rs index 19f6e1b1..b720487b 100644 --- a/crates/trippy-core/src/net/platform/unix.rs +++ b/crates/trippy-core/src/net/platform/unix.rs @@ -491,6 +491,10 @@ mod socket { fn from(value: &io::Error) -> Self { if value.raw_os_error() == io::Error::from(Error::EINPROGRESS).raw_os_error() { Self::InProgress + } else if value.raw_os_error() == io::Error::from(Error::EHOSTUNREACH).raw_os_error() { + Self::HostUnreachable + } else if value.raw_os_error() == 
io::Error::from(Error::ENETUNREACH).raw_os_error() { + Self::NetUnreachable } else { Self::Std(value.kind()) } @@ -502,6 +506,8 @@ mod socket { fn from(value: ErrorKind) -> Self { match value { ErrorKind::InProgress => Self::from(Error::EINPROGRESS), + ErrorKind::HostUnreachable => Self::from(Error::EHOSTUNREACH), + ErrorKind::NetUnreachable => Self::from(Error::ENETUNREACH), ErrorKind::Std(kind) => Self::from(kind), } } diff --git a/crates/trippy-core/src/net/platform/windows.rs b/crates/trippy-core/src/net/platform/windows.rs index c1c135f9..346bf6f4 100644 --- a/crates/trippy-core/src/net/platform/windows.rs +++ b/crates/trippy-core/src/net/platform/windows.rs @@ -20,8 +20,8 @@ use windows_sys::Win32::Networking::WinSock::{ IN_ADDR_0, IPPROTO_RAW, IPPROTO_TCP, SIO_ROUTING_INTERFACE_QUERY, SOCKADDR_IN, SOCKADDR_IN6, SOCKADDR_IN6_0, SOCKADDR_STORAGE, SOCKET_ERROR, SOL_SOCKET, SO_ERROR, SO_PORT_SCALABILITY, SO_REUSE_UNICASTPORT, TCP_FAIL_CONNECT_ON_ICMP_ERROR, TCP_ICMP_ERROR_INFO, WSABUF, WSADATA, - WSAEADDRNOTAVAIL, WSAECONNREFUSED, WSAEHOSTUNREACH, WSAEINPROGRESS, WSA_IO_INCOMPLETE, - WSA_IO_PENDING, + WSAEADDRNOTAVAIL, WSAECONNREFUSED, WSAEHOSTUNREACH, WSAEINPROGRESS, WSAENETUNREACH, WSAENOBUFS, + WSA_IO_INCOMPLETE, WSA_IO_PENDING, }; use windows_sys::Win32::System::IO::OVERLAPPED; @@ -590,10 +590,22 @@ impl Socket for SocketImpl { } } +// TODO we handle WSAENOBUFS as NetUnreachable, maybe it needs a separate error type? 
impl From<&StdIoError> for ErrorKind { fn from(value: &StdIoError) -> Self { + #[allow(clippy::if_same_then_else)] if value.raw_os_error() == StdIoError::from_raw_os_error(WSAEINPROGRESS).raw_os_error() { Self::InProgress + } else if value.raw_os_error() + == StdIoError::from_raw_os_error(WSAEHOSTUNREACH).raw_os_error() + { + Self::HostUnreachable + } else if value.raw_os_error() + == StdIoError::from_raw_os_error(WSAENETUNREACH).raw_os_error() + { + Self::NetUnreachable + } else if value.raw_os_error() == StdIoError::from_raw_os_error(WSAENOBUFS).raw_os_error() { + Self::NetUnreachable } else { Self::Std(value.kind()) } @@ -605,6 +617,8 @@ impl From<ErrorKind> for StdIoError { fn from(value: ErrorKind) -> Self { match value { ErrorKind::InProgress => Self::from_raw_os_error(WSAEINPROGRESS), + ErrorKind::HostUnreachable => Self::from_raw_os_error(WSAEHOSTUNREACH), + ErrorKind::NetUnreachable => Self::from_raw_os_error(WSAENETUNREACH), ErrorKind::Std(kind) => Self::from(kind), } } diff --git a/crates/trippy-core/src/probe.rs b/crates/trippy-core/src/probe.rs index 21d34cac..b2688086 100644 --- a/crates/trippy-core/src/probe.rs +++ b/crates/trippy-core/src/probe.rs @@ -24,6 +24,11 @@ pub enum ProbeStatus { /// port. When a probe is skipped, it will be marked as `Skipped` and a /// new probe will be sent with the same TTL next available sequence number. Skipped, + /// The probe has failed. + /// + /// A probe is considered failed when an error occurs while sending or + /// receiving. + Failed(ProbeFailed), /// The probe has been sent and is awaiting a response. /// /// If no response is received within the timeout, the probe will remain @@ -110,6 +115,20 @@ impl Probe { extensions, } } + + /// The probe has failed to send. 
+ #[must_use] + pub(crate) const fn failed(self) -> ProbeFailed { + ProbeFailed { + sequence: self.sequence, + identifier: self.identifier, + src_port: self.src_port, + dest_port: self.dest_port, + ttl: self.ttl, + round: self.round, + sent: self.sent, + } + } } /// A complete network tracing probe. @@ -153,6 +172,28 @@ pub struct ProbeComplete { pub extensions: Option<Extensions>, } +/// A failed network tracing probe. +/// +/// A probe is considered failed when an error occurs while sending or +/// receiving. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProbeFailed { + /// The sequence of the probe. + pub sequence: Sequence, + /// The trace identifier. + pub identifier: TraceId, + /// The source port (UDP/TCP only) + pub src_port: Port, + /// The destination port (UDP/TCP only) + pub dest_port: Port, + /// The TTL of the probe. + pub ttl: TimeToLive, + /// Which round the probe belongs to. + pub round: RoundId, + /// Timestamp when the probe was sent. + pub sent: SystemTime, +} + /// The type of ICMP packet received. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum IcmpPacketType { diff --git a/crates/trippy-core/src/state.rs b/crates/trippy-core/src/state.rs index 05ed6ff6..fd6b0d15 100644 --- a/crates/trippy-core/src/state.rs +++ b/crates/trippy-core/src/state.rs @@ -169,6 +169,8 @@ pub struct Hop { total_sent: usize, /// The total probes received for this hop. total_recv: usize, + /// The total probes that failed for this hop. + total_failed: usize, /// The total round trip time for this hop across all rounds. total_time: Duration, /// The round trip time for this hop in the current round. @@ -237,6 +239,12 @@ impl Hop { self.total_recv } + /// The total number of probes that failed. + #[must_use] + pub const fn total_failed(&self) -> usize { + self.total_failed + } + /// The % of packets that are lost. 
#[must_use] pub fn loss_pct(&self) -> f64 { @@ -359,6 +367,7 @@ impl Default for Hop { addrs: IndexMap::default(), total_sent: 0, total_recv: 0, + total_failed: 0, total_time: Duration::default(), last: None, best: None, @@ -533,6 +542,21 @@ impl FlowState { self.hops[index].last_dest_port = awaited.dest_port.0; self.hops[index].last_sequence = awaited.sequence.0; } + ProbeStatus::Failed(failed) => { + self.update_lowest_ttl(failed.ttl); + self.update_round(failed.round); + let index = usize::from(failed.ttl.0) - 1; + self.hops[index].total_sent += 1; + self.hops[index].total_failed += 1; + self.hops[index].ttl = failed.ttl.0; + self.hops[index].samples.insert(0, Duration::default()); + if self.hops[index].samples.len() > self.max_samples { + self.hops[index].samples.pop(); + } + self.hops[index].last_src_port = failed.src_port.0; + self.hops[index].last_dest_port = failed.dest_port.0; + self.hops[index].last_sequence = failed.sequence.0; + } ProbeStatus::NotSent | ProbeStatus::Skipped => {} } } @@ -712,6 +736,7 @@ mod tests { Self::Skipped => Self::Skipped, Self::Awaited(awaited) => Self::Awaited(Probe { round, ..awaited }), Self::Complete(completed) => Self::Complete(ProbeComplete { round, ..completed }), + Self::Failed(_) => todo!(), } } } diff --git a/crates/trippy-core/src/strategy.rs b/crates/trippy-core/src/strategy.rs index 4692412e..4499628b 100644 --- a/crates/trippy-core/src/strategy.rs +++ b/crates/trippy-core/src/strategy.rs @@ -7,7 +7,7 @@ use crate::probe::{ ResponseSeqUdp, }; use crate::types::{Checksum, Sequence, TimeToLive, TraceId}; -use crate::{Extensions, IcmpPacketType, MultipathStrategy, PortDirection, Protocol}; +use crate::{Extensions, IcmpPacketType, MultipathStrategy, PortDirection, Probe, Protocol}; use std::net::IpAddr; use std::time::{Duration, SystemTime}; use tracing::instrument; @@ -99,16 +99,20 @@ impl<F: Fn(&Round<'_>)> Strategy<F> { let sent = SystemTime::now(); match self.config.protocol { Protocol::Icmp => { - 
network.send_probe(st.next_probe(sent))?; + let probe = st.next_probe(sent); + Self::do_send(network, st, probe)?; + } + Protocol::Udp => { + let probe = st.next_probe(sent); + Self::do_send(network, st, probe)?; } - Protocol::Udp => network.send_probe(st.next_probe(sent))?, Protocol::Tcp => { let mut probe = if st.round_has_capacity() { st.next_probe(sent) } else { return Err(Error::InsufficientCapacity); }; - while let Err(err) = network.send_probe(probe) { + while let Err(err) = Self::do_send(network, st, probe) { match err { Error::AddressInUse(_) => { if st.round_has_capacity() { @@ -126,6 +130,21 @@ impl<F: Fn(&Round<'_>)> Strategy<F> { Ok(()) } + /// Send the probe and handle errors. + /// + /// Some errors are transient and should not be considered fatal. In these cases we mark the + /// probe as failed and continue. + fn do_send<N: Network>(network: &mut N, st: &mut TracerState, probe: Probe) -> Result<()> { + match network.send_probe(probe) { + Ok(()) => Ok(()), + Err(Error::ProbeFailed(_)) => { + st.fail_probe(); + Ok(()) + } + Err(err) => Err(err), + } + } + /// Read and process the next incoming `ICMP` packet. /// /// We allow multiple probes to be in-flight at any time, and we cannot guarantee that responses @@ -1010,6 +1029,19 @@ mod state { probe } + /// Mark the `ProbeStatus` at the current `sequence` as failed. + #[instrument(skip(self))] + pub fn fail_probe(&mut self) { + let probe_index = usize::from(self.sequence - self.round_sequence); + let probe = self.buffer[probe_index - 1].clone(); + match probe { + ProbeStatus::Awaited(awaited) => { + self.buffer[probe_index - 1] = ProbeStatus::Failed(awaited.failed()); + } + _ => unreachable!("expected ProbeStatus::Awaited"), + } + } + /// Determine the `src_port`, `dest_port` and `identifier` for the current probe. /// /// This will differ depending on the `TracerProtocol`, `MultipathStrategy` &