Skip to content
This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Avoid blocking tokio::select branches on a potentially full channel #705

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions executor/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,31 @@ macro_rules! ensure {
};
}

/// Awaits a fallible future together with the reservation of a send slot on
/// `$sender`, resolving to `(value, permit)` only when both are ready.
///
/// Reserving the permit up front keeps a `tokio::select!` branch from
/// committing to the future's result while the outgoing channel is full,
/// which would otherwise block the whole select loop inside an `.await` on
/// `send`. A closed channel or a failed future is treated as an
/// unrecoverable I/O fault: the error is logged and the node panics.
#[macro_export]
macro_rules! try_fut_and_permit {
    ($fut:expr, $sender:expr) => {
        futures::future::TryFutureExt::unwrap_or_else(
            futures::future::try_join(
                $fut,
                futures::TryFutureExt::map_err($sender.reserve(), |_e| {
                    // `stringify!($sender)` (with the `$`) names the actual
                    // channel expression (e.g. `self.tx_batch_loader`);
                    // `stringify!(sender)` would always produce "sender".
                    SubscriberError::ClosedChannel(stringify!($sender).to_owned())
                }),
            ),
            |e| {
                tracing::error!("{e}");
                panic!("I/O failure, killing the node.");
            },
        )
    };
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets stick this macro in the utils crate, and re-use it in places in Sui where we risk blocking select loops.


pub type SubscriberResult<T> = Result<T, SubscriberError>;

#[derive(Debug, Error, Clone)]
pub enum SubscriberError {
#[error("channel {0} closed unexpectedly")]
ClosedChannel(String),

#[error("Storage failure: {0}")]
StoreError(#[from] StoreError),

Expand Down
25 changes: 9 additions & 16 deletions executor/src/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::errors::{SubscriberError, SubscriberResult};
use consensus::ConsensusOutput;
use futures::{
future::try_join_all,
stream::{FuturesOrdered, StreamExt},
use crate::{
errors::{SubscriberError, SubscriberResult},
try_fut_and_permit,
};
use consensus::ConsensusOutput;
use futures::{future::try_join_all, stream::FuturesOrdered, FutureExt, TryStreamExt};
use store::Store;
use tokio::{
sync::{
Expand All @@ -14,7 +14,6 @@ use tokio::{
},
task::JoinHandle,
};
use tracing::debug;
use types::{BatchDigest, ReconfigureNotification, SerializedBatchMessage};

#[cfg(test)]
Expand Down Expand Up @@ -83,12 +82,9 @@ impl Subscriber {
loop {
tokio::select! {
// Receive the ordered sequence of consensus messages from a consensus node.
Some(message) = self.rx_consensus.recv() => {
(Some(message), permit) = try_fut_and_permit!(self.rx_consensus.recv().map(Ok), self.tx_batch_loader) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So @huitseeker should we expect on every iteration to try and acquire a permit for the requested channel? I am trying to wrap my head around of how the underlying join will work here on the select branch.

// Send the certificate to the batch loader to download all transactions' data.
self.tx_batch_loader
.send(message.clone())
.await
.expect("Failed to send message ot batch loader");
permit.send(message.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I confirmed the answer to my own question above is "yes". All is good!


// Wait for the transaction data to be available in the store. We will then forward these
// transactions to the Executor Core for execution.
Expand All @@ -98,11 +94,8 @@ impl Subscriber {
},

// Receive here consensus messages for which we have downloaded all transactions data.
Some(message) = waiting.next() => {
if self.tx_executor.send(message?).await.is_err() {
debug!("Executor core is shutting down");
return Ok(());
}
(Some(message), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_executor) => {
permit.send(message)
},

// Check whether the committee changed.
Expand Down
21 changes: 6 additions & 15 deletions primary/src/certificate_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
use crate::metrics::PrimaryMetrics;
use config::Committee;
use dashmap::DashMap;
use futures::{
future::try_join_all,
stream::{futures_unordered::FuturesUnordered, StreamExt as _},
};
use futures::{future::try_join_all, stream::futures_unordered::FuturesUnordered, TryStreamExt};
use once_cell::sync::OnceCell;
use std::sync::Arc;
use store::Store;
Expand All @@ -16,11 +13,12 @@ use tokio::{
task::JoinHandle,
time::{sleep, Duration, Instant},
};
use tracing::{error, info};
use tracing::info;
use types::{
error::{DagError, DagResult},
metered_channel::{Receiver, Sender},
Certificate, CertificateDigest, HeaderDigest, ReconfigureNotification, Round,
try_fut_and_permit, Certificate, CertificateDigest, HeaderDigest, ReconfigureNotification,
Round,
};

#[cfg(test)]
Expand Down Expand Up @@ -142,18 +140,11 @@ impl CertificateWaiter {
let fut = Self::waiter(wait_for, &self.store, certificate, rx_cancel);
waiting.push(fut);
}
Some(result) = waiting.next() => match result {
Ok(certificate) => {
(Some(certificate), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_core) => {
// TODO [issue #115]: To ensure crash-recovery of consensus, it is not enough to send every
// certificate for which their ancestors are in the storage. After recovery, we may also
// need to send all parent certificates with rounds greater than `last_committed`.

self.tx_core.send(certificate).await.expect("Failed to send certificate");
},
Err(e) => {
error!("{e}");
panic!("Storage failure: killing node.");
}
permit.send(certificate);
},
result = self.rx_reconfigure.changed() => {
result.expect("Committee channel dropped");
Expand Down
41 changes: 16 additions & 25 deletions primary/src/header_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use config::{Committee, WorkerId};
use crypto::PublicKey;
use futures::{
future::{try_join_all, BoxFuture},
stream::{futures_unordered::FuturesUnordered, StreamExt as _},
stream::futures_unordered::FuturesUnordered,
TryStreamExt,
};
use network::{LuckyNetwork, PrimaryNetwork, PrimaryToWorkerNetwork, UnreliableNetwork};
use serde::{de::DeserializeOwned, Serialize};
Expand All @@ -24,12 +25,12 @@ use tokio::{
task::JoinHandle,
time::{sleep, Duration, Instant},
};
use tracing::{debug, error, info};
use tracing::{debug, info};
use types::{
error::{DagError, DagResult},
metered_channel::{Receiver, Sender},
BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, ReconfigureNotification,
Round,
try_fut_and_permit, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest,
ReconfigureNotification, Round,
};

#[cfg(test)]
Expand Down Expand Up @@ -132,15 +133,6 @@ impl HeaderWaiter {
})
}

/// Update the committee and cleanup internal state.
fn change_epoch(&mut self, committee: Committee) {
self.committee = committee;

self.pending.clear();
self.batch_requests.clear();
self.parent_requests.clear();
}

/// Helper function. It waits for particular data to become available in the storage
/// and then delivers the specified header.
async fn waiter<T, V>(
Expand Down Expand Up @@ -266,27 +258,21 @@ impl HeaderWaiter {
}
}
},

Some(result) = waiting.next() => match result {
Ok(Some(header)) => {
// Note : we poll the availability of a slot to send the result to the core simultaneously
(Some(result), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_core) => match result {
Some(header) => {
let _ = self.pending.remove(&header.id);
for x in header.payload.keys() {
let _ = self.batch_requests.remove(x);
}
for x in &header.parents {
let _ = self.parent_requests.remove(x);
}
if self.tx_core.send(header).await.is_err() {
debug!("{}", DagError::ShuttingDown)
}
permit.send(header);
},
Ok(None) => {
None => {
// This request has been canceled.
},
Err(e) => {
error!("{e}");
panic!("Storage failure: killing node.");
}
},

() = &mut timer => {
Expand Down Expand Up @@ -327,7 +313,12 @@ impl HeaderWaiter {
let message = self.rx_reconfigure.borrow().clone();
match message {
ReconfigureNotification::NewEpoch(new_committee) => {
self.change_epoch(new_committee);
// Update the committee and cleanup internal state.
self.committee = new_committee;

self.pending.clear();
self.batch_requests.clear();
self.parent_requests.clear();
},
ReconfigureNotification::UpdateCommittee(new_committee) => {
self.committee = new_committee;
Expand Down
21 changes: 21 additions & 0 deletions types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,31 @@ macro_rules! ensure {
};
}

/// Awaits a fallible future together with the reservation of a send slot on
/// `$sender`, resolving to `(value, permit)` only when both are ready.
///
/// Reserving the permit up front keeps a `tokio::select!` branch from
/// committing to the future's result while the outgoing channel is full,
/// which would otherwise block the whole select loop inside an `.await` on
/// `send`. A closed channel or a failed future is treated as an
/// unrecoverable I/O fault: the error is logged and the node panics.
#[macro_export]
macro_rules! try_fut_and_permit {
    ($fut:expr, $sender:expr) => {
        futures::future::TryFutureExt::unwrap_or_else(
            futures::future::try_join(
                $fut,
                futures::TryFutureExt::map_err($sender.reserve(), |_e| {
                    // `stringify!($sender)` (with the `$`) names the actual
                    // channel expression (e.g. `self.tx_core`);
                    // `stringify!(sender)` would always produce "sender".
                    DagError::ClosedChannel(stringify!($sender).to_owned())
                }),
            ),
            |e| {
                tracing::error!("{e}");
                panic!("I/O failure, killing the node.");
            },
        )
    };
}

pub type DagResult<T> = Result<T, DagError>;

#[derive(Debug, Error)]
pub enum DagError {
#[error("Channel {0} has closed unexpectedly")]
ClosedChannel(String),

#[error("Invalid signature")]
InvalidSignature(#[from] signature::Error),

Expand Down