From a48c7ee8096b246663aed413c1183397717c3a7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= Date: Sat, 6 Aug 2022 13:59:14 -0400 Subject: [PATCH 1/2] feat: don't block on sending on a full channel in primary We block on sending to a channel that may be full. Instead, this change polls the channel for a free slot before sending, rather than blocking indefinitely on the send. This percolates backpressure one level up from the output channels of: - header_waiter - certificate_waiter --- primary/src/certificate_waiter.rs | 21 +++++----------- primary/src/header_waiter.rs | 41 ++++++++++++------------------- types/src/error.rs | 21 ++++++++++++++++ 3 files changed, 43 insertions(+), 40 deletions(-) diff --git a/primary/src/certificate_waiter.rs b/primary/src/certificate_waiter.rs index 15ba88fe3..552c62b83 100644 --- a/primary/src/certificate_waiter.rs +++ b/primary/src/certificate_waiter.rs @@ -4,10 +4,7 @@ use crate::metrics::PrimaryMetrics; use config::Committee; use dashmap::DashMap; -use futures::{ - future::try_join_all, - stream::{futures_unordered::FuturesUnordered, StreamExt as _}, -}; +use futures::{future::try_join_all, stream::futures_unordered::FuturesUnordered, TryStreamExt}; use once_cell::sync::OnceCell; use std::sync::Arc; use store::Store; @@ -16,11 +13,12 @@ use tokio::{ task::JoinHandle, time::{sleep, Duration, Instant}, }; -use tracing::{error, info}; +use tracing::info; use types::{ error::{DagError, DagResult}, metered_channel::{Receiver, Sender}, - Certificate, CertificateDigest, HeaderDigest, ReconfigureNotification, Round, + try_fut_and_permit, Certificate, CertificateDigest, HeaderDigest, ReconfigureNotification, + Round, }; #[cfg(test)] @@ -142,18 +140,11 @@ impl CertificateWaiter { let fut = Self::waiter(wait_for, &self.store, certificate, rx_cancel); waiting.push(fut); } - Some(result) = waiting.next() => match result { - Ok(certificate) => { + (Some(certificate), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_core) => { // TODO [issue 
#115]: To ensure crash-recovery of consensus, it is not enough to send every // certificate for which their ancestors are in the storage. After recovery, we may also // need to send a all parents certificates with rounds greater then `last_committed`. - - self.tx_core.send(certificate).await.expect("Failed to send certificate"); - }, - Err(e) => { - error!("{e}"); - panic!("Storage failure: killing node."); - } + permit.send(certificate); }, result = self.rx_reconfigure.changed() => { result.expect("Committee channel dropped"); diff --git a/primary/src/header_waiter.rs b/primary/src/header_waiter.rs index 7c2019ee8..d67b60ef2 100644 --- a/primary/src/header_waiter.rs +++ b/primary/src/header_waiter.rs @@ -9,7 +9,8 @@ use config::{Committee, WorkerId}; use crypto::PublicKey; use futures::{ future::{try_join_all, BoxFuture}, - stream::{futures_unordered::FuturesUnordered, StreamExt as _}, + stream::futures_unordered::FuturesUnordered, + TryStreamExt, }; use network::{LuckyNetwork, PrimaryNetwork, PrimaryToWorkerNetwork, UnreliableNetwork}; use serde::{de::DeserializeOwned, Serialize}; @@ -24,12 +25,12 @@ use tokio::{ task::JoinHandle, time::{sleep, Duration, Instant}, }; -use tracing::{debug, error, info}; +use tracing::{debug, info}; use types::{ error::{DagError, DagResult}, metered_channel::{Receiver, Sender}, - BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, ReconfigureNotification, - Round, + try_fut_and_permit, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, + ReconfigureNotification, Round, }; #[cfg(test)] @@ -132,15 +133,6 @@ impl HeaderWaiter { }) } - /// Update the committee and cleanup internal state. - fn change_epoch(&mut self, committee: Committee) { - self.committee = committee; - - self.pending.clear(); - self.batch_requests.clear(); - self.parent_requests.clear(); - } - /// Helper function. It waits for particular data to become available in the storage /// and then delivers the specified header. 
async fn waiter( @@ -266,9 +258,9 @@ impl HeaderWaiter { } } }, - - Some(result) = waiting.next() => match result { - Ok(Some(header)) => { + // Note : we poll the availability of a slot to send the result to the core simultaneously + (Some(result), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_core) => match result { + Some(header) => { let _ = self.pending.remove(&header.id); for x in header.payload.keys() { let _ = self.batch_requests.remove(x); @@ -276,17 +268,11 @@ impl HeaderWaiter { for x in &header.parents { let _ = self.parent_requests.remove(x); } - if self.tx_core.send(header).await.is_err() { - debug!("{}", DagError::ShuttingDown) - } + permit.send(header); }, - Ok(None) => { + None => { // This request has been canceled. }, - Err(e) => { - error!("{e}"); - panic!("Storage failure: killing node."); - } }, () = &mut timer => { @@ -327,7 +313,12 @@ impl HeaderWaiter { let message = self.rx_reconfigure.borrow().clone(); match message { ReconfigureNotification::NewEpoch(new_committee) => { - self.change_epoch(new_committee); + // Update the committee and cleanup internal state. + self.committee = new_committee; + + self.pending.clear(); + self.batch_requests.clear(); + self.parent_requests.clear(); }, ReconfigureNotification::UpdateCommittee(new_committee) => { self.committee = new_committee; diff --git a/types/src/error.rs b/types/src/error.rs index 3b0e01ea0..c55869879 100644 --- a/types/src/error.rs +++ b/types/src/error.rs @@ -23,10 +23,31 @@ macro_rules! ensure { }; } +#[macro_export] +macro_rules! 
try_fut_and_permit { + ($fut:expr, $sender:expr) => { + futures::future::TryFutureExt::unwrap_or_else( + futures::future::try_join( + $fut, + futures::TryFutureExt::map_err($sender.reserve(), |_e| { + DagError::ClosedChannel(stringify!(sender).to_owned()) + }), + ), + |e| { + tracing::error!("{e}"); + panic!("I/O failure, killing the node."); + }, + ) + }; +} + pub type DagResult = Result; #[derive(Debug, Error)] pub enum DagError { + #[error("Channel {0} has closed unexpectedly")] + ClosedChannel(String), + #[error("Invalid signature")] InvalidSignature(#[from] signature::Error), From 94813d373152138c8d33c89b91aba46e90f5afa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= Date: Sat, 6 Aug 2022 15:11:58 -0400 Subject: [PATCH 2/2] feat: don't block on sending on a full channel in executor --- executor/src/errors.rs | 21 +++++++++++++++++++++ executor/src/subscriber.rs | 25 +++++++++---------------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/executor/src/errors.rs b/executor/src/errors.rs index 99e5ec90b..034f20fb9 100644 --- a/executor/src/errors.rs +++ b/executor/src/errors.rs @@ -21,10 +21,31 @@ macro_rules! ensure { }; } +#[macro_export] +macro_rules! 
try_fut_and_permit { + ($fut:expr, $sender:expr) => { + futures::future::TryFutureExt::unwrap_or_else( + futures::future::try_join( + $fut, + futures::TryFutureExt::map_err($sender.reserve(), |_e| { + SubscriberError::ClosedChannel(stringify!(sender).to_owned()) + }), + ), + |e| { + tracing::error!("{e}"); + panic!("I/O failure, killing the node."); + }, + ) + }; +} + pub type SubscriberResult = Result; #[derive(Debug, Error, Clone)] pub enum SubscriberError { + #[error("channel {0} closed unexpectedly")] + ClosedChannel(String), + #[error("Storage failure: {0}")] StoreError(#[from] StoreError), diff --git a/executor/src/subscriber.rs b/executor/src/subscriber.rs index 460c1ffd1..221028417 100644 --- a/executor/src/subscriber.rs +++ b/executor/src/subscriber.rs @@ -1,11 +1,11 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::errors::{SubscriberError, SubscriberResult}; -use consensus::ConsensusOutput; -use futures::{ - future::try_join_all, - stream::{FuturesOrdered, StreamExt}, +use crate::{ + errors::{SubscriberError, SubscriberResult}, + try_fut_and_permit, }; +use consensus::ConsensusOutput; +use futures::{future::try_join_all, stream::FuturesOrdered, FutureExt, TryStreamExt}; use store::Store; use tokio::{ sync::{ @@ -14,7 +14,6 @@ use tokio::{ }, task::JoinHandle, }; -use tracing::debug; use types::{BatchDigest, ReconfigureNotification, SerializedBatchMessage}; #[cfg(test)] @@ -83,12 +82,9 @@ impl Subscriber { loop { tokio::select! { // Receive the ordered sequence of consensus messages from a consensus node. - Some(message) = self.rx_consensus.recv() => { + (Some(message), permit) = try_fut_and_permit!(self.rx_consensus.recv().map(Ok), self.tx_batch_loader) => { // Send the certificate to the batch loader to download all transactions' data. 
- self.tx_batch_loader - .send(message.clone()) - .await - .expect("Failed to send message ot batch loader"); + permit.send(message.clone()); // Wait for the transaction data to be available in the store. We will then forward these // transactions to the Executor Core for execution. @@ -98,11 +94,8 @@ impl Subscriber { }, // Receive here consensus messages for which we have downloaded all transactions data. - Some(message) = waiting.next() => { - if self.tx_executor.send(message?).await.is_err() { - debug!("Executor core is shutting down"); - return Ok(()); - } + (Some(message), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_executor) => { + permit.send(message) }, // Check whether the committee changed.