Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(netcheck): Log and shut down stun listeners #1022

Merged
merged 2 commits into from
May 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 83 additions & 39 deletions src/hp/netcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use tokio::{
task::JoinSet,
time::{self, Duration, Instant},
};
use tracing::{debug, info, trace, warn};
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, info, instrument, trace, warn, Instrument};
use trust_dns_resolver::TokioAsyncResolver;

use crate::net::{interfaces, ip::to_canonical};
Expand Down Expand Up @@ -206,6 +207,7 @@ impl Client {
) -> Result<Arc<Report>> {
// If not given UdpSockets to send stun packets, create them.
// TODO: Is failure really fatal?
let cancel_token = CancellationToken::new();
let stun_conn4 = match stun_conn4 {
Some(stun_conn4) => stun_conn4,
None => {
Expand All @@ -214,7 +216,11 @@ impl Client {
.await
.context("netcheck: failed to bind udp 0.0.0.0:0")?;
let sock = Arc::new(sock);
self.spawn_udp_listener(sock.clone(), self.msg_sender.clone());
Self::spawn_udp_listener(
sock.clone(),
self.msg_sender.clone(),
cancel_token.clone(),
);
sock
}
};
Expand All @@ -226,36 +232,63 @@ impl Client {
.await
.context("netcheck: failed to bind udp6 [::]:0")?;
let sock = Arc::new(sock);
self.spawn_udp_listener(sock.clone(), self.msg_sender.clone());
Self::spawn_udp_listener(
sock.clone(),
self.msg_sender.clone(),
cancel_token.clone(),
);
sock
}
};
let _cancel_on_drop = cancel_token.drop_guard();

// TODO: consider if DerpMap should be made to easily clone? It seems expensive
// right now.
self.actor.run(dm.clone(), stun_conn4, stun_conn6).await
}

/// Spawns a tokio task reading stun packets from the UDP socket.
fn spawn_udp_listener(&self, sock: Arc<UdpSocket>, sender: mpsc::Sender<(Bytes, SocketAddr)>) {
tokio::spawn(async move {
debug!("udp stun socket listener started");
// TODO: Can we do better for buffers here? Probably doesn't matter
// much.
let mut buf = vec![0u8; 64 << 10];
loop {
if let Err(err) = Self::recv_stun_socket(&sock, &mut buf, &sender).await {
// TODO: handle socket closed nicely
warn!(%err, "stun recv failed");
break;
///
/// This is a sync function and does not wait until the task is running. However this
/// is fine since the socket is already bound so packets will not be lost.
fn spawn_udp_listener(
sock: Arc<UdpSocket>,
sender: mpsc::Sender<(Bytes, SocketAddr)>,
cancel_token: CancellationToken,
) {
let span = debug_span!(
"stun.listener.udp",
local_addr = sock
.local_addr()
.map(|a| a.to_string())
.unwrap_or(String::from("-")),
);
tokio::spawn(
async move {
debug!("udp stun socket listener started");
// TODO: Can we do better for buffers here? Probably doesn't matter
// much.
let mut buf = vec![0u8; 64 << 10];
loop {
tokio::select! {
biased;
_ = cancel_token.cancelled() => break,
res = Self::recv_stun_once(&sock, &mut buf, &sender) => {
if let Err(err) = res {
warn!(%err, "stun recv failed");
break;
}
}
}
}
debug!("udp stun socket listener stopped");
}
debug!("udp stun socket listener stopped");
});
.instrument(span),
);
}

/// Receive STUN response from a UDP socket, pass it to the actor.
async fn recv_stun_socket(
async fn recv_stun_once(
sock: &UdpSocket,
buf: &mut [u8],
sender: &mpsc::Sender<(Bytes, SocketAddr)>,
Expand Down Expand Up @@ -499,10 +532,11 @@ struct ReportState {
pc4: Arc<UdpSocket>,
pc6: Arc<UdpSocket>,
pc4_hair: Arc<UdpSocket>,
incremental: bool, // doing a lite, follow-up netcheck
/// Doing a lite, follow-up netcheck
incremental: bool,
stop_probe: Arc<sync::Notify>,
wait_port_map: wg::AsyncWaitGroup,
// to be returned by GetReport
/// The report which will be returned.
report: Arc<RwLock<Report>>,
sent_hair_check: bool,
got_ep4: Option<SocketAddr>,
Expand All @@ -519,6 +553,7 @@ struct Inflight {
}

impl ReportState {
#[instrument(name = "report_state", skip_all)]
async fn run(
mut self,
in_flight: sync::mpsc::Sender<Inflight>,
Expand All @@ -527,6 +562,7 @@ impl ReportState {
skip_external_network: bool,
resolver: &TokioAsyncResolver,
) -> Result<(Report, DerpMap)> {
debug!(port_mapper = %port_mapper.is_some(), %skip_external_network, "running report");
self.report.write().await.os_has_ipv6 = os_has_ipv6().await;

let mut port_mapping = MaybeFuture::default();
Expand Down Expand Up @@ -582,7 +618,7 @@ impl ReportState {
};

let pinger = if self.plan.has_https_probes() {
Some(Pinger::new().await?)
Some(Pinger::new().await.context("failed to create pinger")?)
} else {
None
};
Expand Down Expand Up @@ -610,15 +646,15 @@ impl ReportState {
while let Some(res) = set.next().await {
match res {
Ok(res) => {
trace!("probe successful");
trace!(probe = ?res.probe, "probe successful");
return Ok(res);
}
Err(ProbeError::Transient(err)) => {
warn!("probe failed: {:?}", err);
Err(ProbeError::Transient(err, probe)) => {
warn!(?probe, "probe failed: {:?}", err);
continue;
}
Err(ProbeError::Fatal(err)) => {
trace!("probe error fatal: {:?}", err);
Err(ProbeError::Fatal(err, probe)) => {
warn!(?probe, "probe error fatal: {:?}", err);
return Err(err);
}
}
Expand Down Expand Up @@ -819,7 +855,7 @@ impl ReportState {
/// Updates `self` to note that node's latency is `d`. If `ipp`
/// is non-zero (for all but HTTPS replies), it's recorded as our UDP IP:port.
async fn add_node_latency(&mut self, node: &DerpNode, ipp: Option<SocketAddr>, d: Duration) {
debug!("add node latency: {} - {}ms", node.name, d.as_millis());
debug!(node = %node.name, latency = ?d, "add node latency");
let mut report = self.report.write().await;
report.udp = true;
update_latency(&mut report.region_latency, node.region_id, d);
Expand Down Expand Up @@ -870,12 +906,13 @@ impl ReportState {
#[derive(Debug)]
enum ProbeError {
/// Abort the current set.
Fatal(anyhow::Error),
Fatal(anyhow::Error, Probe),
/// Continue the other probes.
Transient(anyhow::Error),
Transient(anyhow::Error, Probe),
}

#[allow(clippy::too_many_arguments)]
#[instrument(level = "debug", skip_all, fields(probe = ?probe))]
async fn run_probe(
report: Arc<RwLock<Report>>,
resolver: &TokioAsyncResolver,
Expand All @@ -886,19 +923,18 @@ async fn run_probe(
in_flight: sync::mpsc::Sender<Inflight>,
pinger: Option<Pinger>,
) -> Result<ProbeReport, ProbeError> {
debug!("run_probe: {:?}", probe);
if !probe.delay().is_zero() {
debug!("delaying probe by {}ms", probe.delay().as_millis());
debug!("delaying probe");
time::sleep(*probe.delay()).await;
}

if !probe_would_help(&*report.read().await, &probe, &node) {
return Err(ProbeError::Fatal(anyhow!("probe would not help")));
return Err(ProbeError::Fatal(anyhow!("probe would not help"), probe));
}

let addr = get_node_addr(resolver, &node, probe.proto()).await;
if addr.is_none() {
return Err(ProbeError::Transient(anyhow!("no node addr")));
return Err(ProbeError::Transient(anyhow!("no node addr"), probe));
}
let addr = addr.unwrap();
let txid = stun::TransactionId::default();
Expand All @@ -913,20 +949,22 @@ async fn run_probe(
s,
})
.await
.map_err(|e| ProbeError::Transient(e.into()))?;
.map_err(|e| ProbeError::Transient(e.into(), probe.clone()))?;
let mut result = ProbeReport::new(probe.clone());

match probe {
Probe::Ipv4 { .. } => {
// TODO:
// metricSTUNSend4.Add(1)
let n = pc4.send_to(&req, addr).await;
debug!("sending probe IPV4: {:?} to {}", n, addr);
debug!(%addr, send_res=?n, "sending probe IPV4");
// TODO: || neterror.TreatAsLostUDP(err)
if n.is_ok() && n.unwrap() == req.len() {
result.ipv4_can_send = true;

let (delay, addr) = r.await.map_err(|e| ProbeError::Transient(e.into()))?;
let (delay, addr) = r
.await
.map_err(|e| ProbeError::Transient(e.into(), probe))?;
result.delay = Some(delay);
result.addr = Some(addr);
}
Expand All @@ -935,12 +973,14 @@ async fn run_probe(
// TODO:
// metricSTUNSend6.Add(1)
let n = pc6.send_to(&req, addr).await;
debug!("sending probe IPV6: {:?} to {}", n, addr);
debug!(%addr, send_res=?n, "sending probe IPV6");
// TODO: || neterror.TreatAsLostUDP(err)
if n.is_ok() && n.unwrap() == req.len() {
result.ipv6_can_send = true;

let (delay, addr) = r.await.map_err(|e| ProbeError::Transient(e.into()))?;
let (delay, addr) = r
.await
.map_err(|e| ProbeError::Transient(e.into(), probe))?;
result.delay = Some(delay);
result.addr = Some(addr);
}
Expand Down Expand Up @@ -1084,6 +1124,7 @@ impl Actor {
/// Performs a single netcheck run.
///
/// See [`Client::get_report`] for the arguments.
#[instrument(name = "get_report", skip_all)]
async fn run(
&mut self,
dm: DerpMap,
Expand All @@ -1092,7 +1133,8 @@ impl Actor {
) -> Result<Arc<Report>> {
let report_state = self
.create_report_state(&dm, stun_sock_v4.clone(), stun_sock_v6.clone())
.await?;
.await
.context("failed to create ReportState")?;
let (in_flight_s, mut in_flight_r) = sync::mpsc::channel(8);
let mut running = {
let port_mapper = self.port_mapper.clone();
Expand Down Expand Up @@ -1207,7 +1249,7 @@ impl Actor {
pkt: &[u8],
src: SocketAddr,
) {
debug!("received STUN packet from {}", src);
trace!(%src, "received STUN packet");

// if src.is_ipv4() {
// TODO:
Expand Down Expand Up @@ -1455,6 +1497,7 @@ mod tests {

let mut client = Client::new(None).await?;
let dm = stun::test::derp_map_of([stun_addr].into_iter());
dbg!(&dm);

for i in 0..5 {
println!("--round {}", i);
Expand Down Expand Up @@ -1512,6 +1555,7 @@ mod tests {
UseIpv6::None,
);
dm.regions.get_mut(&1).unwrap().nodes[0].stun_only = true;
dbg!(&dm);

let r = client
.get_report(&dm, None, None)
Expand Down