Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add extra instrumentation for initialize and handshakes #2122

Merged
merged 4 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion zebra-network/src/peer/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
use futures::prelude::*;
use tokio::net::TcpStream;
use tower::{discover::Change, Service, ServiceExt};
use tracing_futures::Instrument;

use crate::{BoxError, Request, Response};

Expand Down Expand Up @@ -50,13 +51,15 @@ where

fn call(&mut self, addr: SocketAddr) -> Self::Future {
let mut hs = self.handshaker.clone();
let connected_addr = ConnectedAddr::new_outbound_direct(addr);
let connector_span = info_span!("connector", peer = ?connected_addr);
async move {
let stream = TcpStream::connect(addr).await?;
hs.ready_and().await?;
let connected_addr = ConnectedAddr::new_outbound_direct(addr);
let client = hs.call((stream, connected_addr)).await?;
Ok(Change::Insert(addr, client))
}
.instrument(connector_span)
.boxed()
}
}
13 changes: 10 additions & 3 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ where
fn call(&mut self, req: HandshakeRequest) -> Self::Future {
let (tcp_stream, connected_addr) = req;

let negotiator_span = span!(Level::INFO, "negotiator", peer = ?connected_addr);
let negotiator_span = debug_span!("negotiator", peer = ?connected_addr);
// set the peer connection span's parent to the global span, as it
// should exist independently of its creation source (inbound
// connection, crawler, initial peer, ...)
Expand Down Expand Up @@ -633,7 +633,9 @@ where

// Instrument the peer's rx and tx streams.

let inner_conn_span = connection_span.clone();
let peer_tx = peer_tx.with(move |msg: Message| {
let span = debug_span!(parent: inner_conn_span.clone(), "outbound_metric");
// Add a metric for outbound messages.
metrics::counter!(
"zcash.net.out.messages",
Expand All @@ -645,7 +647,7 @@ where
// because we need the sink to be Unpin, and the With<Fut, ...>
// returned by .with is Unpin only if Fut is Unpin, and the
// futures generated by async blocks are not Unpin.
future::ready(Ok(msg))
future::ready(Ok(msg)).instrument(span)
});

// CORRECTNESS
Expand All @@ -654,11 +656,15 @@ where
// the inbound_ts_collector.
let inbound_ts_collector = timestamp_collector.clone();
let inv_collector = inv_collector.clone();
let ts_inner_conn_span = connection_span.clone();
let inv_inner_conn_span = connection_span.clone();
let peer_rx = peer_rx
.then(move |msg| {
// Add a metric for inbound messages and errors.
// Fire a timestamp or failure event.
let mut inbound_ts_collector = inbound_ts_collector.clone();
let span =
debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector");
async move {
match &msg {
Ok(msg) => {
Expand Down Expand Up @@ -694,10 +700,11 @@ where
}
msg
}
.instrument(span)
})
.then(move |msg| {
let inv_collector = inv_collector.clone();
let span = debug_span!("inventory_filter");
let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter");
async move {
if let (Ok(Message::Inv(hashes)), Some(transient_addr)) =
(&msg, connected_addr.get_transient_addr())
Expand Down
52 changes: 32 additions & 20 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,22 @@ where
// an indefinite period. We can use `CallAllUnordered` without filling
// the underlying `Inbound` buffer, because we immediately drive this
// single `CallAll` to completion, and handshakes have a short timeout.
use tower::util::CallAllUnordered;
let addr_stream = futures::stream::iter(initial_peers.into_iter());
let mut handshakes = CallAllUnordered::new(outbound_connector, addr_stream);
let mut handshakes: FuturesUnordered<_> = initial_peers
.into_iter()
.map(|addr| {
outbound_connector
.clone()
.oneshot(addr)
.map_err(move |e| (addr, e))
})
.collect();

while let Some(handshake_result) = handshakes.next().await {
// this is verbose, but it's better than just hanging with no output
if let Err(ref e) = handshake_result {
info!(?e, "an initial peer connection failed");
if let Err((addr, ref e)) = handshake_result {
info!(?addr, ?e, "an initial peer connection failed");
}
tx.send(handshake_result).await?;
tx.send(handshake_result.map_err(|(_addr, e)| e)).await?;
}

Ok(())
Expand Down Expand Up @@ -251,19 +257,26 @@ where
info!("Opened Zcash protocol endpoint at {}", local_addr);
loop {
if let Ok((tcp_stream, addr)) = listener.accept().await {
debug!(?addr, "got incoming connection");
let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
let accept_span = info_span!("listen_accept", peer = ?connected_addr);
let _guard = accept_span.enter();

debug!("got incoming connection");
handshaker.ready_and().await?;
// TODO: distinguish between proxied listeners and direct listeners
let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr);
// Construct a handshake future but do not drive it yet....
let handshake = handshaker.call((tcp_stream, connected_addr));
// ... instead, spawn a new task to handle this connection
let mut tx2 = tx.clone();
tokio::spawn(async move {
if let Ok(client) = handshake.await {
let _ = tx2.send(Ok(Change::Insert(addr, client))).await;
tokio::spawn(
async move {
if let Ok(client) = handshake.await {
let _ = tx2.send(Ok(Change::Insert(addr, client))).await;
}
}
});
.instrument(handshaker_span),
);
}
}
}
Expand Down Expand Up @@ -381,15 +394,14 @@ where
DemandHandshake { candidate } => {
// spawn each handshake into an independent task, so it can make
// progress independently of the crawls
let hs_join =
tokio::spawn(dial(candidate, outbound_connector.clone())).map(move |res| {
match res {
Ok(crawler_action) => crawler_action,
Err(e) => {
panic!("panic during handshaking with {:?}: {:?} ", candidate, e);
}
let hs_join = tokio::spawn(dial(candidate, outbound_connector.clone()))
.map(move |res| match res {
Ok(crawler_action) => crawler_action,
Err(e) => {
panic!("panic during handshaking with {:?}: {:?} ", candidate, e);
}
});
})
.instrument(Span::current());
handshakes.push(Box::pin(hs_join));
}
DemandCrawl => {
Expand Down