diff --git a/executor/src/errors.rs b/executor/src/errors.rs
index 1feb670b4..3630ceb15 100644
--- a/executor/src/errors.rs
+++ b/executor/src/errors.rs
@@ -22,10 +22,31 @@ macro_rules! ensure {
     };
 }
 
+#[macro_export]
+macro_rules! try_fut_and_permit {
+    ($fut:expr, $sender:expr) => {
+        futures::future::TryFutureExt::unwrap_or_else(
+            futures::future::try_join(
+                $fut,
+                futures::TryFutureExt::map_err($sender.reserve(), |_e| {
+                    SubscriberError::ClosedChannel(stringify!($sender).to_owned())
+                }),
+            ),
+            |e| {
+                tracing::error!("{e}");
+                panic!("I/O failure, killing the node.");
+            },
+        )
+    };
+}
+
 pub type SubscriberResult<T> = Result<T, SubscriberError>;
 
 #[derive(Debug, Error, Clone)]
 pub enum SubscriberError {
+    #[error("Channel {0} closed unexpectedly")]
+    ClosedChannel(String),
+
     #[error("Storage failure: {0}")]
     StoreError(#[from] StoreError),
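How the macro is consumed in the select loops below, hand-expanded (schematic, not code emitted by this diff):

    // try_fut_and_permit!(waiting.try_next(), self.tx_executor) behaves like:
    //
    //     try_join(
    //         waiting.try_next(),             // -> Result<Option<V>, E>
    //         self.tx_executor.reserve()      // -> Result<Permit, _>
    //             .map_err(|_| SubscriberError::ClosedChannel("self.tx_executor".to_owned())),
    //     )
    //     .unwrap_or_else(|e| { error!("{e}"); panic!("I/O failure, killing the node.") })
    //
    // A select arm therefore binds `(Some(message), permit)` once BOTH the next
    // ready item and a channel slot are available, and `permit.send(message)`
    // cannot fail: the slot was reserved before the arm fired.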
diff --git a/executor/src/lib.rs b/executor/src/lib.rs
index b6aa7a354..0edba58ab 100644
--- a/executor/src/lib.rs
+++ b/executor/src/lib.rs
@@ -112,6 +112,9 @@ impl Executor {
             SubscriberError::OnlyOneConsensusClientPermitted
         );
 
+        // We expect this will ultimately be needed in the `Core` as well as the `Subscriber`.
+        let arc_metrics = Arc::new(metrics);
+
         // Spawn the subscriber.
         let subscriber_handle = Subscriber::spawn(
             store.clone(),
@@ -119,6 +122,7 @@
             tx_reconfigure.subscribe(),
             rx_consensus,
             tx_executor,
+            arc_metrics,
         );
 
         // Spawn the executor's core.
diff --git a/executor/src/metrics.rs b/executor/src/metrics.rs
index 188b88959..812c3c890 100644
--- a/executor/src/metrics.rs
+++ b/executor/src/metrics.rs
@@ -6,6 +6,8 @@ use prometheus::{default_registry, register_int_gauge_with_registry, IntGauge, R
 pub struct ExecutorMetrics {
     /// occupancy of the channel from the `Subscriber` to `Core`
     pub tx_executor: IntGauge,
+    /// Number of elements in the waiting (ready-to-deliver) list of the subscriber
+    pub waiting_elements_subscriber: IntGauge,
 }
 
 impl ExecutorMetrics {
@@ -17,6 +19,12 @@
                 registry
             )
             .unwrap(),
+            waiting_elements_subscriber: register_int_gauge_with_registry!(
+                "waiting_elements_subscriber",
+                "Number of waiting elements in the subscriber",
+                registry
+            )
+            .unwrap(),
         }
     }
 }
diff --git a/executor/src/subscriber.rs b/executor/src/subscriber.rs
index a97524ab2..e4d5ce5b3 100644
--- a/executor/src/subscriber.rs
+++ b/executor/src/subscriber.rs
@@ -1,19 +1,24 @@
 // Copyright (c) 2022, Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
-use crate::{errors::SubscriberResult, SubscriberError, SubscriberError::PayloadRetrieveError};
+use crate::{
+    errors::SubscriberResult, metrics::ExecutorMetrics, try_fut_and_permit, SubscriberError,
+    SubscriberError::PayloadRetrieveError,
+};
 use backoff::{Error, ExponentialBackoff};
 use consensus::ConsensusOutput;
 use crypto::Hash;
-use futures::stream::{FuturesOrdered, StreamExt};
 use primary::BlockCommand;
-use std::time::Duration;
+use std::{sync::Arc, time::Duration};
 use store::Store;
 use tokio::{
     sync::{oneshot, watch},
     task::JoinHandle,
 };
-use tracing::{debug, error};
-use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification};
+use tracing::error;
+use types::{
+    bounded_future_queue::BoundedFuturesOrdered, metered_channel, Batch, BatchDigest,
+    ReconfigureNotification,
+};
 
 #[cfg(test)]
 #[path = "tests/subscriber_tests.rs"]
@@ -38,9 +43,14 @@ pub struct Subscriber {
     // When asking for a certificate's payload we want to retry until we succeed, unless
     // some irrecoverable error occurs. For that reason a backoff policy is defined.
     get_block_retry_policy: ExponentialBackoff,
+    /// The metrics handler
+    metrics: Arc<ExecutorMetrics>,
 }
 
 impl Subscriber {
+    /// The maximum number of pending consensus messages we should expect.
+    const MAX_PENDING_CONSENSUS_MESSAGES: usize = 2000;
+
     /// Spawn a new subscriber in a new tokio task.
     #[must_use]
     pub fn spawn(
@@ -49,6 +59,7 @@
         rx_reconfigure: watch::Receiver<ReconfigureNotification>,
         rx_consensus: metered_channel::Receiver<ConsensusOutput>,
         tx_executor: metered_channel::Sender<ConsensusOutput>,
+        metrics: Arc<ExecutorMetrics>,
     ) -> JoinHandle<()> {
         let get_block_retry_policy = ExponentialBackoff {
             initial_interval: Duration::from_millis(500),
@@ -67,6 +78,7 @@
                 tx_executor,
                 tx_get_block_commands,
                 get_block_retry_policy,
+                metrics,
             }
             .run()
             .await
@@ -82,13 +94,14 @@
         // matter if we somehow managed to fetch the batches from a later
         // certificate. Unless the earlier certificate's payload has been
        // fetched, no later certificate will be delivered.
-        let mut waiting = FuturesOrdered::new();
+        let mut waiting =
+            BoundedFuturesOrdered::with_capacity(Self::MAX_PENDING_CONSENSUS_MESSAGES);
 
         // Listen to sequenced consensus messages and process them.
         loop {
             tokio::select! {
                 // Receive the ordered sequence of consensus messages from a consensus node.
-                Some(message) = self.rx_consensus.recv() => {
+                Some(message) = self.rx_consensus.recv(), if waiting.available_permits() > 0 => {
                     // Fetch the certificate's payload from the workers. This is done via the
                     // block_waiter component. If the batches are not available in the workers then
                     // block_waiter will do its best to sync from the other peers. Once all batches
@@ -98,15 +111,12 @@
                         self.store.clone(),
                         self.tx_get_block_commands.clone(),
                         message);
-                    waiting.push_back(future);
+                    waiting.push(future).await;
                 },
 
                 // Receive here consensus messages for which we have downloaded all transactions data.
-                Some(message) = waiting.next() => {
-                    if self.tx_executor.send(message?).await.is_err() {
-                        debug!("Executor core is shutting down");
-                        return Ok(());
-                    }
+                (Some(message), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_executor) => {
+                    permit.send(message);
                 },
 
                 // Check whether the committee changed.
@@ -118,6 +128,10 @@
                     }
                 }
             }
+
+            self.metrics
+                .waiting_elements_subscriber
+                .set(waiting.len() as i64);
         }
     }
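The guarded receive is the heart of this change: when `waiting` is full, the consensus arm is disabled and backpressure propagates into the (metered) consensus channel instead of growing an unbounded `FuturesOrdered`. The same pattern in miniature (a self-contained sketch assuming the `types` crate from this diff; names are illustrative):

    use futures::future::{self, Ready};
    use tokio::sync::mpsc;
    use types::bounded_future_queue::BoundedFuturesOrdered;

    // Sketch: only poll `rx` while the bounded queue has spare capacity.
    async fn pump(mut rx: mpsc::Receiver<u64>) {
        let mut waiting: BoundedFuturesOrdered<Ready<Result<u64, ()>>> =
            BoundedFuturesOrdered::with_capacity(16);
        loop {
            tokio::select! {
                // The guard disables this arm while the queue is full, so the
                // sender side of `rx` blocks instead of us buffering forever.
                Some(v) = rx.recv(), if waiting.available_permits() > 0 => {
                    waiting.push(future::ready(Ok(v))).await; // never waits: the guard saw a free slot
                },
                // Drain completed futures in insertion order.
                Ok(Some(v)) = waiting.try_next(), if !waiting.is_empty() => {
                    println!("delivered {v}");
                },
                else => break,
            }
        }
    }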
diff --git a/executor/src/tests/subscriber_tests.rs b/executor/src/tests/subscriber_tests.rs
index f72bc7d5b..aea45e1e4 100644
--- a/executor/src/tests/subscriber_tests.rs
+++ b/executor/src/tests/subscriber_tests.rs
@@ -3,6 +3,7 @@
 use super::*;
 use crate::fixtures::{test_store, test_u64_certificates};
 use primary::GetBlockResponse;
+use prometheus::Registry;
 use test_utils::{committee, test_channel};
 use types::{
     BatchMessage, BlockError, BlockErrorKind, BlockResult, CertificateDigest, SequenceNumber,
@@ -24,12 +25,14 @@ async fn spawn_subscriber(
     // Spawn a test subscriber.
     let store = test_store();
+    let executor_metrics = ExecutorMetrics::new(&Registry::new());
     let subscriber_handle = Subscriber::spawn(
         store.clone(),
         tx_get_block_commands,
         rx_reconfigure,
         rx_sequence,
         tx_executor,
+        Arc::new(executor_metrics),
     );
 
     (store, tx_reconfigure, subscriber_handle)
 }
diff --git a/primary/src/certificate_waiter.rs b/primary/src/certificate_waiter.rs
index 15ba88fe3..aa53819bf 100644
--- a/primary/src/certificate_waiter.rs
+++ b/primary/src/certificate_waiter.rs
@@ -4,10 +4,7 @@
 use crate::metrics::PrimaryMetrics;
 use config::Committee;
 use dashmap::DashMap;
-use futures::{
-    future::try_join_all,
-    stream::{futures_unordered::FuturesUnordered, StreamExt as _},
-};
+use futures::future::try_join_all;
 use once_cell::sync::OnceCell;
 use std::sync::Arc;
 use store::Store;
@@ -16,11 +13,13 @@ use tokio::{
     task::JoinHandle,
     time::{sleep, Duration, Instant},
 };
-use tracing::{error, info};
+use tracing::info;
 use types::{
+    bounded_future_queue::BoundedFuturesUnordered,
     error::{DagError, DagResult},
     metered_channel::{Receiver, Sender},
-    Certificate, CertificateDigest, HeaderDigest, ReconfigureNotification, Round,
+    try_fut_and_permit, Certificate, CertificateDigest, HeaderDigest, ReconfigureNotification,
+    Round,
 };
 
 #[cfg(test)]
@@ -58,6 +57,12 @@ pub struct CertificateWaiter {
 }
 
 impl CertificateWaiter {
+    /// Returns the maximum number of pending certificates we should expect. In the worst case of
+    /// causal completion this can reach `self.gc_depth` x `self.committee.size()`; we double it for slack.
+    pub fn max_pending_certificates(&self) -> usize {
+        self.gc_depth as usize * self.committee.size() * 2
+    }
+
     #[must_use]
     pub fn spawn(
         committee: Committee,
@@ -106,7 +111,7 @@
     }
 
     async fn run(&mut self) {
-        let mut waiting = FuturesUnordered::new();
+        let mut waiting = BoundedFuturesUnordered::with_capacity(self.max_pending_certificates());
         let timer = sleep(Duration::from_millis(GC_RESOLUTION));
         tokio::pin!(timer);
         let mut attempt_garbage_collection;
@@ -117,7 +122,8 @@
             attempt_garbage_collection = false;
 
             tokio::select! {
-                Some(certificate) = self.rx_synchronizer.recv() => {
+                // We only accept new elements if we have "room" for them
+                Some(certificate) = self.rx_synchronizer.recv(), if waiting.available_permits() > 0 => {
                     if certificate.epoch() < self.committee.epoch() {
                         continue;
                    }
@@ -140,20 +146,14 @@
                     };
                     self.pending.insert(header_id, (certificate.round(), once_cancel));
                     let fut = Self::waiter(wait_for, &self.store, certificate, rx_cancel);
-                    waiting.push(fut);
+                    waiting.push(fut).await;
                 }
-                Some(result) = waiting.next() => match result {
-                    Ok(certificate) => {
+                // We simultaneously poll the next ready waiter and a free slot on the channel to the core.
+                (Some(certificate), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_core) => {
                     // TODO [issue #115]: To ensure crash-recovery of consensus, it is not enough to send every
                     // certificate whose ancestors are in the storage. After recovery, we may also
                     // need to send all parent certificates with rounds greater than `last_committed`.
-
-                        self.tx_core.send(certificate).await.expect("Failed to send certificate");
-                    },
-                    Err(e) => {
-                        error!("{e}");
-                        panic!("Storage failure: killing node.");
-                    }
+                    permit.send(certificate);
                 },
                 result = self.rx_reconfigure.changed() => {
                     result.expect("Committee channel dropped");
@@ -211,14 +211,18 @@
                 }
             }
 
-            self.update_metrics();
+            self.update_metrics(waiting.len());
         }
     }
 
-    fn update_metrics(&self) {
+    fn update_metrics(&self, waiting_len: usize) {
         self.metrics
             .pending_elements_certificate_waiter
             .with_label_values(&[&self.committee.epoch.to_string()])
             .set(self.pending.len() as i64);
+        self.metrics
+            .waiting_elements_certificate_waiter
+            .with_label_values(&[&self.committee.epoch.to_string()])
+            .set(waiting_len as i64);
     }
 }
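A back-of-the-envelope check of this bound, with hypothetical parameters (illustrative numbers, not from any config in this PR):

    fn sizing_example() {
        let gc_depth: u64 = 50;        // hypothetical: rounds retained before garbage collection
        let committee_size: usize = 4; // hypothetical: number of authorities
        // One potential waiter per authority per retained round, doubled for slack:
        let max_pending = gc_depth as usize * committee_size * 2;
        assert_eq!(max_pending, 400); // beyond this, the select guard stops accepting new certificates
    }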
diff --git a/primary/src/header_waiter.rs b/primary/src/header_waiter.rs
index 2c1a4742d..526d7a0b9 100644
--- a/primary/src/header_waiter.rs
+++ b/primary/src/header_waiter.rs
@@ -7,10 +7,7 @@ use crate::{
 };
 use config::{Committee, WorkerId};
 use crypto::PublicKey;
-use futures::{
-    future::{try_join_all, BoxFuture},
-    stream::{futures_unordered::FuturesUnordered, StreamExt as _},
-};
+use futures::future::{try_join_all, BoxFuture};
 use network::{LuckyNetwork, PrimaryNetwork, PrimaryToWorkerNetwork, UnreliableNetwork};
 use serde::{de::DeserializeOwned, Serialize};
 use std::{
@@ -24,12 +21,13 @@ use tokio::{
     task::JoinHandle,
     time::{sleep, Duration, Instant},
 };
-use tracing::{debug, error, info};
+use tracing::{debug, info};
 use types::{
+    bounded_future_queue::BoundedFuturesUnordered,
     error::{DagError, DagResult},
     metered_channel::{Receiver, Sender},
-    BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, ReconfigureNotification,
-    Round,
+    try_fut_and_permit, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest,
+    ReconfigureNotification, Round,
 };
 
 #[cfg(test)]
@@ -90,6 +88,12 @@ pub struct HeaderWaiter {
 }
 
 impl HeaderWaiter {
+    /// Returns the maximum number of pending batch and parent requests we should expect. In the
+    /// worst case of causal completion, each kind can reach `self.gc_depth` x `self.committee.size()`;
+    /// we double that for slack.
+    pub fn max_pending_header_waiter_requests(&self) -> usize {
+        self.gc_depth as usize * self.committee.size() * 4
+    }
+
     #[must_use]
     pub fn spawn(
         name: PublicKey,
@@ -132,20 +136,6 @@
         })
     }
 
-    /// Update the committee and cleanup internal state.
-    fn change_epoch(&mut self, committee: Committee) {
-        self.primary_network
-            .cleanup(self.committee.network_diff(&committee));
-        self.worker_network
-            .cleanup(self.committee.network_diff(&committee));
-
-        self.committee = committee;
-
-        self.pending.clear();
-        self.batch_requests.clear();
-        self.parent_requests.clear();
-    }
-
     /// Helper function. It waits for particular data to become available in the storage
     /// and then delivers the specified header.
     async fn waiter(
@@ -169,7 +159,8 @@
     /// Main loop listening to the `Synchronizer` messages.
     async fn run(&mut self) {
-        let mut waiting: FuturesUnordered<BoxFuture<'_, DagResult<Option<Header>>>> = FuturesUnordered::new();
+        let mut waiting: BoundedFuturesUnordered<BoxFuture<'_, DagResult<Option<Header>>>> =
+            BoundedFuturesUnordered::with_capacity(self.max_pending_header_waiter_requests());
 
         let timer = sleep(Duration::from_millis(TIMER_RESOLUTION));
         tokio::pin!(timer);
@@ -182,7 +173,7 @@
             let mut attempt_garbage_collection = false;
 
             tokio::select! {
-                Some(message) = self.rx_synchronizer.recv() => {
+                Some(message) = self.rx_synchronizer.recv(), if waiting.available_permits() > 0 => {
                     match message {
                         WaiterMessage::SyncBatches(missing, header) => {
                             debug!("Syncing the payload of {header}");
@@ -204,7 +195,7 @@
                             self.pending.insert(header_id, (round, tx_cancel));
                             let fut = Self::waiter(wait_for, self.payload_store.clone(), header, rx_cancel);
                             // pointer-size allocation, bounded by the # of blocks (may eventually go away, see rust RFC #1909)
-                            waiting.push(Box::pin(fut));
+                            waiting.push(Box::pin(fut)).await;
 
                             // Ensure we didn't already send a sync request for these parents.
                             let mut requires_sync = HashMap::new();
@@ -244,7 +235,7 @@
                             self.pending.insert(header_id, (round, tx_cancel));
                             let fut = Self::waiter(wait_for, self.certificate_store.clone(), header, rx_cancel);
                             // pointer-size allocation, bounded by the # of blocks (may eventually go away, see rust RFC #1909)
-                            waiting.push(Box::pin(fut));
+                            waiting.push(Box::pin(fut)).await;
 
                             // Ensure we didn't already send a sync request for these parents.
                             // Optimistically send the sync request to the node that created the certificate.
@@ -271,9 +262,9 @@
                         }
                     }
                 },
-
-                Some(result) = waiting.next() => match result {
-                    Ok(Some(header)) => {
+                // We simultaneously poll the next ready waiter and a free slot on the channel to the core.
+                (Some(result), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_core) => match result {
+                    Some(header) => {
                         let _ = self.pending.remove(&header.id);
                         for x in header.payload.keys() {
                             let _ = self.batch_requests.remove(x);
                         }
                         for x in &header.parents {
                             let _ = self.parent_requests.remove(x);
                         }
-                        if self.tx_core.send(header).await.is_err() {
-                            debug!("{}", DagError::ShuttingDown)
-                        }
+                        permit.send(header);
                     },
-                    Ok(None) => {
+                    None => {
                         // This request has been canceled.
                     },
-                    Err(e) => {
-                        error!("{e}");
-                        panic!("Storage failure: killing node.");
-                    }
                 },
 
                 () = &mut timer => {
@@ -332,7 +317,15 @@
                 let message = self.rx_reconfigure.borrow().clone();
                 match message {
                     ReconfigureNotification::NewEpoch(new_committee) => {
-                        self.change_epoch(new_committee);
+                        // Update the committee and cleanup internal state.
+                        self.primary_network.cleanup(self.committee.network_diff(&new_committee));
+                        self.worker_network.cleanup(self.committee.network_diff(&new_committee));
+
+                        self.committee = new_committee;
+
+                        self.pending.clear();
+                        self.batch_requests.clear();
+                        self.parent_requests.clear();
                     },
                     ReconfigureNotification::UpdateCommittee(new_committee) => {
                         self.primary_network.cleanup(self.committee.network_diff(&new_committee));
@@ -394,6 +387,11 @@
                 .parent_requests_header_waiter
                 .with_label_values(&[&self.committee.epoch.to_string()])
                 .set(self.parent_requests.len() as i64);
+
+            self.metrics
+                .waiting_elements_header_waiter
+                .with_label_values(&[&self.committee.epoch.to_string()])
+                .set(waiting.len() as i64);
         }
     }
 }
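The `match result` above relies on a three-way contract from `waiter`: `Ok(Some(header))` when every awaited digest landed in the store, `Ok(None)` when the request was canceled, and `Err(_)` for storage failures (which `try_fut_and_permit!` turns into a logged panic). A sketch of that contract with a hypothetical signature (the real `waiter` awaits store reads via `try_join_all`):

    use tokio::sync::oneshot;
    use types::{error::DagResult, Header};

    // Hypothetical stand-in: `data_ready` fires when all awaited digests are in
    // the store, `cancel` when the request is dropped (e.g. garbage-collected).
    async fn waiter_sketch(
        data_ready: oneshot::Receiver<()>,
        cancel: oneshot::Receiver<()>,
        header: Header,
    ) -> DagResult<Option<Header>> {
        tokio::select! {
            _ = data_ready => Ok(Some(header)), // delivered downstream via the reserved permit
            _ = cancel => Ok(None),             // canceled: the loop simply drops it
        }
    }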
diff --git a/primary/src/metrics.rs b/primary/src/metrics.rs
index c24c6acad..8f89e62eb 100644
--- a/primary/src/metrics.rs
+++ b/primary/src/metrics.rs
@@ -296,8 +296,12 @@ pub struct PrimaryMetrics {
     pub pending_elements_header_waiter: IntGaugeVec,
     /// Number of elements in the parent requests list of header_waiter
     pub parent_requests_header_waiter: IntGaugeVec,
+    /// Number of elements in the waiting (ready-to-deliver) list of header_waiter
+    pub waiting_elements_header_waiter: IntGaugeVec,
     /// Number of elements in pending list of certificate_waiter
     pub pending_elements_certificate_waiter: IntGaugeVec,
+    /// Number of elements in the waiting (ready-to-deliver) list of certificate_waiter
+    pub waiting_elements_certificate_waiter: IntGaugeVec,
 }
 
 impl PrimaryMetrics {
@@ -394,6 +398,13 @@
                 registry
             )
             .unwrap(),
+            waiting_elements_header_waiter: register_int_gauge_vec_with_registry!(
+                "waiting_elements_header_waiter",
+                "Number of waiting elements in header waiter",
+                &["epoch"],
+                registry
+            )
+            .unwrap(),
             pending_elements_certificate_waiter: register_int_gauge_vec_with_registry!(
                 "pending_elements_certificate_waiter",
                 "Number of pending elements in certificate waiter",
                 &["epoch"],
                 registry
             )
             .unwrap(),
+            waiting_elements_certificate_waiter: register_int_gauge_vec_with_registry!(
+                "waiting_elements_certificate_waiter",
+                "Number of waiting elements in certificate waiter",
+                &["epoch"],
+                registry
+            )
+            .unwrap(),
         }
     }
 }
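Before the new module: the shape of the abstraction in miniature. The semaphore's permit count always mirrors the number of free slots, which is what makes the `available_permits() > 0` select guards above sound. A standalone sketch (not the module's code, which follows):

    use futures::stream::{FuturesUnordered, StreamExt};
    use std::future::Future;
    use tokio::sync::Semaphore;

    struct Bounded<T: Future> {
        queue: FuturesUnordered<T>,
        free_slots: Semaphore, // invariant: permits == capacity - queue.len()
    }

    impl<T: Future> Bounded<T> {
        async fn push(&self, item: T) {
            // Take a permit and *forget* it: the slot it stands for is now occupied...
            self.free_slots.acquire().await.unwrap().forget();
            self.queue.push(item);
        }
        async fn next(&mut self) -> Option<T::Output> {
            let out = self.queue.next().await;
            if out.is_some() {
                // ...and hand a permit back only when an item completes.
                self.free_slots.add_permits(1);
            }
            out
        }
    }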
diff --git a/types/src/bounded_future_queue.rs b/types/src/bounded_future_queue.rs
new file mode 100644
index 000000000..2c65dc486
--- /dev/null
+++ b/types/src/bounded_future_queue.rs
@@ -0,0 +1,280 @@
+// Copyright (c) 2022, Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+use futures::{
+    stream::{FuturesOrdered, FuturesUnordered},
+    Future, TryFutureExt, TryStreamExt,
+};
+use tokio::sync::{AcquireError, Semaphore, SemaphorePermit};
+
+pub struct UnorderedPermit<'a, T: Future> {
+    permit: SemaphorePermit<'a>,
+    futures: &'a BoundedFuturesUnordered<T>,
+}
+
+/// An async-friendly FuturesUnordered of bounded size. In contrast to a bounded channel,
+/// the queue makes it possible to modify and remove entries in it. In contrast to a FuturesUnordered,
+/// the queue makes it possible to enforce a bound on the number of items in the queue.
+pub struct BoundedFuturesUnordered<T: Future> {
+    /// The maximum number of entries allowed in the queue
+    capacity: usize,
+    /// The actual items in the queue
+    queue: FuturesUnordered<T>,
+    /// This semaphore has as many permits as there are empty spots in the
+    /// `queue`, i.e., `capacity - queue.len()` many permits
+    push_semaphore: Semaphore,
+}
+
+impl<T: Future> std::fmt::Debug for BoundedFuturesUnordered<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "BoundedFuturesUnordered[cap: {}, free: {}]",
+            self.capacity,
+            self.push_semaphore.available_permits(),
+        )
+    }
+}
+
+unsafe impl<T: Future> Sync for BoundedFuturesUnordered<T> {}
+unsafe impl<T: Future> Send for BoundedFuturesUnordered<T> {}
+
+// We expect to grow this facade over time
+impl<T: Future> BoundedFuturesUnordered<T> {
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            capacity,
+            queue: FuturesUnordered::new(),
+            push_semaphore: Semaphore::new(capacity),
+        }
+    }
+
+    /// Push an item into the queue. If the queue is currently full this method
+    /// blocks until a slot becomes available.
+    pub async fn push(&self, item: T) {
+        let permit = self.push_semaphore.acquire().await.unwrap();
+        self.queue.push(item);
+        // Forget rather than drop the permit: the slot it stands for is now occupied.
+        permit.forget();
+    }
+
+    /// Push an item using a permit previously obtained through `reserve`.
+    pub fn push_with_permit(&self, item: T, permit: SemaphorePermit<'_>) {
+        self.queue.push(item);
+        // As in `push`, forget the permit so it is not handed back while the slot is in use.
+        permit.forget();
+    }
+
+    /// Waits for queue capacity. Once capacity to push one future is available, it is reserved for the caller.
+    ///
+    /// WARNING: the order of pushing to the queue is not guaranteed. It must be enforced by the caller.
+    pub async fn reserve(&self) -> Result<UnorderedPermit<'_, T>, AcquireError> {
+        let permit = self.push_semaphore.acquire().await?;
+        Ok(UnorderedPermit {
+            permit,
+            futures: self,
+        })
+    }
+
+    /// Report the available permits
+    pub fn available_permits(&self) -> usize {
+        self.push_semaphore.available_permits()
+    }
+
+    /// Report the length of the queue
+    pub fn len(&self) -> usize {
+        self.queue.len()
+    }
+
+    /// Report if the queue is empty
+    pub fn is_empty(&self) -> bool {
+        self.queue.is_empty()
+    }
+}
+
+impl<T: Future<Output = Result<V, E>>, V, E> BoundedFuturesUnordered<T> {
+    /// Creates a future that attempts to resolve the next item in the stream.
+    /// If an error is encountered before the next item, the error is returned instead.
+    pub fn try_next(&mut self) -> impl Future<Output = Result<Option<V>, E>> + '_ {
+        self.queue.try_next().inspect_ok(|val| {
+            if val.is_some() {
+                self.push_semaphore.add_permits(1)
+            }
+        })
+    }
+}
+
+impl<'a, T: Future> UnorderedPermit<'a, T> {
+    /// Push an item using the reserved permit
+    pub fn push(self, item: T) {
+        self.futures.push_with_permit(item, self.permit);
+    }
+}
+
+/// An async-friendly FuturesOrdered of bounded size. In contrast to a bounded channel,
+/// the queue makes it possible to modify and remove entries in it. In contrast to a FuturesOrdered,
+/// the queue makes it possible to enforce a bound on the number of items in the queue.
+pub struct BoundedFuturesOrdered<T: Future> {
+    /// The maximum number of entries allowed in the queue
+    capacity: usize,
+    /// The actual items in the queue. New items are appended at the back.
+    queue: FuturesOrdered<T>,
+    /// This semaphore has as many permits as there are empty spots in the
+    /// `queue`, i.e., `capacity - queue.len()` many permits
+    push_semaphore: Semaphore,
+}
+
+impl<T: Future> std::fmt::Debug for BoundedFuturesOrdered<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "BoundedFuturesOrdered[cap: {}, queue: {}, push: {}]",
+            self.capacity,
+            self.queue.len(),
+            self.push_semaphore.available_permits(),
+        )
+    }
+}
+
+unsafe impl<T: Future> Sync for BoundedFuturesOrdered<T> {}
+unsafe impl<T: Future> Send for BoundedFuturesOrdered<T> {}
+
+// We expect to grow this facade over time
+impl<T: Future> BoundedFuturesOrdered<T> {
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            capacity,
+            queue: FuturesOrdered::new(),
+            push_semaphore: Semaphore::new(capacity),
+        }
+    }
+
+    /// Push an item into the queue. If the queue is currently full this method
+    /// blocks until a slot becomes available.
+    pub async fn push(&mut self, item: T) {
+        let permit = self.push_semaphore.acquire().await.unwrap();
+        self.queue.push_back(item);
+        permit.forget();
+    }
+
+    /// Report the available permits
+    pub fn available_permits(&self) -> usize {
+        self.push_semaphore.available_permits()
+    }
+
+    /// Report the length of the queue
+    pub fn len(&self) -> usize {
+        self.queue.len()
+    }
+
+    /// Report if the queue is empty
+    pub fn is_empty(&self) -> bool {
+        self.queue.is_empty()
+    }
+}
+
+impl<T: Future<Output = Result<V, E>>, V, E> BoundedFuturesOrdered<T> {
+    /// Creates a future that attempts to resolve the next item in the stream.
+    /// If an error is encountered before the next item, the error is returned instead.
+    pub fn try_next(&mut self) -> impl Future<Output = Result<Option<V>, E>> + '_ {
+        self.queue.try_next().inspect_ok(|val| {
+            if val.is_some() {
+                self.push_semaphore.add_permits(1)
+            }
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{BoundedFuturesOrdered, BoundedFuturesUnordered};
+    use futures::{future, FutureExt};
+
+    #[tokio::test]
+    async fn test_capacity_up() {
+        let cap = 10;
+        let futs = BoundedFuturesUnordered::with_capacity(cap);
+        for i in 0..cap {
+            futs.push(future::ready(i)).await;
+            assert_eq!(futs.push_semaphore.available_permits(), cap - i - 1);
+        }
+        assert!(futs.push(future::ready(10)).now_or_never().is_none());
+    }
+
+    #[tokio::test]
+    async fn test_reserve_up() {
+        let cap = 10;
+        let futs = BoundedFuturesUnordered::with_capacity(cap);
+        let mut permits = Vec::new();
+        // This forces the type of the future
+        futs.push(future::ready(0)).await;
+        for i in 1..cap {
+            let permit = futs.reserve().await.unwrap();
+            permits.push(permit);
+            assert_eq!(futs.push_semaphore.available_permits(), cap - i - 1);
+        }
+        assert_eq!(futs.push_semaphore.available_permits(), 0);
+        assert!(futs.push(future::ready(1)).now_or_never().is_none());
+        // Unused permits are handed back when dropped.
+        drop(permits);
+        assert_eq!(futs.push_semaphore.available_permits(), 9);
+    }
+
+    #[tokio::test]
+    async fn test_reserve_down() {
+        let cap = 10;
+        let futs = BoundedFuturesUnordered::with_capacity(cap);
+        let mut permits = Vec::new();
+        // This forces the type of the future
+        futs.push(future::ready(0)).await;
+        for i in 1..cap {
+            let permit = futs.reserve().await.unwrap();
+            permits.push(permit);
+            assert_eq!(futs.push_semaphore.available_permits(), cap - i - 1);
+        }
+        assert_eq!(futs.push_semaphore.available_permits(), 0);
+        assert!(futs.push(future::ready(1)).now_or_never().is_none());
+        for (i, permit) in permits.into_iter().enumerate() {
+            permit.push(future::ready(i));
+        }
+        // Used permits are consumed: the queue is now full, so none are available.
+        assert_eq!(futs.push_semaphore.available_permits(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_capacity_down() {
+        let cap = 10;
+        let mut futs = BoundedFuturesUnordered::with_capacity(cap);
+
+        for i in 0..10 {
+            futs.push(future::ready(Result::<usize, usize>::Ok(i))).await
+        }
+        for i in 0..10 {
+            assert!(futs.try_next().await.unwrap().is_some());
+            assert_eq!(futs.push_semaphore.available_permits(), i + 1)
+        }
+        assert!(futs.try_next().await.unwrap().is_none());
+        assert_eq!(futs.push_semaphore.available_permits(), cap)
+    }
+
+    #[tokio::test]
+    async fn test_capacity_up_ordered() {
+        let cap = 10;
+        let mut futs = BoundedFuturesOrdered::with_capacity(cap);
+        for i in 0..cap {
+            futs.push(future::ready(i)).await;
+            assert_eq!(futs.push_semaphore.available_permits(), cap - i - 1);
+        }
+        assert!(futs.push(future::ready(10)).now_or_never().is_none());
+    }
+
+    #[tokio::test]
+    async fn test_capacity_down_ordered() {
+        let cap = 10;
+        let mut futs = BoundedFuturesOrdered::with_capacity(cap);
+
+        for i in 0..10 {
+            futs.push(future::ready(Result::<usize, usize>::Ok(i))).await
+        }
+        for i in 0..10 {
+            assert!(futs.try_next().await.unwrap().is_some());
+            assert_eq!(futs.push_semaphore.available_permits(), i + 1)
+        }
+        assert!(futs.try_next().await.unwrap().is_none());
+        assert_eq!(futs.push_semaphore.available_permits(), cap)
+    }
+}
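A usage sketch for the permit flow (illustrative; assumes the module above): `reserve` waits for admission without committing an item, and the returned permit then pushes exactly one future with the slot already accounted for.

    use futures::future::{self, Ready};
    use types::bounded_future_queue::BoundedFuturesUnordered;

    async fn demo_reserve() {
        let queue: BoundedFuturesUnordered<Ready<u64>> =
            BoundedFuturesUnordered::with_capacity(2);
        // Wait for a free slot first; build the (possibly expensive) future
        // only once admission is guaranteed.
        let permit = queue.reserve().await.expect("semaphore closed");
        permit.push(future::ready(42)); // consumes the permit; the slot stays accounted for
        assert_eq!((queue.len(), queue.available_permits()), (1, 1));
    }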
diff --git a/types/src/error.rs b/types/src/error.rs
index 6d564a839..3ccb21061 100644
--- a/types/src/error.rs
+++ b/types/src/error.rs
@@ -23,10 +23,31 @@ macro_rules! ensure {
     };
 }
 
+#[macro_export]
+macro_rules! try_fut_and_permit {
+    ($fut:expr, $sender:expr) => {
+        futures::future::TryFutureExt::unwrap_or_else(
+            futures::future::try_join(
+                $fut,
+                futures::TryFutureExt::map_err($sender.reserve(), |_e| {
+                    DagError::ClosedChannel(stringify!($sender).to_owned())
+                }),
+            ),
+            |e| {
+                tracing::error!("{e}");
+                panic!("I/O failure, killing the node.");
+            },
+        )
+    };
+}
+
 pub type DagResult<T> = Result<T, DagError>;
 
 #[derive(Debug, Error)]
 pub enum DagError {
+    #[error("Channel {0} closed unexpectedly")]
+    ClosedChannel(String),
+
     #[error("Invalid signature")]
     InvalidSignature(#[from] signature::Error),
diff --git a/types/src/lib.rs b/types/src/lib.rs
index 5a2f35739..7706f4fe6 100644
--- a/types/src/lib.rs
+++ b/types/src/lib.rs
@@ -16,4 +16,5 @@ pub use proto::*;
 mod worker;
 pub use worker::*;
 
+pub mod bounded_future_queue;
 pub mod metered_channel;
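Finally, a minimal end-to-end wiring of the macro using a plain `tokio::sync::mpsc` sender, which exposes the same `reserve()`/`Permit::send` surface the metered sender is used with in the hunks above (a sketch; it assumes the `futures` and `tracing` crates are dependencies of the calling crate, since the macro body refers to them unhygienically, and that `DagError` is in scope):

    use futures::future;
    use tokio::sync::mpsc;
    use types::{error::DagError, try_fut_and_permit};

    async fn demo_macro() {
        let (tx, mut rx) = mpsc::channel::<u64>(1);
        let fut = future::ready(Result::<Option<u64>, DagError>::Ok(Some(7)));
        // Resolves once BOTH the future is ready and a channel slot is reserved.
        let (value, permit) = try_fut_and_permit!(fut, tx).await;
        permit.send(value.unwrap()); // infallible: the slot was reserved up front
        assert_eq!(rx.recv().await, Some(7));
    }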