fix(net): Add outer timeouts for critical network operations to avoid hangs (#7869)

* Refactor out try_to_sync_once()

* Add outer timeouts for obtaining and extending tips

* Refactor out request_genesis_once()

* Wrap genesis download once in a timeout

* Increase the genesis timeout to avoid denial of service from old nodes

* Add an outer timeout to mempool crawls

* Add an outer timeout to mempool download/verify

* Remove threaded mutex blocking from the inbound service

* Explain why inbound readiness never hangs

* Fix whitespace that cargo fmt doesn't

* Avoid hangs by always resetting the past lookahead limit flag

* Document block-specific and syncer-wide errors

* Update zebrad/src/components/sync.rs

Co-authored-by: Marek <[email protected]>

* Use correct condition for log messages

Co-authored-by: Marek <[email protected]>

* Keep lookahead reset metric

---------

Co-authored-by: Arya <[email protected]>
Co-authored-by: Marek <[email protected]>
3 people authored Nov 2, 2023
1 parent afbe807 commit 628b3e3
Showing 7 changed files with 234 additions and 98 deletions.
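The pattern behind most of these changes is an outer `tokio::time::timeout` wrapped around a whole operation, so a hang anywhere inside it becomes a retryable error instead of stalling the task forever. A minimal sketch of that pattern, using the `request_genesis_once()` helper named in the summary; the timeout value, signatures, and retry loop here are illustrative, not Zebra's actual code:

    use std::time::Duration;
    use tokio::time::timeout;

    // Illustrative value: the real genesis timeout in Zebra may differ.
    const GENESIS_TIMEOUT_RETRY: Duration = Duration::from_secs(10);

    // Stand-in for the refactored helper: ask one peer for the genesis
    // block and submit it for verification.
    async fn request_genesis_once() -> Result<(), Box<dyn std::error::Error>> {
        Ok(())
    }

    async fn request_genesis() -> Result<(), Box<dyn std::error::Error>> {
        loop {
            // The outer timeout turns a hung peer or verifier into a retry,
            // instead of stalling the initial sync forever.
            match timeout(GENESIS_TIMEOUT_RETRY, request_genesis_once()).await {
                Ok(Ok(())) => return Ok(()),
                Ok(Err(error)) => tracing::info!(?error, "genesis download failed, retrying"),
                Err(_elapsed) => tracing::info!("genesis download timed out, retrying"),
            }
        }
    }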
4 changes: 4 additions & 0 deletions zebra-network/src/peer/connection.rs
@@ -1289,11 +1289,15 @@ where
// <https://docs.rs/tower/latest/tower/buffer/struct.Buffer.html#a-note-on-choosing-a-bound>
//
// The inbound service must be called immediately after a buffer slot is reserved.
//
// The inbound service never times out in readiness, because the load shed layer is always
// ready, and returns an error in response to the request instead.
if self.svc.ready().await.is_err() {
self.fail_with(PeerError::ServiceShutdown).await;
return;
}

// Inbound service request timeouts are handled by the timeout layer in `start::start()`.
let rsp = match self.svc.call(req.clone()).await {
Err(e) => {
if e.is::<tower::load_shed::error::Overloaded>() {
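For context on the new comments above: tower's load-shed middleware is what makes inbound readiness immune to hangs. A sketch of that behavior, assuming tower is built with its `load-shed` and `util` features; the toy inner service and response handling are illustrative, not Zebra's inbound service:

    use tower::{
        load_shed::{error::Overloaded, LoadShed},
        service_fn, BoxError, Service, ServiceExt,
    };

    #[tokio::main]
    async fn main() -> Result<(), BoxError> {
        // Toy inner service; Zebra's real inbound service sits here.
        let inner = service_fn(|req: &'static str| async move { Ok::<_, BoxError>(req.len()) });

        // `LoadShed` reports readiness immediately. When the inner service
        // is not ready, `call` fails fast with an `Overloaded` error instead
        // of making `ready()` wait, so readiness itself can never hang.
        let mut svc = LoadShed::new(inner);

        match svc.ready().await?.call("ping").await {
            Ok(len) => println!("response length: {len}"),
            Err(e) if e.is::<Overloaded>() => println!("request shed: service overloaded"),
            Err(e) => return Err(e),
        }
        Ok(())
    }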
1 change: 1 addition & 0 deletions zebrad/Cargo.toml
@@ -168,6 +168,7 @@ toml = "0.8.3"
futures = "0.3.29"
rayon = "1.7.0"
tokio = { version = "1.33.0", features = ["time", "rt-multi-thread", "macros", "tracing", "signal"] }
tokio-stream = { version = "0.1.14", features = ["time"] }
tower = { version = "0.4.13", features = ["hedge", "limit"] }
pin-project = "1.1.3"

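The new `tokio-stream` dependency is pulled in for its `time` feature, which provides the `StreamExt::timeout` adapter used in the mempool change below. A small self-contained sketch of what that adapter does:

    use std::{pin::pin, time::Duration};
    use tokio_stream::StreamExt;

    #[tokio::main]
    async fn main() {
        // `timeout` wraps each item in a `Result`: `Err(Elapsed)` is yielded
        // whenever the inner stream takes too long to produce its next item,
        // instead of awaiting it indefinitely.
        let items = tokio_stream::iter([1, 2, 3]);
        let mut items = pin!(items.timeout(Duration::from_secs(5)));

        while let Some(item) = items.next().await {
            match item {
                Ok(n) => println!("got {n}"),
                Err(elapsed) => println!("stream stalled: {elapsed:?}"),
            }
        }
    }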
58 changes: 47 additions & 11 deletions zebrad/src/components/inbound.rs
@@ -9,7 +9,7 @@ use std::{
collections::HashSet,
future::Future,
pin::Pin,
sync::Arc,
sync::{Arc, TryLockError},
task::{Context, Poll},
time::Duration,
};
@@ -278,7 +278,11 @@ impl Service<zn::Request> for Inbound {
}
}
Err(TryRecvError::Empty) => {
// There's no setup data yet, so keep waiting for it
// There's no setup data yet, so keep waiting for it.
//
// We could use Future::poll() to get a waker and return Poll::Pending here.
// But we want to drop excess requests during startup instead. Otherwise,
// the inbound service gets overloaded, and starts disconnecting peers.
result = Ok(());
Setup::Pending {
full_verify_concurrency_limit,
Expand Down Expand Up @@ -307,6 +311,11 @@ impl Service<zn::Request> for Inbound {
mempool,
state,
} => {
// # Correctness
//
// Clear the stream but ignore the final Pending return value.
// If we returned Pending here, and there were no waiting block downloads,
// then inbound requests would wait for the next block download, and hang forever.
while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {}

result = Ok(());
@@ -366,29 +375,56 @@
//
// # Correctness
//
// Briefly hold the address book threaded mutex while
// cloning the address book. Then sanitize in the future,
// after releasing the lock.
let peers = address_book.lock().unwrap().clone();
// If the address book is busy, try again inside the future. If it can't be locked
// twice, ignore the request.
let address_book = address_book.clone();

let get_peers = move || match address_book.try_lock() {
Ok(address_book) => Some(address_book.clone()),
Err(TryLockError::WouldBlock) => None,
Err(TryLockError::Poisoned(_)) => panic!("previous thread panicked while holding the address book lock"),
};

let peers = get_peers();

async move {
// Correctness: get the current time after acquiring the address book lock.
// Correctness: get the current time inside the future.
//
// This time is used to filter outdated peers, so it doesn't really matter
// This time is used to filter outdated peers, so it doesn't matter much
// if we get it when the future is created, or when it starts running.
let now = Utc::now();

// If we didn't get the peers when the future was created, wait for other tasks
// to run, then try again when the future first runs.
if peers.is_none() {
tokio::task::yield_now().await;
}
let peers = peers.or_else(get_peers);
let is_busy = peers.is_none();

// Send a sanitized response
let mut peers = peers.sanitized(now);
let mut peers = peers.map_or_else(Vec::new, |peers| peers.sanitized(now));

// Truncate the list
let address_limit = div_ceil(peers.len(), ADDR_RESPONSE_LIMIT_DENOMINATOR);
let address_limit = MAX_ADDRS_IN_MESSAGE.min(address_limit);
peers.truncate(address_limit);

if peers.is_empty() {
// We don't know if the peer response will be empty until we've sanitized them.
debug!("ignoring `Peers` request from remote peer because our address book is empty");
// Sometimes we don't know if the peer response will be empty until we've
// sanitized them.
if is_busy {
info!(
"ignoring `Peers` request from remote peer because our address \
book is busy"
);
} else {
debug!(
"ignoring `Peers` request from remote peer because our address \
book has no available peers"
);
}

Ok(zn::Response::Nil)
} else {
Ok(zn::Response::Peers(peers))
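The `Peers` handler above replaces a blocking `lock()` on a threaded mutex with a try-twice `try_lock()` scheme: attempt once when building the future, yield, attempt once more inside it, and answer `Nil` if the lock is still busy. A sketch of the same pattern in isolation, with a plain `Vec<String>` standing in for Zebra's `AddressBook`:

    use std::sync::{Arc, Mutex, TryLockError};

    async fn peers_snapshot(book: Arc<Mutex<Vec<String>>>) -> Option<Vec<String>> {
        let get_peers = || match book.try_lock() {
            Ok(book) => Some(book.clone()),
            // Another task holds the lock; report "busy" instead of blocking.
            Err(TryLockError::WouldBlock) => None,
            Err(TryLockError::Poisoned(_)) => {
                panic!("previous thread panicked while holding the lock")
            }
        };

        // First attempt, then yield so the lock holder can make progress,
        // then try exactly once more before giving up.
        let mut peers = get_peers();
        if peers.is_none() {
            tokio::task::yield_now().await;
            peers = get_peers();
        }
        peers
    }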
21 changes: 16 additions & 5 deletions zebrad/src/components/mempool.rs
@@ -22,12 +22,13 @@ use std::{
collections::HashSet,
future::Future,
iter,
pin::Pin,
pin::{pin, Pin},
task::{Context, Poll},
};

use futures::{future::FutureExt, stream::Stream};
use tokio::sync::broadcast;
use tokio_stream::StreamExt;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};

use zebra_chain::{
@@ -42,7 +43,7 @@ use zebra_node_services::mempool::{Gossip, Request, Response};
use zebra_state as zs;
use zebra_state::{ChainTipChange, TipAction};

use crate::components::sync::SyncStatus;
use crate::components::{mempool::crawler::RATE_LIMIT_DELAY, sync::SyncStatus};

pub mod config;
mod crawler;
@@ -580,9 +581,11 @@ impl Service<Request> for Mempool {
let best_tip_height = self.latest_chain_tip.best_tip_height();

// Clean up completed download tasks and add to mempool if successful.
while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
while let Poll::Ready(Some(r)) =
pin!(tx_downloads.timeout(RATE_LIMIT_DELAY)).poll_next(cx)
{
match r {
Ok((tx, expected_tip_height)) => {
Ok(Ok((tx, expected_tip_height))) => {
// # Correctness:
//
// It's okay to use tip height here instead of the tip hash since
Expand All @@ -609,12 +612,20 @@ impl Service<Request> for Mempool {
tx_downloads.download_if_needed_and_verify(tx.transaction.into());
}
}
Err((txid, error)) => {
Ok(Err((txid, error))) => {
tracing::debug!(?txid, ?error, "mempool transaction failed to verify");

metrics::counter!("mempool.failed.verify.tasks.total", 1, "reason" => error.to_string());
storage.reject_if_needed(txid, error);
}
Err(_elapsed) => {
// A timeout happens when the stream hangs waiting for another service,
// so there is no specific transaction ID.

tracing::info!("mempool transaction failed to verify due to timeout");

metrics::counter!("mempool.failed.verify.tasks.total", 1, "reason" => "timeout");
}
};
}

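The `poll_ready` change above wraps each poll of the download stream in a fresh `Timeout` via `pin!`. A hypothetical helper showing the same polling pattern outside the service; the `drain_ready` name and signature are illustrative, not Zebra API:

    use std::{
        pin::pin,
        task::{Context, Poll},
        time::Duration,
    };

    use tokio_stream::{Elapsed, Stream, StreamExt};

    // Drain every item the stream can yield right now. Arming a timer on
    // each poll also guarantees the task is re-woken after `max_wait`, so a
    // stalled or waker-losing inner service can't wedge the caller forever.
    fn drain_ready<S>(
        downloads: &mut S,
        max_wait: Duration,
        cx: &mut Context<'_>,
    ) -> Vec<Result<S::Item, Elapsed>>
    where
        S: Stream + Unpin,
    {
        let mut out = Vec::new();

        // `pin!` pins the temporary `Timeout` wrapper on the stack for each
        // poll, just like `pin!(tx_downloads.timeout(..))` in the diff.
        while let Poll::Ready(Some(item)) =
            pin!((&mut *downloads).timeout(max_wait)).poll_next(cx)
        {
            out.push(item);
        }

        out
    }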
17 changes: 14 additions & 3 deletions zebrad/src/components/mempool/crawler.rs
@@ -50,7 +50,11 @@
use std::{collections::HashSet, time::Duration};

use futures::{future, pin_mut, stream::FuturesUnordered, StreamExt};
use tokio::{sync::watch, task::JoinHandle, time::sleep};
use tokio::{
sync::watch,
task::JoinHandle,
time::{sleep, timeout},
};
use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
use tracing_futures::Instrument;

@@ -77,7 +81,7 @@ const FANOUT: usize = 3;
///
/// Using a prime number makes sure that mempool crawler fanouts
/// don't synchronise with other crawls.
const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);
pub const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);

/// The time to wait for a peer response.
///
@@ -191,7 +195,14 @@ where

loop {
self.wait_until_enabled().await?;
self.crawl_transactions().await?;
// Avoid hangs when the peer service is not ready, or due to bugs in async code.
timeout(RATE_LIMIT_DELAY, self.crawl_transactions())
.await
.unwrap_or_else(|timeout| {
// Temporary errors just get logged and ignored.
info!("mempool crawl timed out: {timeout:?}");
Ok(())
})?;
sleep(RATE_LIMIT_DELAY).await;
}
}
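The crawler change reuses `RATE_LIMIT_DELAY` as the outer timeout, so a timed-out crawl is logged and treated as a successful no-op rather than killing the task. A sketch of that loop shape, with a stub standing in for `crawl_transactions()`:

    use std::time::Duration;
    use tokio::time::{sleep, timeout};

    const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);

    // Stub standing in for the crawler's real fanout logic.
    async fn crawl_transactions() -> Result<(), Box<dyn std::error::Error>> {
        Ok(())
    }

    async fn crawl_loop() -> Result<(), Box<dyn std::error::Error>> {
        loop {
            // Bound each crawl by the existing rate-limit delay. A timeout is
            // logged and mapped to `Ok(())`, so only real errors propagate
            // through the `?` and stop the crawler.
            timeout(RATE_LIMIT_DELAY, crawl_transactions())
                .await
                .unwrap_or_else(|elapsed| {
                    tracing::info!("mempool crawl timed out: {elapsed:?}");
                    Ok(())
                })?;

            sleep(RATE_LIMIT_DELAY).await;
        }
    }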
(diff for the remaining 2 changed files not shown)