diff --git a/src/hp/netcheck.rs b/src/hp/netcheck.rs index 68e0874147..8b0e1cbf1e 100644 --- a/src/hp/netcheck.rs +++ b/src/hp/netcheck.rs @@ -23,7 +23,8 @@ use tokio::{ task::JoinSet, time::{self, Duration, Instant}, }; -use tracing::{debug, info, trace, warn}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, debug_span, info, instrument, trace, warn, Instrument}; use trust_dns_resolver::TokioAsyncResolver; use crate::net::{interfaces, ip::to_canonical}; @@ -206,6 +207,7 @@ impl Client { ) -> Result> { // If not given UdpSockets to send stun packets, create them. // TODO: Is failure really fatal? + let cancel_token = CancellationToken::new(); let stun_conn4 = match stun_conn4 { Some(stun_conn4) => stun_conn4, None => { @@ -214,7 +216,11 @@ impl Client { .await .context("netcheck: failed to bind udp 0.0.0.0:0")?; let sock = Arc::new(sock); - self.spawn_udp_listener(sock.clone(), self.msg_sender.clone()); + Self::spawn_udp_listener( + sock.clone(), + self.msg_sender.clone(), + cancel_token.clone(), + ); sock } }; @@ -226,10 +232,15 @@ impl Client { .await .context("netcheck: failed to bind udp6 [::]:0")?; let sock = Arc::new(sock); - self.spawn_udp_listener(sock.clone(), self.msg_sender.clone()); + Self::spawn_udp_listener( + sock.clone(), + self.msg_sender.clone(), + cancel_token.clone(), + ); sock } }; + let _cancel_on_drop = cancel_token.drop_guard(); // TODO: consider if DerpMap should be made to easily clone? It seems expensive // right now. @@ -237,25 +248,47 @@ impl Client { } /// Spawns a tokio task reading stun packets from the UDP socket. - fn spawn_udp_listener(&self, sock: Arc, sender: mpsc::Sender<(Bytes, SocketAddr)>) { - tokio::spawn(async move { - debug!("udp stun socket listener started"); - // TODO: Can we do better for buffers here? Probably doesn't matter - // much. - let mut buf = vec![0u8; 64 << 10]; - loop { - if let Err(err) = Self::recv_stun_socket(&sock, &mut buf, &sender).await { - // TODO: handle socket closed nicely - warn!(%err, "stun recv failed"); - break; + /// + /// This is a sync function and does not wait until the task is running. However this + /// is fine since the socket is already bound so packets will not be lost. + fn spawn_udp_listener( + sock: Arc, + sender: mpsc::Sender<(Bytes, SocketAddr)>, + cancel_token: CancellationToken, + ) { + let span = debug_span!( + "stun.listener.udp", + local_addr = sock + .local_addr() + .map(|a| a.to_string()) + .unwrap_or(String::from("-")), + ); + tokio::spawn( + async move { + debug!("udp stun socket listener started"); + // TODO: Can we do better for buffers here? Probably doesn't matter + // much. + let mut buf = vec![0u8; 64 << 10]; + loop { + tokio::select! { + biased; + _ = cancel_token.cancelled() => break, + res = Self::recv_stun_once(&sock, &mut buf, &sender) => { + if let Err(err) = res { + warn!(%err, "stun recv failed"); + break; + } + } + } } + debug!("udp stun socket listener stopped"); } - debug!("udp stun socket listener stopped"); - }); + .instrument(span), + ); } /// Receive STUN response from a UDP socket, pass it to the actor. - async fn recv_stun_socket( + async fn recv_stun_once( sock: &UdpSocket, buf: &mut [u8], sender: &mpsc::Sender<(Bytes, SocketAddr)>, @@ -499,10 +532,11 @@ struct ReportState { pc4: Arc, pc6: Arc, pc4_hair: Arc, - incremental: bool, // doing a lite, follow-up netcheck + /// Doing a lite, follow-up netcheck + incremental: bool, stop_probe: Arc, wait_port_map: wg::AsyncWaitGroup, - // to be returned by GetReport + /// The report which will be returned. report: Arc>, sent_hair_check: bool, got_ep4: Option, @@ -519,6 +553,7 @@ struct Inflight { } impl ReportState { + #[instrument(name = "report_state", skip_all)] async fn run( mut self, in_flight: sync::mpsc::Sender, @@ -527,6 +562,7 @@ impl ReportState { skip_external_network: bool, resolver: &TokioAsyncResolver, ) -> Result<(Report, DerpMap)> { + debug!(port_mapper = %port_mapper.is_some(), %skip_external_network, "running report"); self.report.write().await.os_has_ipv6 = os_has_ipv6().await; let mut port_mapping = MaybeFuture::default(); @@ -582,7 +618,7 @@ impl ReportState { }; let pinger = if self.plan.has_https_probes() { - Some(Pinger::new().await?) + Some(Pinger::new().await.context("failed to create pinger")?) } else { None }; @@ -610,15 +646,15 @@ impl ReportState { while let Some(res) = set.next().await { match res { Ok(res) => { - trace!("probe successfull"); + trace!(probe = ?res.probe, "probe successfull"); return Ok(res); } - Err(ProbeError::Transient(err)) => { - warn!("probe failed: {:?}", err); + Err(ProbeError::Transient(err, probe)) => { + warn!(?probe, "probe failed: {:?}", err); continue; } - Err(ProbeError::Fatal(err)) => { - trace!("probe error fatal: {:?}", err); + Err(ProbeError::Fatal(err, probe)) => { + warn!(?probe, "probe error fatal: {:?}", err); return Err(err); } } @@ -819,7 +855,7 @@ impl ReportState { /// Updates `self` to note that node's latency is `d`. If `ipp` /// is non-zero (for all but HTTPS replies), it's recorded as our UDP IP:port. async fn add_node_latency(&mut self, node: &DerpNode, ipp: Option, d: Duration) { - debug!("add node latency: {} - {}ms", node.name, d.as_millis()); + debug!(node = %node.name, latency = ?d, "add node latency"); let mut report = self.report.write().await; report.udp = true; update_latency(&mut report.region_latency, node.region_id, d); @@ -870,12 +906,13 @@ impl ReportState { #[derive(Debug)] enum ProbeError { /// Abort the current set. - Fatal(anyhow::Error), + Fatal(anyhow::Error, Probe), /// Continue the other probes. - Transient(anyhow::Error), + Transient(anyhow::Error, Probe), } #[allow(clippy::too_many_arguments)] +#[instrument(level = "debug", skip_all, fields(probe = ?probe))] async fn run_probe( report: Arc>, resolver: &TokioAsyncResolver, @@ -886,19 +923,18 @@ async fn run_probe( in_flight: sync::mpsc::Sender, pinger: Option, ) -> Result { - debug!("run_probe: {:?}", probe); if !probe.delay().is_zero() { - debug!("delaying probe by {}ms", probe.delay().as_millis()); + debug!("delaying probe"); time::sleep(*probe.delay()).await; } if !probe_would_help(&*report.read().await, &probe, &node) { - return Err(ProbeError::Fatal(anyhow!("probe would not help"))); + return Err(ProbeError::Fatal(anyhow!("probe would not help"), probe)); } let addr = get_node_addr(resolver, &node, probe.proto()).await; if addr.is_none() { - return Err(ProbeError::Transient(anyhow!("no node addr"))); + return Err(ProbeError::Transient(anyhow!("no node addr"), probe)); } let addr = addr.unwrap(); let txid = stun::TransactionId::default(); @@ -913,7 +949,7 @@ async fn run_probe( s, }) .await - .map_err(|e| ProbeError::Transient(e.into()))?; + .map_err(|e| ProbeError::Transient(e.into(), probe.clone()))?; let mut result = ProbeReport::new(probe.clone()); match probe { @@ -921,12 +957,14 @@ async fn run_probe( // TODO: // metricSTUNSend4.Add(1) let n = pc4.send_to(&req, addr).await; - debug!("sending probe IPV4: {:?} to {}", n, addr); + debug!(%addr, send_res=?n, "sending probe IPV4"); // TODO: || neterror.TreatAsLostUDP(err) if n.is_ok() && n.unwrap() == req.len() { result.ipv4_can_send = true; - let (delay, addr) = r.await.map_err(|e| ProbeError::Transient(e.into()))?; + let (delay, addr) = r + .await + .map_err(|e| ProbeError::Transient(e.into(), probe))?; result.delay = Some(delay); result.addr = Some(addr); } @@ -935,12 +973,14 @@ async fn run_probe( // TODO: // metricSTUNSend6.Add(1) let n = pc6.send_to(&req, addr).await; - debug!("sending probe IPV6: {:?} to {}", n, addr); + debug!(%addr, snd_res=?n, "sending probe IPV6"); // TODO: || neterror.TreatAsLostUDP(err) if n.is_ok() && n.unwrap() == req.len() { result.ipv6_can_send = true; - let (delay, addr) = r.await.map_err(|e| ProbeError::Transient(e.into()))?; + let (delay, addr) = r + .await + .map_err(|e| ProbeError::Transient(e.into(), probe))?; result.delay = Some(delay); result.addr = Some(addr); } @@ -1084,6 +1124,7 @@ impl Actor { /// Performs a single netcheck run. /// /// See [`Client::get_report`] for the arguments. + #[instrument(name = "get_report", skip_all)] async fn run( &mut self, dm: DerpMap, @@ -1092,7 +1133,8 @@ impl Actor { ) -> Result> { let report_state = self .create_report_state(&dm, stun_sock_v4.clone(), stun_sock_v6.clone()) - .await?; + .await + .context("failed to create ReportState")?; let (in_flight_s, mut in_flight_r) = sync::mpsc::channel(8); let mut running = { let port_mapper = self.port_mapper.clone(); @@ -1207,7 +1249,7 @@ impl Actor { pkt: &[u8], src: SocketAddr, ) { - debug!("received STUN packet from {}", src); + trace!(%src, "received STUN packet"); // if src.is_ipv4() { // TODO: @@ -1455,6 +1497,7 @@ mod tests { let mut client = Client::new(None).await?; let dm = stun::test::derp_map_of([stun_addr].into_iter()); + dbg!(&dm); for i in 0..5 { println!("--round {}", i); @@ -1512,6 +1555,7 @@ mod tests { UseIpv6::None, ); dm.regions.get_mut(&1).unwrap().nodes[0].stun_only = true; + dbg!(&dm); let r = client .get_report(&dm, None, None)