This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

feat: Avoid blocking tokio::select branches on a potentially full channel #724

Merged
8 commits merged on Aug 18, 2022
21 changes: 21 additions & 0 deletions executor/src/errors.rs
@@ -22,10 +22,31 @@ macro_rules! ensure {
};
}

#[macro_export]
macro_rules! try_fut_and_permit {
($fut:expr, $sender:expr) => {
futures::future::TryFutureExt::unwrap_or_else(
futures::future::try_join(
$fut,
futures::TryFutureExt::map_err($sender.reserve(), |_e| {
SubscriberError::ClosedChannel(stringify!(sender).to_owned())
}),
),
|e| {
tracing::error!("{e}");
panic!("I/O failure, killing the node.");
},
)
};
}
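
For readers unfamiliar with the reserve-then-send pattern this macro packages up, here is a minimal, self-contained sketch of what it expands to, written against plain `tokio`/`futures` APIs; the `work` function, error type, and channel are hypothetical stand-ins, not part of this PR.

```rust
use futures::future::TryFutureExt;
use tokio::sync::mpsc;

#[derive(Debug)]
enum Error {
    ClosedChannel,
}

// Hypothetical stand-in for a fallible future such as `waiting.try_next()`.
async fn work() -> Result<u64, Error> {
    Ok(42)
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(1);

    // Await the result *and* a free channel slot together, so a
    // `tokio::select!` branch built from this future only completes
    // when both are available and never blocks on a full channel.
    let (value, permit) = futures::future::try_join(
        work(),
        tx.reserve().map_err(|_| Error::ClosedChannel),
    )
    .await
    .expect("I/O failure, killing the node.");

    // The permit holds a reserved slot, so sending cannot block or fail.
    permit.send(value);
    assert_eq!(rx.recv().await, Some(42));
}
```

The macro's `unwrap_or_else` wrapper additionally turns any error from either side into a logged panic, which is the `expect` in the sketch.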

pub type SubscriberResult<T> = Result<T, SubscriberError>;

#[derive(Debug, Error, Clone)]
pub enum SubscriberError {
#[error("channel {0} closed unexpectedly")]
ClosedChannel(String),

#[error("Storage failure: {0}")]
StoreError(#[from] StoreError),

4 changes: 4 additions & 0 deletions executor/src/lib.rs
@@ -112,13 +112,17 @@ impl Executor {
SubscriberError::OnlyOneConsensusClientPermitted
);

// We expect this will ultimately be needed in the `Core` as well as the `Subscriber`.
let arc_metrics = Arc::new(metrics);

// Spawn the subscriber.
let subscriber_handle = Subscriber::spawn(
store.clone(),
tx_get_block_commands,
tx_reconfigure.subscribe(),
rx_consensus,
tx_executor,
arc_metrics,
);

// Spawn the executor's core.
8 changes: 8 additions & 0 deletions executor/src/metrics.rs
@@ -6,6 +6,8 @@ use prometheus::{default_registry, register_int_gauge_with_registry, IntGauge, R
pub struct ExecutorMetrics {
/// occupancy of the channel from the `Subscriber` to `Core`
pub tx_executor: IntGauge,
/// Number of elements in the subscriber's waiting (ready-to-deliver) list
pub waiting_elements_subscriber: IntGauge,
}

impl ExecutorMetrics {
@@ -17,6 +19,12 @@ impl ExecutorMetrics {
registry
)
.unwrap(),
waiting_elements_subscriber: register_int_gauge_with_registry!(
"waiting_elements_subscriber",
"Number of waiting elements in the subscriber",
registry
)
.unwrap(),
}
}
}
40 changes: 27 additions & 13 deletions executor/src/subscriber.rs
@@ -1,19 +1,24 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{errors::SubscriberResult, SubscriberError, SubscriberError::PayloadRetrieveError};
use crate::{
errors::SubscriberResult, metrics::ExecutorMetrics, try_fut_and_permit, SubscriberError,
SubscriberError::PayloadRetrieveError,
};
use backoff::{Error, ExponentialBackoff};
use consensus::ConsensusOutput;
use crypto::Hash;
use futures::stream::{FuturesOrdered, StreamExt};
use primary::BlockCommand;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use store::Store;
use tokio::{
sync::{oneshot, watch},
task::JoinHandle,
};
use tracing::{debug, error};
use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification};
use tracing::error;
use types::{
bounded_future_queue::BoundedFuturesOrdered, metered_channel, Batch, BatchDigest,
ReconfigureNotification,
};

#[cfg(test)]
#[path = "tests/subscriber_tests.rs"]
@@ -38,9 +43,14 @@ pub struct Subscriber {
// When asking for a certificate's payload we want to retry until we succeed, unless
// some irrecoverable error occurs. For that reason a backoff policy is defined.
get_block_retry_policy: ExponentialBackoff,
/// The metrics handler
metrics: Arc<ExecutorMetrics>,
}

impl Subscriber {
/// The maximum number of pending consensus messages we should expect.
const MAX_PENDING_CONSENSUS_MESSAGES: usize = 2000;

/// Spawn a new subscriber in a new tokio task.
#[must_use]
pub fn spawn(
@@ -49,6 +59,7 @@ impl Subscriber {
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
rx_consensus: metered_channel::Receiver<ConsensusOutput>,
tx_executor: metered_channel::Sender<ConsensusOutput>,
metrics: Arc<ExecutorMetrics>,
) -> JoinHandle<()> {
let get_block_retry_policy = ExponentialBackoff {
initial_interval: Duration::from_millis(500),
@@ -67,6 +78,7 @@ impl Subscriber {
tx_executor,
tx_get_block_commands,
get_block_retry_policy,
metrics,
}
.run()
.await
@@ -82,13 +94,14 @@ impl Subscriber {
// matter if we somehow managed to fetch the batches from a later
// certificate. Unless the earlier certificate's payload has been
// fetched, no later certificate will be delivered.
let mut waiting = FuturesOrdered::new();
let mut waiting =
BoundedFuturesOrdered::with_capacity(Self::MAX_PENDING_CONSENSUS_MESSAGES);

// Listen to sequenced consensus message and process them.
loop {
tokio::select! {
// Receive the ordered sequence of consensus messages from a consensus node.
Some(message) = self.rx_consensus.recv() => {
Some(message) = self.rx_consensus.recv(), if waiting.available_permits() > 0 => {
// Fetch the certificate's payload from the workers. This is done via the
// block_waiter component. If the batches are not available in the workers then
// block_waiter will do its best to sync from the other peers. Once all batches
@@ -98,15 +111,12 @@ impl Subscriber {
self.store.clone(),
self.tx_get_block_commands.clone(),
message);
waiting.push_back(future);
waiting.push(future).await;
},

// Receive consensus messages for which we have downloaded all transaction data.
Some(message) = waiting.next() => {
if self.tx_executor.send(message?).await.is_err() {
debug!("Executor core is shutting down");
return Ok(());
}
(Some(message), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_executor) => {
permit.send(message)
},

// Check whether the committee changed.
@@ -118,6 +128,10 @@ impl Subscriber {
}
}
}

self.metrics
.waiting_elements_subscriber
.set(waiting.len() as i64);
}
}
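
`BoundedFuturesOrdered` comes from `types::bounded_future_queue`, which is not part of this diff. As a reading aid, a sketch of the shape the call sites imply (a capacity-gated `FuturesOrdered` with `available_permits`, `push`, `try_next`, and `len`) could look like the following; the details are assumptions, not the crate's actual implementation.

```rust
use futures::{stream::FuturesOrdered, TryStreamExt};
use std::future::Future;

/// A FuturesOrdered that refuses to grow beyond a fixed capacity.
pub struct BoundedFuturesOrdered<F: Future> {
    queue: FuturesOrdered<F>,
    capacity: usize,
}

impl<F: Future> BoundedFuturesOrdered<F> {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            queue: FuturesOrdered::new(),
            capacity,
        }
    }

    /// Remaining room; the `tokio::select!` branches above use this as a
    /// guard so the input channel is only polled while the queue can
    /// absorb more work.
    pub fn available_permits(&self) -> usize {
        self.capacity - self.queue.len()
    }

    /// Queue a future. Async to match the call sites above; this sketch
    /// asserts instead of waiting because the select! guard already
    /// guarantees capacity.
    pub async fn push(&mut self, future: F) {
        assert!(self.available_permits() > 0, "bounded queue overflow");
        self.queue.push_back(future);
    }

    pub fn len(&self) -> usize {
        self.queue.len()
    }
}

impl<T, E, F: Future<Output = Result<T, E>>> BoundedFuturesOrdered<F> {
    /// The next completed future, in insertion order.
    pub async fn try_next(&mut self) -> Result<Option<T>, E> {
        self.queue.try_next().await
    }
}
```

The `available_permits() > 0` guard plus the bounded queue is what keeps the subscriber from buffering unbounded consensus output while the downstream channel is full.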

3 changes: 3 additions & 0 deletions executor/src/tests/subscriber_tests.rs
@@ -3,6 +3,7 @@
use super::*;
use crate::fixtures::{test_store, test_u64_certificates};
use primary::GetBlockResponse;
use prometheus::Registry;
use test_utils::{committee, test_channel};
use types::{
BatchMessage, BlockError, BlockErrorKind, BlockResult, CertificateDigest, SequenceNumber,
@@ -24,12 +25,14 @@ async fn spawn_subscriber(

// Spawn a test subscriber.
let store = test_store();
let executor_metrics = ExecutorMetrics::new(&Registry::new());
let subscriber_handle = Subscriber::spawn(
store.clone(),
tx_get_block_commands,
rx_reconfigure,
rx_sequence,
tx_executor,
Arc::new(executor_metrics),
);

(store, tx_reconfigure, subscriber_handle)
44 changes: 24 additions & 20 deletions primary/src/certificate_waiter.rs
@@ -4,10 +4,7 @@
use crate::metrics::PrimaryMetrics;
use config::Committee;
use dashmap::DashMap;
use futures::{
future::try_join_all,
stream::{futures_unordered::FuturesUnordered, StreamExt as _},
};
use futures::future::try_join_all;
use once_cell::sync::OnceCell;
use std::sync::Arc;
use store::Store;
@@ -16,11 +13,13 @@ use tokio::{
task::JoinHandle,
time::{sleep, Duration, Instant},
};
use tracing::{error, info};
use tracing::info;
use types::{
bounded_future_queue::BoundedFuturesUnordered,
error::{DagError, DagResult},
metered_channel::{Receiver, Sender},
Certificate, CertificateDigest, HeaderDigest, ReconfigureNotification, Round,
try_fut_and_permit, Certificate, CertificateDigest, HeaderDigest, ReconfigureNotification,
Round,
};

#[cfg(test)]
@@ -58,6 +57,12 @@ pub struct CertificateWaiter {
}

impl CertificateWaiter {
/// Returns the maximum number of pending certificates we should expect. In the worst case of causal completion,
/// this can be `self.gc_depth` x `self.committee.size()`, doubled for headroom.
pub fn max_pending_certificates(&self) -> usize {
self.gc_depth as usize * self.committee.size() * 2
}

#[must_use]
pub fn spawn(
committee: Committee,
@@ -106,7 +111,7 @@ impl CertificateWaiter {
}

async fn run(&mut self) {
let mut waiting = FuturesUnordered::new();
let mut waiting = BoundedFuturesUnordered::with_capacity(self.max_pending_certificates());
let timer = sleep(Duration::from_millis(GC_RESOLUTION));
tokio::pin!(timer);
let mut attempt_garbage_collection;
@@ -117,7 +122,8 @@ impl CertificateWaiter {
attempt_garbage_collection = false;

tokio::select! {
Some(certificate) = self.rx_synchronizer.recv() => {
// We only accept new elements if we have "room" for them
Some(certificate) = self.rx_synchronizer.recv(), if waiting.available_permits() > 0 => {
if certificate.epoch() < self.committee.epoch() {
continue;
}
@@ -140,20 +146,14 @@ impl CertificateWaiter {
};
self.pending.insert(header_id, (certificate.round(), once_cancel));
let fut = Self::waiter(wait_for, &self.store, certificate, rx_cancel);
waiting.push(fut);
waiting.push(fut).await;
}
Some(result) = waiting.next() => match result {
Ok(certificate) => {
// We simultaneously poll the next completed future and the availability of a slot on the channel to the core.
(Some(certificate), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_core) => {
// TODO [issue #115]: To ensure crash-recovery of consensus, it is not enough to send every
// certificate whose ancestors are in the storage. After recovery, we may also
// need to send all parent certificates with rounds greater than `last_committed`.

self.tx_core.send(certificate).await.expect("Failed to send certificate");
},
Err(e) => {
error!("{e}");
panic!("Storage failure: killing node.");
}
permit.send(certificate);
},
result = self.rx_reconfigure.changed() => {
result.expect("Committee channel dropped");
@@ -211,14 +211,18 @@ impl CertificateWaiter {
}
}

self.update_metrics();
self.update_metrics(waiting.len());
}
}

fn update_metrics(&self) {
fn update_metrics(&self, waiting_len: usize) {
self.metrics
.pending_elements_certificate_waiter
.with_label_values(&[&self.committee.epoch.to_string()])
.set(self.pending.len() as i64);
self.metrics
.waiting_elements_certificate_waiter
.with_label_values(&[&self.committee.epoch.to_string()])
.set(waiting_len as i64);
}
}
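
As a back-of-the-envelope check on the `max_pending_certificates` bound above (the values below are hypothetical; the real ones come from the node's configuration):

```rust
fn main() {
    // Hypothetical parameters: a GC depth of 50 rounds and a 4-authority committee.
    let gc_depth: u64 = 50;
    let committee_size: usize = 4;

    // Mirrors max_pending_certificates(): worst-case causal completion,
    // doubled for headroom.
    let max_pending = gc_depth as usize * committee_size * 2;
    assert_eq!(max_pending, 400);
}
```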