From a43555d5cda9149ea3c30f12dc636cf3eb693536 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 7 May 2021 11:10:47 +1000 Subject: [PATCH 1/4] Instrument the crawl task When we created the crawl task, we forgot to instrument it with the global span. This fix makes sure that the git and network span appears on crawl logs. --- zebra-network/src/peer_set/initialize.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index c8dffc0a1d0..bd96e9ac95e 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -381,15 +381,14 @@ where DemandHandshake { candidate } => { // spawn each handshake into an independent task, so it can make // progress independently of the crawls - let hs_join = - tokio::spawn(dial(candidate, outbound_connector.clone())).map(move |res| { - match res { - Ok(crawler_action) => crawler_action, - Err(e) => { - panic!("panic during handshaking with {:?}: {:?} ", candidate, e); - } + let hs_join = tokio::spawn(dial(candidate, outbound_connector.clone())) + .map(move |res| match res { + Ok(crawler_action) => crawler_action, + Err(e) => { + panic!("panic during handshaking with {:?}: {:?} ", candidate, e); } - }); + }) + .instrument(Span::current()); handshakes.push(Box::pin(hs_join)); } DemandCrawl => { From ad214ac8874c5471cca8371db136b5630425f1d6 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 14 May 2021 16:45:08 +1000 Subject: [PATCH 2/4] Instrument the connector --- zebra-network/src/peer/connector.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index 547152e593b..a128b59f287 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -8,6 +8,7 @@ use std::{ use futures::prelude::*; use tokio::net::TcpStream; use tower::{discover::Change, Service, ServiceExt}; +use tracing_futures::Instrument; use crate::{BoxError, Request, Response}; @@ -50,13 +51,15 @@ where fn call(&mut self, addr: SocketAddr) -> Self::Future { let mut hs = self.handshaker.clone(); + let connected_addr = ConnectedAddr::new_outbound_direct(addr); + let connector_span = info_span!("connector", peer = ?connected_addr); async move { let stream = TcpStream::connect(addr).await?; hs.ready_and().await?; - let connected_addr = ConnectedAddr::new_outbound_direct(addr); let client = hs.call((stream, connected_addr)).await?; Ok(Change::Insert(addr, client)) } + .instrument(connector_span) .boxed() } } From a093a08ef3531d103f66a948f6801054c016365b Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 14 May 2021 16:45:46 +1000 Subject: [PATCH 3/4] Improve handshake instrumentation Make some spans debug, so there are not too many spans. --- zebra-network/src/peer/handshake.rs | 13 ++++++++++--- zebra-network/src/peer_set/initialize.rs | 19 +++++++++++++------ 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index d66e4f42917..270a0f2e001 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -554,7 +554,7 @@ where fn call(&mut self, req: HandshakeRequest) -> Self::Future { let (tcp_stream, connected_addr) = req; - let negotiator_span = span!(Level::INFO, "negotiator", peer = ?connected_addr); + let negotiator_span = debug_span!("negotiator", peer = ?connected_addr); // set the peer connection span's parent to the global span, as it // should exist independently of its creation source (inbound // connection, crawler, initial peer, ...) @@ -633,7 +633,9 @@ where // Instrument the peer's rx and tx streams. + let inner_conn_span = connection_span.clone(); let peer_tx = peer_tx.with(move |msg: Message| { + let span = debug_span!(parent: inner_conn_span.clone(), "outbound_metric"); // Add a metric for outbound messages. metrics::counter!( "zcash.net.out.messages", @@ -645,7 +647,7 @@ where // because we need the sink to be Unpin, and the With // returned by .with is Unpin only if Fut is Unpin, and the // futures generated by async blocks are not Unpin. - future::ready(Ok(msg)) + future::ready(Ok(msg)).instrument(span) }); // CORRECTNESS @@ -654,11 +656,15 @@ where // the inbound_ts_collector. let inbound_ts_collector = timestamp_collector.clone(); let inv_collector = inv_collector.clone(); + let ts_inner_conn_span = connection_span.clone(); + let inv_inner_conn_span = connection_span.clone(); let peer_rx = peer_rx .then(move |msg| { // Add a metric for inbound messages and errors. // Fire a timestamp or failure event. let mut inbound_ts_collector = inbound_ts_collector.clone(); + let span = + debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector"); async move { match &msg { Ok(msg) => { @@ -694,10 +700,11 @@ where } msg } + .instrument(span) }) .then(move |msg| { let inv_collector = inv_collector.clone(); - let span = debug_span!("inventory_filter"); + let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter"); async move { if let (Ok(Message::Inv(hashes)), Some(transient_addr)) = (&msg, connected_addr.get_transient_addr()) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index bd96e9ac95e..5246398b16b 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -251,19 +251,26 @@ where info!("Opened Zcash protocol endpoint at {}", local_addr); loop { if let Ok((tcp_stream, addr)) = listener.accept().await { - debug!(?addr, "got incoming connection"); + let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr); + let accept_span = info_span!("listen_accept", peer = ?connected_addr); + let _guard = accept_span.enter(); + + debug!("got incoming connection"); handshaker.ready_and().await?; // TODO: distinguish between proxied listeners and direct listeners - let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr); + let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr); // Construct a handshake future but do not drive it yet.... let handshake = handshaker.call((tcp_stream, connected_addr)); // ... instead, spawn a new task to handle this connection let mut tx2 = tx.clone(); - tokio::spawn(async move { - if let Ok(client) = handshake.await { - let _ = tx2.send(Ok(Change::Insert(addr, client))).await; + tokio::spawn( + async move { + if let Ok(client) = handshake.await { + let _ = tx2.send(Ok(Change::Insert(addr, client))).await; + } } - }); + .instrument(handshaker_span), + ); } } } From 0b049c7c11ad4077cfdf42c2bb4e9779d471418a Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 14 May 2021 17:08:02 +1000 Subject: [PATCH 4/4] Add the address to initial peer connection errors --- zebra-network/src/peer_set/initialize.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 5246398b16b..2ed4d8f0c6a 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -204,16 +204,22 @@ where // an indefinite period. We can use `CallAllUnordered` without filling // the underlying `Inbound` buffer, because we immediately drive this // single `CallAll` to completion, and handshakes have a short timeout. - use tower::util::CallAllUnordered; - let addr_stream = futures::stream::iter(initial_peers.into_iter()); - let mut handshakes = CallAllUnordered::new(outbound_connector, addr_stream); + let mut handshakes: FuturesUnordered<_> = initial_peers + .into_iter() + .map(|addr| { + outbound_connector + .clone() + .oneshot(addr) + .map_err(move |e| (addr, e)) + }) + .collect(); while let Some(handshake_result) = handshakes.next().await { // this is verbose, but it's better than just hanging with no output - if let Err(ref e) = handshake_result { - info!(?e, "an initial peer connection failed"); + if let Err((addr, ref e)) = handshake_result { + info!(?addr, ?e, "an initial peer connection failed"); } - tx.send(handshake_result).await?; + tx.send(handshake_result.map_err(|(_addr, e)| e)).await?; } Ok(())