From 68d7198e9ff269c6329c03ed9fec187660167780 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 24 Nov 2021 03:42:44 +1000 Subject: [PATCH] Re-order Zebra startup, so slow services are launched last (#3091) * Start network before verifiers This makes the Groth16 download task start as late as possible. * Explain why the Groth16 download must happen first * Speed up Zebra shutdown: skip waiting for the tokio runtime --- zebra-consensus/src/chain.rs | 3 +- zebrad/src/commands/start.rs | 44 +-- zebrad/src/components.rs | 2 +- zebrad/src/components/inbound.rs | 359 ++++++++++++------------- zebrad/src/components/inbound/tests.rs | 16 +- zebrad/src/components/tokio.rs | 14 +- 6 files changed, 221 insertions(+), 217 deletions(-) diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs index c5884c83102..4e29f25a9a9 100644 --- a/zebra-consensus/src/chain.rs +++ b/zebra-consensus/src/chain.rs @@ -197,7 +197,8 @@ where S: Service + Send + Clone + 'static, S::Future: Send + 'static, { - // pre-download Groth16 parameters async + // Pre-download Groth16 parameters in a separate thread. + // This thread must be launched first, so the download doesn't happen on the startup thread. let groth16_download_handle = spawn_blocking(|| { tracing::info!("checking if Zcash Sapling and Sprout parameters have been downloaded"); diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 0991c7d6f70..338ff8031ba 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -56,6 +56,7 @@ use tower::{builder::ServiceBuilder, util::BoxService}; use crate::{ components::{ + inbound::InboundSetupData, mempool::{self, Mempool}, sync, tokio::{RuntimeRun, TokioComponent}, @@ -83,15 +84,6 @@ impl StartCmd { zebra_state::init(config.state.clone(), config.network.network); let state = ServiceBuilder::new().buffer(20).service(state_service); - info!("initializing verifiers"); - let (chain_verifier, tx_verifier, mut groth16_download_handle) = - zebra_consensus::chain::init( - config.consensus.clone(), - config.network.network, - state.clone(), - ) - .await; - info!("initializing network"); // The service that our node uses to respond to requests by peers. The // load_shed middleware ensures that we reduce the size of the peer set @@ -100,24 +92,33 @@ impl StartCmd { let inbound = ServiceBuilder::new() .load_shed() .buffer(20) - .service(Inbound::new( - setup_rx, - state.clone(), - chain_verifier.clone(), - )); + .service(Inbound::new(setup_rx)); let (peer_set, address_book) = zebra_network::init(config.network.clone(), inbound, latest_chain_tip.clone()).await; + info!("initializing verifiers"); + let (chain_verifier, tx_verifier, mut groth16_download_handle) = + zebra_consensus::chain::init( + config.consensus.clone(), + config.network.network, + state.clone(), + ) + .await; + info!("initializing syncer"); - let (syncer, sync_status) = - ChainSync::new(&config, peer_set.clone(), state.clone(), chain_verifier); + let (syncer, sync_status) = ChainSync::new( + &config, + peer_set.clone(), + state.clone(), + chain_verifier.clone(), + ); info!("initializing mempool"); let (mempool, mempool_transaction_receiver) = Mempool::new( &config.mempool, peer_set.clone(), - state, + state.clone(), tx_verifier, sync_status.clone(), latest_chain_tip, @@ -126,8 +127,15 @@ impl StartCmd { let mempool = BoxService::new(mempool); let mempool = ServiceBuilder::new().buffer(20).service(mempool); + let setup_data = InboundSetupData { + address_book, + block_download_peer_set: peer_set.clone(), + block_verifier: chain_verifier, + mempool: mempool.clone(), + state, + }; setup_tx - .send((peer_set.clone(), address_book, mempool.clone())) + .send(setup_data) .map_err(|_| eyre!("could not send setup data to inbound service"))?; let syncer_error_future = syncer.sync(); diff --git a/zebrad/src/components.rs b/zebrad/src/components.rs index 2e0752b0a75..5daf65a4a79 100644 --- a/zebrad/src/components.rs +++ b/zebrad/src/components.rs @@ -5,7 +5,7 @@ //! component and dependency injection models are designed to work together, but //! don't fit the async context well. -mod inbound; +pub mod inbound; #[allow(missing_docs)] pub mod mempool; pub mod metrics; diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 20150b0beb6..29b8609c667 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -16,8 +16,7 @@ use futures::{ future::{FutureExt, TryFutureExt}, stream::Stream, }; -use oneshot::error::TryRecvError; -use tokio::sync::oneshot; +use tokio::sync::oneshot::{self, error::TryRecvError}; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt}; use zebra_network as zn; @@ -39,35 +38,50 @@ mod tests; use downloads::Downloads as BlockDownloads; -type Outbound = Buffer, zn::Request>; +/// A security parameter to return only 1/3 of available addresses as a +/// response to a `Peers` request. +const FRAC_OF_AVAILABLE_ADDRESS: f64 = 1. / 3.; + +type BlockDownloadPeerSet = + Buffer, zn::Request>; type State = Buffer, zs::Request>; type Mempool = Buffer, mp::Request>; type BlockVerifier = Buffer, block::Hash, VerifyChainError>, Arc>; -type InboundBlockDownloads = BlockDownloads, Timeout, State>; +type GossipedBlockDownloads = + BlockDownloads, Timeout, State>; -pub type NetworkSetupData = (Outbound, Arc>, Mempool); +/// The services used by the [`Inbound`] service. +pub struct InboundSetupData { + /// A shared list of peer addresses. + pub address_book: Arc>, -/// A security parameter to return only 1/3 of available addresses as a -/// response to a `Peers` request. -const FRAC_OF_AVAILABLE_ADDRESS: f64 = 1. / 3.; + /// A service that can be used to download gossiped blocks. + pub block_download_peer_set: BlockDownloadPeerSet, + + /// A service that verifies downloaded blocks. + /// + /// Given to `Inbound.block_downloads` after the required services are set up. + pub block_verifier: BlockVerifier, + + /// A service that manages transactions in the memory pool. + pub mempool: Mempool, + + /// A service that manages cached blockchain state. + pub state: State, +} -/// Tracks the internal state of the [`Inbound`] service during network setup. +/// Tracks the internal state of the [`Inbound`] service during setup. pub enum Setup { - /// Waiting for network setup to complete. + /// Waiting for service setup to complete. /// - /// Requests that depend on Zebra's internal network setup are ignored. - /// Other requests are answered. - AwaitingNetwork { - /// A oneshot channel used to receive the address_book and outbound services - /// after the network is set up. - network_setup: oneshot::Receiver, - - /// A service that verifies downloaded blocks. Given to `block_downloads` - /// after the network is set up. - block_verifier: BlockVerifier, + /// All requests are ignored. + Pending { + /// A oneshot channel used to receive required services, + /// after they are set up. + setup: oneshot::Receiver, }, - /// Network setup is complete. + /// Setup is complete. /// /// All requests are answered. Initialized { @@ -75,24 +89,29 @@ pub enum Setup { address_book: Arc>, /// A `futures::Stream` that downloads and verifies gossiped blocks. - block_downloads: Pin>, + block_downloads: Pin>, /// A service that manages transactions in the memory pool. mempool: Mempool, + + /// A service that manages cached blockchain state. + state: State, }, - /// Temporary state used in the service's internal network initialization - /// code. + /// Temporary state used in the inbound service's internal initialization code. /// - /// If this state occurs outside the service initialization code, the - /// service panics. + /// If this state occurs outside the service initialization code, the service panics. FailedInit, - /// Network setup failed, because the setup channel permanently failed. + /// Setup failed, because the setup channel permanently failed. /// The service keeps returning readiness errors for every request. - FailedRecv { error: SharedRecvError }, + FailedRecv { + /// The original channel error. + error: SharedRecvError, + }, } +/// A wrapper around `Arc` that implements `Error`. #[derive(thiserror::Error, Debug, Clone)] #[error(transparent)] pub struct SharedRecvError(Arc); @@ -112,51 +131,46 @@ impl From for SharedRecvError { /// /// - supplying network data like peer addresses to other nodes; /// - supplying chain data like blocks to other nodes; -/// - performing transaction diffusion; -/// - performing block diffusion. +/// - supplying mempool transactions to other nodes; +/// - receiving gossiped transactions; and +/// - receiving gossiped blocks. /// /// Because the `Inbound` service is responsible for participating in the gossip /// protocols used for transaction and block diffusion, there is a potential -/// overlap with the `ChainSync` component. +/// overlap with the `ChainSync` and `Mempool` components. /// -/// The division of responsibility is that the `ChainSync` component is -/// *internally driven*, periodically polling the network to check whether it is -/// behind the current tip, while the `Inbound` service is *externally driven*, -/// responding to block gossip by attempting to download and validate advertised -/// blocks. +/// The division of responsibility is that: +/// +/// The `ChainSync` and `Mempool` components are *internally driven*, +/// periodically polling the network to check for new blocks or transactions. +/// +/// The `Inbound` service is *externally driven*, responding to block gossip +/// by attempting to download and validate advertised blocks. +/// +/// Gossiped transactions are forwarded to the mempool downloader, +/// which unifies polled and gossiped transactions into a single download list. pub struct Inbound { - /// Provides network-dependent services, if they are available. + /// Provides service dependencies, if they are available. /// - /// Some services are unavailable until Zebra has completed network setup. - network_setup: Setup, - - /// A service that manages cached blockchain state. - state: State, + /// Some services are unavailable until Zebra has completed setup. + setup: Setup, } impl Inbound { /// Create a new inbound service. /// - /// The address book and peer set use the newly created inbound service. - /// So they are sent via the `network_setup` channel after initialization. - pub fn new( - network_setup: oneshot::Receiver, - state: State, - block_verifier: BlockVerifier, - ) -> Self { - Self { - network_setup: Setup::AwaitingNetwork { - network_setup, - block_verifier, - }, - state, + /// Dependent services are sent via the `setup` channel after initialization. + pub fn new(setup: oneshot::Receiver) -> Inbound { + Inbound { + setup: Setup::Pending { setup }, } } + /// Remove `self.setup`, temporarily replacing it with an invalid state. fn take_setup(&mut self) -> Setup { - let mut network_setup = Setup::FailedInit; - std::mem::swap(&mut self.network_setup, &mut network_setup); - network_setup + let mut setup = Setup::FailedInit; + std::mem::swap(&mut self.setup, &mut setup); + setup } } @@ -167,25 +181,30 @@ impl Service for Inbound { Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // Check whether the network setup is finished, but don't wait for it to + // Check whether the setup is finished, but don't wait for it to // become ready before reporting readiness. We expect to get it "soon", // and reporting unreadiness might cause unwanted load-shedding, since // the load-shed middleware is unable to distinguish being unready due // to load from being unready while waiting on setup. - // Every network_setup state handler must provide a result + // Every setup variant handler must provide a result let result; - self.network_setup = match self.take_setup() { - Setup::AwaitingNetwork { - mut network_setup, - block_verifier, - } => match network_setup.try_recv() { - Ok((outbound, address_book, mempool)) => { + self.setup = match self.take_setup() { + Setup::Pending { mut setup } => match setup.try_recv() { + Ok(setup_data) => { + let InboundSetupData { + address_book, + block_download_peer_set, + block_verifier, + mempool, + state, + } = setup_data; + let block_downloads = Box::pin(BlockDownloads::new( - Timeout::new(outbound.clone(), BLOCK_DOWNLOAD_TIMEOUT), + Timeout::new(block_download_peer_set.clone(), BLOCK_DOWNLOAD_TIMEOUT), Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT), - self.state.clone(), + state.clone(), )); result = Ok(()); @@ -193,27 +212,25 @@ impl Service for Inbound { address_book, block_downloads, mempool, + state, } } Err(TryRecvError::Empty) => { // There's no setup data yet, so keep waiting for it result = Ok(()); - Setup::AwaitingNetwork { - network_setup, - block_verifier, - } + Setup::Pending { setup } } Err(error @ TryRecvError::Closed) => { - // Mark the service as failed, because network setup failed - error!(?error, "inbound network setup failed"); + // Mark the service as failed, because setup failed + error!(?error, "inbound setup failed"); let error: SharedRecvError = error.into(); result = Err(error.clone().into()); Setup::FailedRecv { error } } }, - // Make sure previous network setups were left in a valid state + // Make sure previous setups were left in a valid state Setup::FailedInit => unreachable!("incomplete previous Inbound initialization"), - // If network setup failed, report service failure + // If setup failed, report service failure Setup::FailedRecv { error } => { result = Err(error.clone().into()); Setup::FailedRecv { error } @@ -223,6 +240,7 @@ impl Service for Inbound { address_book, mut block_downloads, mempool, + state, } => { while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {} @@ -231,12 +249,13 @@ impl Service for Inbound { address_book, block_downloads, mempool, + state, } } }; - // Make sure we're leaving the network setup in a valid state - if matches!(self.network_setup, Setup::FailedInit) { + // Make sure we're leaving the setup in a valid state + if matches!(self.setup, Setup::FailedInit) { unreachable!("incomplete Inbound initialization after poll_ready state handling"); } @@ -258,38 +277,46 @@ impl Service for Inbound { /// and will cause callers to disconnect from the remote peer. #[instrument(name = "inbound", skip(self, req))] fn call(&mut self, req: zn::Request) -> Self::Future { + let (address_book, block_downloads, mempool, state) = match &mut self.setup { + Setup::Initialized { + address_book, + block_downloads, + mempool, + state, + } => (address_book, block_downloads, mempool, state), + _ => { + debug!("ignoring request from remote peer during setup"); + return async { Ok(zn::Response::Nil) }.boxed(); + } + }; + match req { zn::Request::Peers => { - if let Setup::Initialized { address_book, .. } = &self.network_setup { - // # Security - // - // We truncate the list to not reveal our entire peer set in one call. - // But we don't monitor repeated requests and the results are shuffled, - // a crawler could just send repeated queries and get the full list. - // - // # Correctness - // - // Briefly hold the address book threaded mutex while - // cloning the address book. Then sanitize after releasing - // the lock. - let peers = address_book.lock().unwrap().clone(); - - // Send a sanitized response - let mut peers = peers.sanitized(); - - // Truncate the list - let truncate_at = MAX_ADDRS_IN_MESSAGE - .min((peers.len() as f64 * FRAC_OF_AVAILABLE_ADDRESS).ceil() as usize); - peers.truncate(truncate_at); - - if !peers.is_empty() { - async { Ok(zn::Response::Peers(peers)) }.boxed() - } else { - info!("ignoring `Peers` request from remote peer because our address book is empty"); - async { Ok(zn::Response::Nil) }.boxed() - } + // # Security + // + // We truncate the list to not reveal our entire peer set in one call. + // But we don't monitor repeated requests and the results are shuffled, + // a crawler could just send repeated queries and get the full list. + // + // # Correctness + // + // Briefly hold the address book threaded mutex while + // cloning the address book. Then sanitize after releasing + // the lock. + let peers = address_book.lock().unwrap().clone(); + + // Send a sanitized response + let mut peers = peers.sanitized(); + + // Truncate the list + let truncate_at = MAX_ADDRS_IN_MESSAGE + .min((peers.len() as f64 * FRAC_OF_AVAILABLE_ADDRESS).ceil() as usize); + peers.truncate(truncate_at); + + if !peers.is_empty() { + async { Ok(zn::Response::Peers(peers)) }.boxed() } else { - info!("ignoring `Peers` request from remote peer during network setup"); + info!("ignoring `Peers` request from remote peer because our address book is empty"); async { Ok(zn::Response::Nil) }.boxed() } } @@ -306,7 +333,7 @@ impl Service for Inbound { hashes .into_iter() .map(|hash| zs::Request::Block(hash.into())) - .map(|request| self.state.clone().oneshot(request)) + .map(|request| state.clone().oneshot(request)) .collect::>() .try_filter_map(|response| async move { Ok(match response { @@ -329,102 +356,60 @@ impl Service for Inbound { .boxed() } zn::Request::TransactionsById(transactions) => { - if let Setup::Initialized { mempool, .. } = &mut self.network_setup { - let request = mempool::Request::TransactionsById(transactions); - mempool.clone().oneshot(request).map_ok(|resp| match resp { - mempool::Response::Transactions(transactions) if transactions.is_empty() => zn::Response::Nil, - mempool::Response::Transactions(transactions) => zn::Response::Transactions(transactions), - _ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"), - }) + let request = mempool::Request::TransactionsById(transactions); + mempool.clone().oneshot(request).map_ok(|resp| match resp { + mempool::Response::Transactions(transactions) if transactions.is_empty() => zn::Response::Nil, + mempool::Response::Transactions(transactions) => zn::Response::Transactions(transactions), + _ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"), + }) .boxed() - } else { - info!( - transaction_hash_count = ?transactions.len(), - "ignoring `TransactionsById` request from remote peer during network setup" - ); - async { Ok(zn::Response::Nil) }.boxed() - } } zn::Request::FindBlocks { known_blocks, stop } => { let request = zs::Request::FindBlockHashes { known_blocks, stop }; - self.state.clone().oneshot(request).map_ok(|resp| match resp { - zs::Response::BlockHashes(hashes) if hashes.is_empty() => zn::Response::Nil, - zs::Response::BlockHashes(hashes) => zn::Response::BlockHashes(hashes), - _ => unreachable!("zebra-state should always respond to a `FindBlockHashes` request with a `BlockHashes` response"), - }) - .boxed() + state.clone().oneshot(request).map_ok(|resp| match resp { + zs::Response::BlockHashes(hashes) if hashes.is_empty() => zn::Response::Nil, + zs::Response::BlockHashes(hashes) => zn::Response::BlockHashes(hashes), + _ => unreachable!("zebra-state should always respond to a `FindBlockHashes` request with a `BlockHashes` response"), + }) + .boxed() } zn::Request::FindHeaders { known_blocks, stop } => { let request = zs::Request::FindBlockHeaders { known_blocks, stop }; - self.state.clone().oneshot(request).map_ok(|resp| match resp { - zs::Response::BlockHeaders(headers) if headers.is_empty() => zn::Response::Nil, - zs::Response::BlockHeaders(headers) => zn::Response::BlockHeaders(headers), - _ => unreachable!("zebra-state should always respond to a `FindBlockHeaders` request with a `BlockHeaders` response"), - }) - .boxed() + state.clone().oneshot(request).map_ok(|resp| match resp { + zs::Response::BlockHeaders(headers) if headers.is_empty() => zn::Response::Nil, + zs::Response::BlockHeaders(headers) => zn::Response::BlockHeaders(headers), + _ => unreachable!("zebra-state should always respond to a `FindBlockHeaders` request with a `BlockHeaders` response"), + }) + .boxed() } zn::Request::PushTransaction(transaction) => { - if let Setup::Initialized { mempool, .. } = &mut self.network_setup { - mempool - .clone() - .oneshot(mempool::Request::Queue(vec![transaction.into()])) - // The response just indicates if processing was queued or not; ignore it - .map_ok(|_resp| zn::Response::Nil) - .boxed() - } else { - info!( - ?transaction.id, - "ignoring `PushTransaction` request from remote peer during network setup" - ); - async { Ok(zn::Response::Nil) }.boxed() - } + mempool + .clone() + .oneshot(mempool::Request::Queue(vec![transaction.into()])) + // The response just indicates if processing was queued or not; ignore it + .map_ok(|_resp| zn::Response::Nil) + .boxed() } zn::Request::AdvertiseTransactionIds(transactions) => { - if let Setup::Initialized { mempool, .. } = &mut self.network_setup { - let transactions = transactions.into_iter().map(Into::into).collect(); - mempool - .clone() - .oneshot(mempool::Request::Queue(transactions)) - // The response just indicates if processing was queued or not; ignore it - .map_ok(|_resp| zn::Response::Nil) - .boxed() - } else { - // Peers send a lot of these when we first connect to them. - debug!( - "ignoring `AdvertiseTransactionIds` request from remote peer during network setup" - ); - async { Ok(zn::Response::Nil) }.boxed() - } + let transactions = transactions.into_iter().map(Into::into).collect(); + mempool + .clone() + .oneshot(mempool::Request::Queue(transactions)) + // The response just indicates if processing was queued or not; ignore it + .map_ok(|_resp| zn::Response::Nil) + .boxed() } zn::Request::AdvertiseBlock(hash) => { - if let Setup::Initialized { - block_downloads, .. - } = &mut self.network_setup - { - block_downloads.download_and_verify(hash); - } else { - // Peers send a lot of these when we first connect to them. - debug!( - ?hash, - "ignoring `AdvertiseBlock` request from remote peer during network setup" - ); - } + block_downloads.download_and_verify(hash); async { Ok(zn::Response::Nil) }.boxed() } zn::Request::MempoolTransactionIds => { - if let Setup::Initialized { mempool, .. } = &mut self.network_setup { - mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp { - mempool::Response::TransactionIds(transaction_ids) if transaction_ids.is_empty() => zn::Response::Nil, - mempool::Response::TransactionIds(transaction_ids) => zn::Response::TransactionIds(transaction_ids.into_iter().collect()), - _ => unreachable!("Mempool component should always respond to a `TransactionIds` request with a `TransactionIds` response"), - }) + mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp { + mempool::Response::TransactionIds(transaction_ids) if transaction_ids.is_empty() => zn::Response::Nil, + mempool::Response::TransactionIds(transaction_ids) => zn::Response::TransactionIds(transaction_ids.into_iter().collect()), + _ => unreachable!("Mempool component should always respond to a `TransactionIds` request with a `TransactionIds` response"), + }) .boxed() - } else { - info!( - "ignoring `MempoolTransactionIds` request from remote peer during network setup" - ); - async { Ok(zn::Response::Nil) }.boxed() - } } zn::Request::Ping(_) => { unreachable!("ping requests are handled internally"); diff --git a/zebrad/src/components/inbound/tests.rs b/zebrad/src/components/inbound/tests.rs index c7851395b08..ef5920d8213 100644 --- a/zebrad/src/components/inbound/tests.rs +++ b/zebrad/src/components/inbound/tests.rs @@ -24,6 +24,7 @@ use zebra_test::mock_service::{MockService, PanicAssertion}; use crate::{ components::{ + inbound::InboundSetupData, mempool::{self, gossip_mempool_transaction_id, unmined_transactions_in_blocks, Mempool}, sync::{self, BlockGossipError, SyncStatus}, }, @@ -661,15 +662,18 @@ async fn setup( let inbound_service = ServiceBuilder::new() .load_shed() - .service(super::Inbound::new( - setup_rx, - state_service.clone(), - block_verifier.clone(), - )); + .service(super::Inbound::new(setup_rx)); let inbound_service = BoxService::new(inbound_service); let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service); - let r = setup_tx.send((buffered_peer_set, address_book, mempool_service.clone())); + let setup_data = InboundSetupData { + address_book, + block_download_peer_set: buffered_peer_set, + block_verifier, + mempool: mempool_service.clone(), + state: state_service.clone(), + }; + let r = setup_tx.send(setup_data); // We can't expect or unwrap because the returned Result does not implement Debug assert!(r.is_ok(), "unexpected setup channel send failure"); diff --git a/zebrad/src/components/tokio.rs b/zebrad/src/components/tokio.rs index 9bb935a1355..afa0d20c05c 100644 --- a/zebrad/src/components/tokio.rs +++ b/zebrad/src/components/tokio.rs @@ -46,16 +46,22 @@ pub(crate) trait RuntimeRun { impl RuntimeRun for Runtime { fn run(&mut self, fut: impl Future>) { let result = self.block_on(async move { - // If the run task and shutdown are both ready, select! chooses - // one of them at random. + // Always poll the shutdown future first. + // + // Otherwise, a busy Zebra instance could starve the shutdown future, + // and delay shutting down. tokio::select! { - result = fut => result, + biased; _ = shutdown() => Ok(()), + result = fut => result, } }); match result { - Ok(()) => {} + Ok(()) => { + // Don't wait for the runtime to shut down all the tasks. + app_writer().shutdown(Shutdown::Graceful); + } Err(e) => { eprintln!("Error: {:?}", e); app_writer().shutdown(Shutdown::Forced);