From 626bf47a52accfe19304ea8978371ecd2bdde789 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 8 Jun 2023 13:10:13 +1000 Subject: [PATCH 1/6] Stop sending peer errors on the PeerSet channel, to respect send limits --- zebra-network/src/peer_set/initialize.rs | 47 ++++++----- .../src/peer_set/initialize/tests/vectors.rs | 83 +++---------------- 2 files changed, 38 insertions(+), 92 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 72e1b8878b3..78f8383581f 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -5,6 +5,7 @@ use std::{ collections::{BTreeMap, HashSet}, + convert::Infallible, net::SocketAddr, sync::Arc, time::Duration, @@ -13,7 +14,7 @@ use std::{ use futures::{ future::{self, FutureExt}, sink::SinkExt, - stream::{FuturesUnordered, StreamExt, TryStreamExt}, + stream::{FuturesUnordered, StreamExt}, TryFutureExt, }; use rand::seq::SliceRandom; @@ -46,11 +47,15 @@ use crate::{ #[cfg(test)] mod tests; -/// The result of an outbound peer connection attempt or inbound connection -/// handshake. +/// A successful outbound peer connection attempt or inbound connection handshake. /// -/// This result comes from the `Handshaker`. -type DiscoveredPeer = Result<(PeerSocketAddr, peer::Client), BoxError>; +/// The [`Handshake`](peer::Handshake) service returns a [`Result`]. Only successful connections +/// should be sent on the channel. Errors should be logged or ignored. +/// +/// We don't allow any errors in this type, because: +/// - The connection limits don't include failed connections +/// - tower::Discover interprets an error as stream termination +type DiscoveredPeer = (PeerSocketAddr, peer::Client); /// Initialize a peer set, using a network `config`, `inbound_service`, /// and `latest_chain_tip`. @@ -146,14 +151,15 @@ where // Create an mpsc channel for peer changes, // based on the maximum number of inbound and outbound peers. 
+ // + // The connection limit does not apply to errors, + // so they need to be handled before sending to this channel. let (peerset_tx, peerset_rx) = futures::channel::mpsc::channel::(config.peerset_total_connection_limit()); - let discovered_peers = peerset_rx - // Discover interprets an error as stream termination, - // so discard any errored connections... - .filter(|result| future::ready(result.is_ok())) - .map_ok(|(address, client)| Change::Insert(address, client.into())); + let discovered_peers = peerset_rx.map(|(address, client)| { + Result::<_, Infallible>::Ok(Change::Insert(address, client.into())) + }); // Create an mpsc channel for peerset demand signaling, // based on the maximum number of outbound peers. @@ -210,6 +216,9 @@ where // because zcashd rate-limits `addr`/`addrv2` messages per connection, // and if we only have one initial peer, // we need to ensure that its `Response::Addr` is used by the crawler. + // + // TODO: this might not be needed after we added the Connection peer address cache, + // try removing it in a future release? info!( ?active_initial_peer_count, "sending initial request for peers" @@ -342,7 +351,7 @@ where let handshake_result = handshake_result.expect("unexpected panic in initial peer handshake"); match handshake_result { - Ok(ref change) => { + Ok(change) => { handshake_success_total += 1; debug!( ?handshake_success_total, @@ -350,6 +359,9 @@ where ?change, "an initial peer handshake succeeded" ); + + // The connection limit makes sure this send doesn't block + peerset_tx.send(change).await?; } Err((addr, ref e)) => { handshake_error_total += 1; @@ -384,10 +396,6 @@ where } } - peerset_tx - .send(handshake_result.map_err(|(_addr, e)| e)) - .await?; - // Security: Let other tasks run after each connection is processed. // // Avoids remote peers starving other Zebra tasks using initial connection successes or errors. 
@@ -617,7 +625,8 @@ where let handshake_result = handshake.await; if let Ok(client) = handshake_result { - let _ = peerset_tx.send(Ok((addr, client))).await; + // The connection limit makes sure this send doesn't block + let _ = peerset_tx.send((addr, client)).await; } else { debug!(?handshake_result, "error handshaking with inbound peer"); } @@ -856,9 +865,9 @@ where } HandshakeConnected { address, client } => { debug!(candidate.addr = ?address, "successfully dialed new peer"); - // successes are handled by an independent task, except for `candidates.update` in - // this task, which has a timeout, so they shouldn't hang - peerset_tx.send(Ok((address, client))).await?; + + // The connection limit makes sure this send doesn't block in production code. + peerset_tx.send((address, client)).await?; } HandshakeFailed { failed_addr } => { // The connection was never opened, or it failed the handshake and was dropped. diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index d9fdf9a1535..f949506cdaf 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -459,15 +459,7 @@ async fn crawler_peer_limit_one_connect_ok_then_drop() { let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. - Ok(Some(peer_result)) => { - assert!( - matches!(peer_result, Ok((_, _))), - "unexpected connection error: {peer_result:?}\n\ - {peer_count} previous peers succeeded", - ); - peer_count += 1; - } - + Ok(Some(_peer_change)) => peer_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -521,15 +513,7 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() { let peer_change_result = peerset_rx.try_next(); match peer_change_result { // A peer handshake succeeded. 
- Ok(Some(peer_change_result)) => { - assert!( - matches!(peer_change_result, Ok((_, _))), - "unexpected connection error: {peer_change_result:?}\n\ - {peer_change_count} previous peers succeeded", - ); - peer_change_count += 1; - } - + Ok(Some(_peer_change)) => peer_change_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -631,15 +615,7 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() { let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. - Ok(Some(peer_result)) => { - assert!( - matches!(peer_result, Ok((_, _))), - "unexpected connection error: {peer_result:?}\n\ - {peer_count} previous peers succeeded", - ); - peer_count += 1; - } - + Ok(Some(_peer_change)) => peer_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -694,15 +670,7 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { let peer_change_result = peerset_rx.try_next(); match peer_change_result { // A peer handshake succeeded. - Ok(Some(peer_change_result)) => { - assert!( - matches!(peer_change_result, Ok((_, _))), - "unexpected connection error: {peer_change_result:?}\n\ - {peer_change_count} previous peers succeeded", - ); - peer_change_count += 1; - } - + Ok(Some(_peer_change)) => peer_change_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -834,15 +802,7 @@ async fn listener_peer_limit_one_handshake_ok_then_drop() { let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. 
- Ok(Some(peer_result)) => { - assert!( - matches!(peer_result, Ok((_, _))), - "unexpected connection error: {peer_result:?}\n\ - {peer_count} previous peers succeeded", - ); - peer_count += 1; - } - + Ok(Some(_peer_change)) => peer_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -900,15 +860,7 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() { let peer_change_result = peerset_rx.try_next(); match peer_change_result { // A peer handshake succeeded. - Ok(Some(peer_change_result)) => { - assert!( - matches!(peer_change_result, Ok((_, _))), - "unexpected connection error: {peer_change_result:?}\n\ - {peer_change_count} previous peers succeeded", - ); - peer_change_count += 1; - } - + Ok(Some(_peer_change)) => peer_change_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -1019,15 +971,7 @@ async fn listener_peer_limit_default_handshake_ok_then_drop() { let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. - Ok(Some(peer_result)) => { - assert!( - matches!(peer_result, Ok((_, _))), - "unexpected connection error: {peer_result:?}\n\ - {peer_count} previous peers succeeded", - ); - peer_count += 1; - } - + Ok(Some(_peer_change)) => peer_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -1085,15 +1029,7 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() { let peer_change_result = peerset_rx.try_next(); match peer_change_result { // A peer handshake succeeded. 
- Ok(Some(peer_change_result)) => { - assert!( - matches!(peer_change_result, Ok((_, _))), - "unexpected connection error: {peer_change_result:?}\n\ - {peer_change_count} previous peers succeeded", - ); - peer_change_count += 1; - } - + Ok(Some(_peer_change)) => peer_change_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -1158,7 +1094,8 @@ async fn add_initial_peers_is_rate_limited() { let elapsed = Instant::now() - before; - assert_eq!(connections.len(), PEER_COUNT); + // Errors are ignored, so we don't expect any peers here + assert_eq!(connections.len(), 0); // Make sure the rate limiting worked by checking if it took long enough assert!( elapsed From c5a514bf47eb3dbcffaca53af58867e903f84420 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 8 Jun 2023 13:33:15 +1000 Subject: [PATCH 2/6] Move locking out of the crawler select!, potential deadlock or hang risk --- zebra-network/src/peer_set/initialize.rs | 123 ++++++++++++----------- 1 file changed, 67 insertions(+), 56 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 78f8383581f..59edd02eee2 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -669,11 +669,10 @@ where enum CrawlerAction { /// Drop the demand signal because there are too many pending handshakes. DemandDrop, - /// Initiate a handshake to `candidate` in response to demand. - DemandHandshake { candidate: MetaAddr }, - /// Crawl existing peers for more peers in response to demand, because there - /// are no available candidates. - DemandCrawl, + /// Initiate a handshake to the next candidate peer in response to demand. + /// + /// If there are no available candidates, crawl existing peers. + DemandHandshakeOrCrawl, /// Crawl existing peers for more peers in response to a timer `tick`. 
TimerCrawl { tick: Instant }, /// Handle a successfully connected handshake `peer_set_change`. @@ -778,21 +777,25 @@ where ); let crawler_action = tokio::select! { + biased; + // Check for completed handshakes first, because the rest of the app needs them. + // Pending handshakes are limited by the connection limit. next_handshake_res = handshakes.next() => next_handshake_res.expect( "handshakes never terminates, because it contains a future that never resolves" ), + // The timer is rate-limited next_timer = crawl_timer.next() => next_timer.expect("timers never terminate"), - // turn the demand into an action, based on the crawler's current state + // Turn any new demand into an action, based on the crawler's current state. + // + // # Concurrency + // + // Demand is potentially unlimited, so it must go last in a biased select!. _ = demand_rx.next() => { if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() { // Too many open outbound connections or pending handshakes already DemandDrop - } else if let Some(candidate) = candidates.next().await { - // candidates.next has a short delay, and briefly holds the address - // book lock, so it shouldn't hang - DemandHandshake { candidate } } else { - DemandCrawl + DemandHandshakeOrCrawl } } }; @@ -804,53 +807,61 @@ where // rapidly. trace!("too many open connections or in-flight handshakes, dropping demand signal"); } - DemandHandshake { candidate } => { - // Increment the connection count before we spawn the connection. - let outbound_connection_tracker = active_outbound_connections.track_connection(); - debug!( - outbound_connections = ?active_outbound_connections.update_count(), - "opening an outbound peer connection" - ); - - // Spawn each handshake into an independent task, so it can make - // progress independently of the crawls. 
- let hs_join = tokio::spawn(dial( - candidate, - outbound_connector.clone(), - outbound_connection_tracker, - )) - .map(move |res| match res { - Ok(crawler_action) => crawler_action, - Err(e) => { - panic!("panic during handshaking with {candidate:?}: {e:?} "); - } - }) - .in_current_span(); - - handshakes.push(Box::pin(hs_join)); - } - DemandCrawl => { - debug!("demand for peers but no available candidates"); - // update has timeouts, and briefly holds the address book - // lock, so it shouldn't hang + DemandHandshakeOrCrawl => { + // Try to get the next available peer for a handshake. // - // TODO: refactor candidates into a buffered service, so we can - // spawn independent tasks to avoid deadlocks - let more_peers = candidates.update().await?; + // candidates.next has a short timeout, and briefly holds the address + // book lock, so it shouldn't hang + if let Some(candidate) = candidates.next().await { + // Increment the connection count before we spawn the connection. + let outbound_connection_tracker = + active_outbound_connections.track_connection(); + debug!( + outbound_connections = ?active_outbound_connections.update_count(), + "opening an outbound peer connection" + ); - // If we got more peers, try to connect to a new peer. - // - // # Security - // - // Update attempts are rate-limited by the candidate set. - // - // We only try peers if there was actually an update. - // So if all peers have had a recent attempt, - // and there was recent update with no peers, - // the channel will drain. - // This prevents useless update attempt loops. - if let Some(more_peers) = more_peers { - let _ = demand_tx.try_send(more_peers); + // Spawn each handshake into an independent task, so it can make + // progress independently of the crawls. 
+ let hs_join = tokio::spawn(dial( + candidate, + outbound_connector.clone(), + outbound_connection_tracker, + )) + .map(move |res| match res { + Ok(crawler_action) => crawler_action, + Err(e) => { + panic!("panic during handshaking with {candidate:?}: {e:?} "); + } + }) + .in_current_span(); + + handshakes.push(Box::pin(hs_join)); + } else { + // There weren't any peers, so try to get more peers. + debug!("demand for peers but no available candidates"); + + // update has timeouts, and briefly holds the address book + // lock, so it shouldn't hang + // + // TODO: refactor candidates into a buffered service, so we can + // spawn independent tasks to avoid deadlocks + let more_peers = candidates.update().await?; + + // If we got more peers, try to connect to a new peer on our next loop. + // + // # Security + // + // Update attempts are rate-limited by the candidate set. + // + // We only try peers if there was actually an update. + // So if all peers have had a recent attempt, + // and there was recent update with no peers, + // the channel will drain. + // This prevents useless update attempt loops. 
+ if let Some(more_peers) = more_peers { + let _ = demand_tx.try_send(more_peers); + } } } TimerCrawl { tick } => { From 99592273acc0634d0c5d031900ee5727ec25a830 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 8 Jun 2023 13:45:46 +1000 Subject: [PATCH 3/6] Move report_failed() out of the CandidateSet, reducing concurrency risks --- zebra-network/src/peer_set/candidate_set.rs | 22 +++++--------------- zebra-network/src/peer_set/initialize.rs | 23 +++++++++++++++++++-- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index 76006672c9a..7d83f5c0237 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -180,8 +180,6 @@ where /// The handshaker sets up the peer message receiver so it also sends a /// [`Responded`] peer address update. /// - /// [`report_failed`][Self::report_failed] puts peers into the [`Failed`] state. - /// /// [`next`][Self::next] puts peers into the [`AttemptPending`] state. /// /// ## Security @@ -410,22 +408,12 @@ where Some(next_peer) } +} - /// Mark `addr` as a failed peer. - pub async fn report_failed(&mut self, addr: &MetaAddr) { - let addr = MetaAddr::new_errored(addr.addr, addr.services); - - // # Correctness - // - // Spawn address book accesses on a blocking thread, - // to avoid deadlocks (see #1976). - let address_book = self.address_book.clone(); - let span = Span::current(); - tokio::task::spawn_blocking(move || { - span.in_scope(|| address_book.lock().unwrap().update(addr)) - }) - .await - .expect("panic in peer failure address book update task"); +impl CandidateSet { + /// Returns the address book for this `CandidateSet`. 
+ pub async fn address_book(&self) -> Arc> { + self.address_book.clone() } } diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 59edd02eee2..3cfbbd14043 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -27,6 +27,7 @@ use tokio_stream::wrappers::IntervalStream; use tower::{ buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt, }; +use tracing::Span; use tracing_futures::Instrument; use zebra_chain::chain_tip::ChainTip; @@ -752,6 +753,8 @@ where "starting the peer address crawler", ); + let address_book = candidates.address_book().await; + let mut handshakes = FuturesUnordered::new(); // returns None when empty. // Keeping an unresolved future in the pool means the stream @@ -882,9 +885,9 @@ where } HandshakeFailed { failed_addr } => { // The connection was never opened, or it failed the handshake and was dropped. - debug!(?failed_addr.addr, "marking candidate as failed"); - candidates.report_failed(&failed_addr).await; + report_failed(address_book.clone(), failed_addr).await; + // The demand signal that was taken out of the queue // to attempt to connect to the failed candidate never // turned into a connection, so add it back: @@ -901,6 +904,22 @@ where } } +/// Mark `addr` as a failed peer. +async fn report_failed(address_book: Arc>, addr: MetaAddr) { + let addr = MetaAddr::new_errored(addr.addr, addr.services); + + // # Correctness + // + // Spawn address book accesses on a blocking thread, + // to avoid deadlocks (see #1976). + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(|| address_book.lock().unwrap().update(addr)) + }) + .await + .expect("panic in peer failure address book update task"); +} + /// Try to connect to `candidate` using `outbound_connector`. /// Uses `outbound_connection_tracker` to track the active connection count. 
/// From 74ed89d7d7d6b6133551ce7b880995ce3f6b2022 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 8 Jun 2023 14:19:42 +1000 Subject: [PATCH 4/6] Make CandidateSet Send --- zebra-network/src/peer_set/candidate_set.rs | 10 ++++++---- zebra-network/src/peer_set/candidate_set/tests/prop.rs | 2 +- zebra-network/src/peer_set/initialize.rs | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index 7d83f5c0237..f3126e6adde 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -125,7 +125,11 @@ mod tests; // When we add the Seed state: // * show that seed peers that transition to other never attempted // states are already in the address book -pub(crate) struct CandidateSet { +pub(crate) struct CandidateSet +where + S: Service + Send, + S::Future: Send + 'static, +{ // Correctness: the address book must be private, // so all operations are performed on a blocking thread (see #1976). address_book: Arc>, @@ -136,7 +140,7 @@ pub(crate) struct CandidateSet { impl CandidateSet where - S: Service, + S: Service + Send, S::Future: Send + 'static, { /// Uses `address_book` and `peer_service` to manage a [`CandidateSet`] of peers. @@ -408,9 +412,7 @@ where Some(next_peer) } -} -impl CandidateSet { /// Returns the address book for this `CandidateSet`. pub async fn address_book(&self) -> Arc> { self.address_book.clone() diff --git a/zebra-network/src/peer_set/candidate_set/tests/prop.rs b/zebra-network/src/peer_set/candidate_set/tests/prop.rs index 394e35df6c3..d77b190c67b 100644 --- a/zebra-network/src/peer_set/candidate_set/tests/prop.rs +++ b/zebra-network/src/peer_set/candidate_set/tests/prop.rs @@ -139,7 +139,7 @@ proptest! { /// - if no reconnection peer is returned at all. 
async fn check_candidates_rate_limiting(candidate_set: &mut CandidateSet, candidates: u32) where - S: tower::Service, + S: tower::Service + Send, S::Future: Send + 'static, { let mut now = Instant::now(); diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 3cfbbd14043..53e18f0dab4 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -734,7 +734,7 @@ where + Send + 'static, C::Future: Send + 'static, - S: Service, + S: Service + Send + Sync + 'static, S::Future: Send + 'static, { use CrawlerAction::*; From 5effe36b5716070eef8b6926e0d613e49bbce4ba Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 8 Jun 2023 14:50:29 +1000 Subject: [PATCH 5/6] Make all CandidateSet operations concurrent, previous hang/deadlock bug --- zebra-network/src/peer_set/initialize.rs | 247 ++++++++++++++--------- 1 file changed, 156 insertions(+), 91 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 53e18f0dab4..15b7c152898 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -683,6 +683,10 @@ enum CrawlerAction { }, /// Handle a handshake failure to `failed_addr`. HandshakeFailed { failed_addr: MetaAddr }, + /// Handle a finished demand crawl (DemandHandshakeOrCrawl with no peers). + DemandCrawlFinished, + /// Handle a finished TimerCrawl. 
+ TimerCrawlFinished, } /// Given a channel `demand_rx` that signals a need for new peers, try to find @@ -720,7 +724,7 @@ async fn crawl_and_dial( config: Config, mut demand_tx: futures::channel::mpsc::Sender, mut demand_rx: futures::channel::mpsc::Receiver, - mut candidates: CandidateSet, + candidates: CandidateSet, outbound_connector: C, mut peerset_tx: futures::channel::mpsc::Sender, mut active_outbound_connections: ActiveConnectionCounter, @@ -739,14 +743,6 @@ where { use CrawlerAction::*; - // CORRECTNESS - // - // To avoid hangs and starvation, the crawler must: - // - spawn a separate task for each crawl and handshake, so they can make - // progress independently (and avoid deadlocking each other) - // - use the `select!` macro for all actions, because the `select` function - // is biased towards the first ready future - info!( crawl_new_peer_interval = ?config.crawl_new_peer_interval, outbound_connections = ?active_outbound_connections.update_count(), @@ -755,12 +751,17 @@ where let address_book = candidates.address_book().await; + // # Concurrency + // + // Allow tasks using the candidate set to be spawned, so they can run concurrently. + // Previously, Zebra has had deadlocks and long hangs caused by running dependent + // candidate set futures in the same async task. + let candidates = Arc::new(futures::lock::Mutex::new(candidates)); + + // This contains both crawl and handshake tasks. let mut handshakes = FuturesUnordered::new(); // returns None when empty. - // Keeping an unresolved future in the pool means the stream - // never terminates. - // We could use StreamExt::select_next_some and StreamExt::fuse, but `fuse` - // prevents us from adding items to the stream and checking its length. + // Keeping an unresolved future in the pool means the stream never terminates. 
handshakes.push(future::pending().boxed()); let mut crawl_timer = tokio::time::interval(config.crawl_new_peer_interval); @@ -770,6 +771,10 @@ where let mut crawl_timer = IntervalStream::new(crawl_timer).map(|tick| TimerCrawl { tick }); + // # Concurrency + // + // To avoid hangs and starvation, the crawler must spawn a separate task for each crawl + // and handshake, so they can make progress independently (and avoid deadlocking each other). loop { metrics::gauge!( "crawler.in_flight_handshakes", @@ -804,83 +809,89 @@ where }; match crawler_action { + // Dummy actions DemandDrop => { // This is set to trace level because when the peerset is - // congested it can generate a lot of demand signal very - // rapidly. + // congested it can generate a lot of demand signal very rapidly. trace!("too many open connections or in-flight handshakes, dropping demand signal"); } - DemandHandshakeOrCrawl => { - // Try to get the next available peer for a handshake. - // - // candidates.next has a short timeout, and briefly holds the address - // book lock, so it shouldn't hang - if let Some(candidate) = candidates.next().await { - // Increment the connection count before we spawn the connection. - let outbound_connection_tracker = - active_outbound_connections.track_connection(); - debug!( - outbound_connections = ?active_outbound_connections.update_count(), - "opening an outbound peer connection" - ); - // Spawn each handshake into an independent task, so it can make - // progress independently of the crawls. - let hs_join = tokio::spawn(dial( - candidate, - outbound_connector.clone(), - outbound_connection_tracker, - )) - .map(move |res| match res { - Ok(crawler_action) => crawler_action, - Err(e) => { - panic!("panic during handshaking with {candidate:?}: {e:?} "); - } - }) - .in_current_span(); - - handshakes.push(Box::pin(hs_join)); - } else { - // There weren't any peers, so try to get more peers. 
- debug!("demand for peers but no available candidates"); + // Spawned futures + DemandHandshakeOrCrawl => { + let candidates = candidates.clone(); + let demand_tx = demand_tx.clone(); + let outbound_connector = outbound_connector.clone(); - // update has timeouts, and briefly holds the address book - // lock, so it shouldn't hang - // - // TODO: refactor candidates into a buffered service, so we can - // spawn independent tasks to avoid deadlocks - let more_peers = candidates.update().await?; + // Increment the connection count before we spawn the connection. + let outbound_connection_tracker = active_outbound_connections.track_connection(); + debug!( + outbound_connections = ?active_outbound_connections.update_count(), + "opening an outbound peer connection" + ); - // If we got more peers, try to connect to a new peer on our next loop. - // - // # Security + // Spawn each handshake or crawl into an independent task, so handshakes can make + // progress while crawls are running. + let handshake_or_crawl_handle = tokio::spawn(async move { + // Try to get the next available peer for a handshake. // - // Update attempts are rate-limited by the candidate set. + // candidates.next() has a short timeout, and briefly holds the address + // book lock, so it shouldn't hang. // - // We only try peers if there was actually an update. - // So if all peers have had a recent attempt, - // and there was recent update with no peers, - // the channel will drain. - // This prevents useless update attempt loops. - if let Some(more_peers) = more_peers { - let _ = demand_tx.try_send(more_peers); + // Hold the lock for as short a time as possible. + let candidate = { candidates.lock().await.next().await }; + + if let Some(candidate) = candidate { + // we don't need to spawn here, because there's nothing running concurrently + dial(candidate, outbound_connector, outbound_connection_tracker).await + } else { + // There weren't any peers, so try to get more peers. 
+ debug!("demand for peers but no available candidates"); + + crawl(candidates, demand_tx).await; + + DemandCrawlFinished } - } + }) + .map(move |res| match res { + Ok(crawler_action) => crawler_action, + Err(e) => { + panic!("panic during handshaking: {e:?} "); + } + }) + .in_current_span(); + + handshakes.push(Box::pin(handshake_or_crawl_handle)); } TimerCrawl { tick } => { - debug!( - ?tick, - "crawling for more peers in response to the crawl timer" - ); - // TODO: spawn independent tasks to avoid deadlocks - candidates.update().await?; - // Try to connect to a new peer. - let _ = demand_tx.try_send(MorePeers); + let candidates = candidates.clone(); + let demand_tx = demand_tx.clone(); + + let crawl_handle = tokio::spawn(async move { + debug!( + ?tick, + "crawling for more peers in response to the crawl timer" + ); + + crawl(candidates, demand_tx).await; + + TimerCrawlFinished + }) + .map(move |res| match res { + Ok(crawler_action) => crawler_action, + Err(e) => { + panic!("panic during TimerCrawl: {tick:?} {e:?} "); + } + }) + .in_current_span(); + + handshakes.push(Box::pin(crawl_handle)); } + + // Completed futures HandshakeConnected { address, client } => { debug!(candidate.addr = ?address, "successfully dialed new peer"); - // The connection limit makes sure this send doesn't block in production code. + // The connection limit makes sure this send doesn't block. peerset_tx.send((address, client)).await?; } HandshakeFailed { failed_addr } => { @@ -888,13 +899,22 @@ where debug!(?failed_addr.addr, "marking candidate as failed"); report_failed(address_book.clone(), failed_addr).await; - // The demand signal that was taken out of the queue - // to attempt to connect to the failed candidate never - // turned into a connection, so add it back: + // The demand signal that was taken out of the queue to attempt to connect to the + // failed candidate never turned into a connection, so add it back. 
// - // Security: handshake failures are rate-limited by peer attempt timeouts. + // # Security + // + // Handshake failures are rate-limited by peer attempt timeouts. let _ = demand_tx.try_send(MorePeers); } + DemandCrawlFinished => { + // This is set to trace level because when the peerset is + // congested it can generate a lot of demand signal very rapidly. + trace!("demand-based crawl finished"); + } + TimerCrawlFinished => { + debug!("timer-based crawl finished"); + } } // Security: Let other tasks run after each crawler action is processed. @@ -904,20 +924,48 @@ where } } -/// Mark `addr` as a failed peer. -async fn report_failed(address_book: Arc>, addr: MetaAddr) { - let addr = MetaAddr::new_errored(addr.addr, addr.services); +/// Try to get more peers using `candidates`, then queue a connection attempt using `demand_tx`. +/// If there were no new peers, the connection attempt is skipped. +#[instrument(skip(candidates, demand_tx))] +async fn crawl( + candidates: Arc>>, + mut demand_tx: futures::channel::mpsc::Sender, +) where + S: Service + Send + Sync + 'static, + S::Future: Send + 'static, +{ + // update() has timeouts, and briefly holds the address book + // lock, so it shouldn't hang. + // Try to get new peers, holding the lock for as short a time as possible. + let result = { + let result = candidates.lock().await.update().await; + std::mem::drop(candidates); + result + }; + let more_peers = match result { + Ok(more_peers) => more_peers, + Err(e) => { + info!( + ?e, + "candidate set returned an error, is Zebra shutting down?" + ); + return; + } + }; - // # Correctness + // If we got more peers, try to connect to a new peer on our next loop. // - // Spawn address book accesses on a blocking thread, - // to avoid deadlocks (see #1976). 
- let span = Span::current(); - tokio::task::spawn_blocking(move || { - span.in_scope(|| address_book.lock().unwrap().update(addr)) - }) - .await - .expect("panic in peer failure address book update task"); + // # Security + // + // Update attempts are rate-limited by the candidate set, + // and we only try peers if there was actually an update. + // + // So if all peers have had a recent attempt, and there was recent update + // with no peers, the channel will drain. This prevents useless update attempt + // loops. + if let Some(more_peers) = more_peers { + let _ = demand_tx.try_send(more_peers); + } } /// Try to connect to `candidate` using `outbound_connector`. @@ -941,7 +989,7 @@ where + 'static, C::Future: Send + 'static, { - // CORRECTNESS + // # Correctness // // To avoid hangs, the dialer must only await: // - functions that return immediately, or @@ -968,6 +1016,23 @@ where .await } +/// Mark `addr` as a failed peer in `address_book`. +#[instrument(skip(address_book))] +async fn report_failed(address_book: Arc>, addr: MetaAddr) { + let addr = MetaAddr::new_errored(addr.addr, addr.services); + + // # Correctness + // + // Spawn address book accesses on a blocking thread, + // to avoid deadlocks (see #1976). 
+ let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(|| address_book.lock().unwrap().update(addr)) + }) + .await + .expect("panic in peer failure address book update task"); +} + impl From> for CrawlerAction { fn from(dial_result: Result<(PeerSocketAddr, peer::Client), (MetaAddr, BoxError)>) -> Self { use CrawlerAction::*; From 1e643e1c8906cb31d23c8fdb5d27e659fa8363d1 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 8 Jun 2023 15:43:02 +1000 Subject: [PATCH 6/6] Reduce the gap between handshakes and peer set updates, and exit the task on shutdown --- zebra-network/src/peer_set/initialize.rs | 178 +++++++++++++---------- 1 file changed, 100 insertions(+), 78 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 15b7c152898..d306475b722 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -676,16 +676,11 @@ enum CrawlerAction { DemandHandshakeOrCrawl, /// Crawl existing peers for more peers in response to a timer `tick`. TimerCrawl { tick: Instant }, - /// Handle a successfully connected handshake `peer_set_change`. - HandshakeConnected { - address: PeerSocketAddr, - client: peer::Client, - }, - /// Handle a handshake failure to `failed_addr`. - HandshakeFailed { failed_addr: MetaAddr }, - /// Handle a finished demand crawl (DemandHandshakeOrCrawl with no peers). + /// Clear a finished handshake. + HandshakeFinished, + /// Clear a finished demand crawl (DemandHandshakeOrCrawl with no peers). DemandCrawlFinished, - /// Handle a finished TimerCrawl. + /// Clear a finished TimerCrawl. 
TimerCrawlFinished, } @@ -722,11 +717,11 @@ enum CrawlerAction { )] async fn crawl_and_dial( config: Config, - mut demand_tx: futures::channel::mpsc::Sender, + demand_tx: futures::channel::mpsc::Sender, mut demand_rx: futures::channel::mpsc::Receiver, candidates: CandidateSet, outbound_connector: C, - mut peerset_tx: futures::channel::mpsc::Sender, + peerset_tx: futures::channel::mpsc::Sender, mut active_outbound_connections: ActiveConnectionCounter, ) -> Result<(), BoxError> where @@ -792,35 +787,37 @@ where "handshakes never terminates, because it contains a future that never resolves" ), // The timer is rate-limited - next_timer = crawl_timer.next() => next_timer.expect("timers never terminate"), + next_timer = crawl_timer.next() => Ok(next_timer.expect("timers never terminate")), // Turn any new demand into an action, based on the crawler's current state. // // # Concurrency // // Demand is potentially unlimited, so it must go last in a biased select!. - _ = demand_rx.next() => { + next_demand = demand_rx.next() => next_demand.ok_or("demand stream closed, is Zebra shutting down?".into()).map(|MorePeers|{ if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() { // Too many open outbound connections or pending handshakes already DemandDrop } else { DemandHandshakeOrCrawl } - } + }) }; match crawler_action { // Dummy actions - DemandDrop => { + Ok(DemandDrop) => { // This is set to trace level because when the peerset is // congested it can generate a lot of demand signal very rapidly. 
trace!("too many open connections or in-flight handshakes, dropping demand signal"); } - // Spawned futures - DemandHandshakeOrCrawl => { + // Spawned tasks + Ok(DemandHandshakeOrCrawl) => { let candidates = candidates.clone(); - let demand_tx = demand_tx.clone(); let outbound_connector = outbound_connector.clone(); + let peerset_tx = peerset_tx.clone(); + let address_book = address_book.clone(); + let demand_tx = demand_tx.clone(); // Increment the connection count before we spawn the connection. let outbound_connection_tracker = active_outbound_connections.track_connection(); @@ -842,27 +839,37 @@ where if let Some(candidate) = candidate { // we don't need to spawn here, because there's nothing running concurrently - dial(candidate, outbound_connector, outbound_connection_tracker).await + dial( + candidate, + outbound_connector, + outbound_connection_tracker, + peerset_tx, + address_book, + demand_tx, + ) + .await?; + + Ok(HandshakeFinished) } else { // There weren't any peers, so try to get more peers. 
debug!("demand for peers but no available candidates"); - crawl(candidates, demand_tx).await; + crawl(candidates, demand_tx).await?; - DemandCrawlFinished + Ok(DemandCrawlFinished) } }) .map(move |res| match res { Ok(crawler_action) => crawler_action, Err(e) => { - panic!("panic during handshaking: {e:?} "); + panic!("panic during handshaking: {e:?}"); } }) .in_current_span(); handshakes.push(Box::pin(handshake_or_crawl_handle)); } - TimerCrawl { tick } => { + Ok(TimerCrawl { tick }) => { let candidates = candidates.clone(); let demand_tx = demand_tx.clone(); @@ -872,14 +879,14 @@ where "crawling for more peers in response to the crawl timer" ); - crawl(candidates, demand_tx).await; + crawl(candidates, demand_tx).await?; - TimerCrawlFinished + Ok(TimerCrawlFinished) }) .map(move |res| match res { Ok(crawler_action) => crawler_action, Err(e) => { - panic!("panic during TimerCrawl: {tick:?} {e:?} "); + panic!("panic during TimerCrawl: {tick:?} {e:?}"); } }) .in_current_span(); @@ -887,34 +894,24 @@ where handshakes.push(Box::pin(crawl_handle)); } - // Completed futures - HandshakeConnected { address, client } => { - debug!(candidate.addr = ?address, "successfully dialed new peer"); - - // The connection limit makes sure this send doesn't block. - peerset_tx.send((address, client)).await?; + // Completed spawned tasks + Ok(HandshakeFinished) => { + // Already logged in dial() } - HandshakeFailed { failed_addr } => { - // The connection was never opened, or it failed the handshake and was dropped. - debug!(?failed_addr.addr, "marking candidate as failed"); - report_failed(address_book.clone(), failed_addr).await; - - // The demand signal that was taken out of the queue to attempt to connect to the - // failed candidate never turned into a connection, so add it back. - // - // # Security - // - // Handshake failures are rate-limited by peer attempt timeouts. 
- let _ = demand_tx.try_send(MorePeers); - } - DemandCrawlFinished => { + Ok(DemandCrawlFinished) => { // This is set to trace level because when the peerset is // congested it can generate a lot of demand signal very rapidly. trace!("demand-based crawl finished"); } - TimerCrawlFinished => { + Ok(TimerCrawlFinished) => { debug!("timer-based crawl finished"); } + + // Fatal errors and shutdowns + Err(error) => { + info!(?error, "crawler task exiting due to an error"); + return Err(error); + } } // Security: Let other tasks run after each crawler action is processed. @@ -930,7 +927,8 @@ where async fn crawl( candidates: Arc>>, mut demand_tx: futures::channel::mpsc::Sender, -) where +) -> Result<(), BoxError> +where S: Service + Send + Sync + 'static, S::Future: Send + 'static, { @@ -949,7 +947,7 @@ async fn crawl( ?e, "candidate set returned an error, is Zebra shutting down?" ); - return; + return Err(e); } }; @@ -964,21 +962,38 @@ async fn crawl( // with no peers, the channel will drain. This prevents useless update attempt // loops. if let Some(more_peers) = more_peers { - let _ = demand_tx.try_send(more_peers); + if let Err(send_error) = demand_tx.try_send(more_peers) { + if send_error.is_disconnected() { + // Zebra is shutting down + return Err(send_error.into()); + } + } } + + Ok(()) } /// Try to connect to `candidate` using `outbound_connector`. /// Uses `outbound_connection_tracker` to track the active connection count. /// -/// Returns a `HandshakeConnected` action on success, and a -/// `HandshakeFailed` action on error. -#[instrument(skip(outbound_connector, outbound_connection_tracker))] +/// On success, sends peers to `peerset_tx`. +/// On failure, marks the peer as failed in the address book, +/// then re-adds demand to `demand_tx`. 
+#[instrument(skip( + outbound_connector, + outbound_connection_tracker, + peerset_tx, + address_book, + demand_tx +))] async fn dial( candidate: MetaAddr, mut outbound_connector: C, outbound_connection_tracker: ConnectionTracker, -) -> CrawlerAction + mut peerset_tx: futures::channel::mpsc::Sender, + address_book: Arc>, + mut demand_tx: futures::channel::mpsc::Sender, +) -> Result<(), BoxError> where C: Service< OutboundConnectorRequest, @@ -998,10 +1013,7 @@ where debug!(?candidate.addr, "attempting outbound connection in response to demand"); // the connector is always ready, so this can't hang - let outbound_connector = outbound_connector - .ready() - .await - .expect("outbound connector never errors"); + let outbound_connector = outbound_connector.ready().await?; let req = OutboundConnectorRequest { addr: candidate.addr, @@ -1009,11 +1021,36 @@ where }; // the handshake has timeouts, so it shouldn't hang - outbound_connector - .call(req) - .map_err(|e| (candidate, e)) - .map(Into::into) - .await + let handshake_result = outbound_connector.call(req).map(Into::into).await; + + match handshake_result { + Ok((address, client)) => { + debug!(?candidate.addr, "successfully dialed new peer"); + + // The connection limit makes sure this send doesn't block. + peerset_tx.send((address, client)).await?; + } + // The connection was never opened, or it failed the handshake and was dropped. + Err(error) => { + debug!(?error, ?candidate.addr, "failed to make outbound connection to peer"); + report_failed(address_book.clone(), candidate).await; + + // The demand signal that was taken out of the queue to attempt to connect to the + // failed candidate never turned into a connection, so add it back. + // + // # Security + // + // Handshake failures are rate-limited by peer attempt timeouts. 
+ if let Err(send_error) = demand_tx.try_send(MorePeers) { + if send_error.is_disconnected() { + // Zebra is shutting down + return Err(send_error.into()); + } + } + } + } + + Ok(()) } /// Mark `addr` as a failed peer in `address_book`. @@ -1032,18 +1069,3 @@ async fn report_failed(address_book: Arc>, addr: M .await .expect("panic in peer failure address book update task"); } - -impl From> for CrawlerAction { - fn from(dial_result: Result<(PeerSocketAddr, peer::Client), (MetaAddr, BoxError)>) -> Self { - use CrawlerAction::*; - match dial_result { - Ok((address, client)) => HandshakeConnected { address, client }, - Err((candidate, e)) => { - debug!(?candidate.addr, ?e, "failed to connect to candidate"); - HandshakeFailed { - failed_addr: candidate, - } - } - } - } -}