Skip to content

Commit

Permalink
[subscriber] convert to sequential approach (MystenLabs#1005)
Browse files Browse the repository at this point in the history
This commit converts the Subscriber to blocking execution. Refactored the recovery method.
  • Loading branch information
akichidis authored Sep 28, 2022
1 parent 733907d commit cbd8ab6
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 58 deletions.
39 changes: 33 additions & 6 deletions narwhal/executor/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use prometheus::{default_registry, register_int_gauge_with_registry, IntGauge, Registry};
use prometheus::{
default_registry, register_histogram_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry,
};

/// Metrics reported by the executor's `Subscriber`.
///
/// Every field is a Prometheus collector (gauge, histogram or counter).
#[derive(Clone, Debug)]
pub struct ExecutorMetrics {
/// Occupancy of the channel from the `Subscriber` to `Core`.
pub tx_executor: IntGauge,
/// Number of elements in the waiting (ready-to-deliver) list of the Subscriber.
pub waiting_elements_subscriber: IntGauge,
/// Time it takes to download a payload on the Subscriber.
pub subscriber_download_payload_latency: Histogram,
/// The number of attempts needed to successfully download
/// a certificate's payload in the Subscriber.
pub subscriber_download_payload_attempts: Histogram,
/// The number of certificates processed by the Subscriber
/// during the recovery period to fetch their payloads.
pub subscriber_recovered_certificates_count: IntCounter,
}

impl ExecutorMetrics {
Expand All @@ -19,12 +28,30 @@ impl ExecutorMetrics {
registry
)
.unwrap(),
waiting_elements_subscriber: register_int_gauge_with_registry!(
"waiting_elements_subscriber",
"Number of waiting elements in the subscriber",
subscriber_download_payload_latency: register_histogram_with_registry!(
"subscriber_download_payload_latency",
"Time it takes to download a payload on the Subscriber",
// the buckets defined in seconds
vec![
0.005, 0.01, 0.02, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0, 20.0, 40.0,
60.0
],
registry
)
.unwrap(),
subscriber_recovered_certificates_count: register_int_counter_with_registry!(
"subscriber_recovered_certificates_count",
"The number of certificates processed by Subscriber during the recovery period to fetch their payloads",
registry
).unwrap(),
subscriber_download_payload_attempts: register_histogram_with_registry!(
"subscriber_download_payload_attempts",
"The number of attempts to successfully download a certificate's payload in Subscriber",
vec![
1.0, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0, 15.0, 20.0
],
registry
).unwrap()
}
}
}
Expand Down
139 changes: 93 additions & 46 deletions narwhal/executor/src/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{
errors::SubscriberResult, metrics::ExecutorMetrics, try_fut_and_permit, SubscriberError,
errors::SubscriberResult, metrics::ExecutorMetrics, SubscriberError,
SubscriberError::PayloadRetrieveError,
};
use backoff::{Error, ExponentialBackoff};
Expand All @@ -14,11 +14,8 @@ use tokio::{
sync::{oneshot, watch},
task::JoinHandle,
};
use tracing::error;
use types::{
bounded_future_queue::BoundedFuturesOrdered, metered_channel, Batch, BatchDigest,
CertificateDigest, ReconfigureNotification,
};
use tracing::{debug_span, error, instrument, Instrument};
use types::{metered_channel, Batch, BatchDigest, CertificateDigest, ReconfigureNotification};

#[cfg(test)]
#[path = "tests/subscriber_tests.rs"]
Expand Down Expand Up @@ -48,9 +45,6 @@ pub struct Subscriber {
}

impl Subscriber {
/// Returns the max amount of pending consensus messages we should expect.
const MAX_PENDING_CONSENSUS_MESSAGES: usize = 2000;

/// Spawn a new subscriber in a new tokio task.
#[must_use]
pub fn spawn(
Expand Down Expand Up @@ -92,47 +86,29 @@ impl Subscriber {
&mut self,
restored_consensus_output: Vec<ConsensusOutput>,
) -> SubscriberResult<()> {
// It's important to have the futures in ordered fashion as we want
// to guarantee that will deliver to the executor the certificates
// in the same order we received from rx_consensus. So it doesn't
// matter if we somehow managed to fetch the batches from a later
// certificate. Unless the earlier certificate's payload has been
// fetched, no later certificate will be delivered.
let mut waiting =
BoundedFuturesOrdered::with_capacity(Self::MAX_PENDING_CONSENSUS_MESSAGES);
// It's important to process the consensus output in strictly ordered
// fashion to guarantee that we will deliver to the executor the certificates
// in the same order we received from rx_consensus.

// First handle any consensus output messages that were restored due to a restart.
// This needs to happen before we start listening on rx_consensus and receive messages sequenced after these.
for message in restored_consensus_output {
let future = Self::wait_on_payload(
self.get_block_retry_policy.clone(),
self.store.clone(),
self.tx_get_block_commands.clone(),
message,
);
waiting.push(future).await;
// This needs to happen before we start listening on rx_consensus and receive messages
// sequenced after these.
if let Err(err) = self
.recover_from_consensus_output(restored_consensus_output)
.await
{
error!("Executor subscriber is shutting down: {err}");
return Ok(());
}

// Listen to sequenced consensus message and process them.
loop {
tokio::select! {
// Receive the ordered sequence of consensus messages from a consensus node.
Some(message) = self.rx_consensus.recv(), if waiting.available_permits() > 0 => {
// Fetch the certificate's payload from the workers. This is done via the
// block_waiter component. If the batches are not available in the workers then
// block_waiter will do its best to sync from the other peers. Once all batches
// are available, we forward the certificate to the Executor Core.
let future = Self::wait_on_payload(
self.get_block_retry_policy.clone(),
self.store.clone(),
self.tx_get_block_commands.clone(),
message);
waiting.push(future).await;
},

// Receive here consensus messages for which we have downloaded all transactions data.
(Some(message), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_executor) => {
permit.send(message)
Some(message) = self.rx_consensus.recv() => {
if let Err(err) = self.download_payload_and_forward(message).await {
error!("Executor subscriber is shutting down: {err}");
return Ok(());
}
},

// Check whether the committee changed.
Expand All @@ -144,11 +120,65 @@ impl Subscriber {
}
}
}
}
}

/// Reads all the restored_consensus_output one by one, fetches their payload
/// in order, and delivers them to the tx_executor channel. This is a sequential
/// blocking operation. We should expect to block if executor is saturated, but
/// this is desired to avoid overloading our system making this easier to trace.
#[instrument(level="info", skip_all, fields(num_of_certificates = restored_consensus_output.len()), err)]
async fn recover_from_consensus_output(
&self,
restored_consensus_output: Vec<ConsensusOutput>,
) -> SubscriberResult<()> {
for message in restored_consensus_output {
// we are making this a sequential/blocking operation as the number of payloads
// that needs to be fetched might exceed the size of the waiting list and then
// we'll never be able to empty it until as we'll never reach the following loop.
// Also throttling the recovery is another measure to ensure we don't flood our
// network with messages.
self.download_payload_and_forward(message).await?;

self.metrics
.waiting_elements_subscriber
.set(waiting.len() as i64);
self.metrics.subscriber_recovered_certificates_count.inc();
}

Ok(())
}

/// Fetches the payload of `message` and, on success, hands the resulting
/// output over to the executor channel.
///
/// # Errors
///
/// Returns `SubscriberError::ClosedChannel` when the output cannot be sent on
/// `tx_executor`.
///
/// # Panics
///
/// Panics when `wait_on_payload` reports an error, as that is treated as
/// irrecoverable.
#[instrument(level="debug", skip_all, fields(certificate_id = ?message.certificate.digest()), err)]
async fn download_payload_and_forward(&self, message: ConsensusOutput) -> SubscriberResult<()> {
    // The certificate's payload is fetched from the workers via the
    // block_waiter component. When the batches are not available in the
    // workers, block_waiter will do its best to sync them from the other
    // peers. Only once all batches are available is the certificate forwarded
    // to the Executor Core.
    let fetched = Self::wait_on_payload(
        self.metrics.clone(),
        self.get_block_retry_policy.clone(),
        self.store.clone(),
        self.tx_get_block_commands.clone(),
        message,
    )
    .await;

    // A failed download cannot be recovered from at this level.
    let output = match fetched {
        Ok(output) => output,
        Err(err) => {
            panic!("Irrecoverable error occurred while retrieving block payload: {err}");
        }
    };

    // A send failure means the executor side of the channel is gone.
    match self.tx_executor.send(output).await {
        Ok(_) => Ok(()),
        Err(_) => Err(SubscriberError::ClosedChannel(
            stringify!(self.tx_executor).to_owned(),
        )),
    }
}

/// The wait_on_payload will try to retrieve the certificate's payload
Expand All @@ -157,17 +187,29 @@ impl Subscriber {
/// sequenced we will not quit this method until we have successfully
/// fetched the payload.
async fn wait_on_payload(
metrics: Arc<ExecutorMetrics>,
back_off_policy: ExponentialBackoff,
store: Store<(CertificateDigest, BatchDigest), Batch>,
tx_get_block_commands: metered_channel::Sender<BlockCommand>,
deliver: ConsensusOutput,
) -> SubscriberResult<ConsensusOutput> {
// the latency will be measured automatically once the guard
// goes out of scope and dropped
let _start_guard = metrics.subscriber_download_payload_latency.start_timer();
let mut attempts_count = 0;

let get_block = move || {
let message = deliver.clone();
let certificate_id = message.certificate.digest();
let tx_get_block = tx_get_block_commands.clone();
let batch_store = store.clone();
let executor_metrics = metrics.clone();
let attempts = {
attempts_count += 1;
attempts_count
};

let span = debug_span!("get_block", attempt = attempts);
async move {
let (sender, receiver) = oneshot::channel();

Expand Down Expand Up @@ -196,6 +238,10 @@ impl Subscriber {
.await
.map_err(|err| Error::permanent(SubscriberError::from(err)))?;

executor_metrics
.subscriber_download_payload_attempts
.observe(attempts as f64);

Ok(message)
}
Err(err) => {
Expand All @@ -209,6 +255,7 @@ impl Subscriber {
}
}
}
.instrument(span)
};

backoff::future::retry(back_off_policy, get_block).await
Expand Down
15 changes: 9 additions & 6 deletions narwhal/executor/src/tests/subscriber_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,26 @@ fn spawn_subscriber(
Store<(CertificateDigest, BatchDigest), Batch>,
watch::Sender<ReconfigureNotification>,
JoinHandle<()>,
Arc<ExecutorMetrics>,
) {
let message = ReconfigureNotification::NewEpoch(committee.to_owned());
let (tx_reconfigure, rx_reconfigure) = watch::channel(message);

// Spawn a test subscriber.
let store = test_store();
let executor_metrics = ExecutorMetrics::new(&Registry::new());
let metrics = Arc::new(executor_metrics);
let subscriber_handle = Subscriber::spawn(
store.clone(),
tx_get_block_commands,
rx_reconfigure,
rx_sequence,
tx_executor,
Arc::new(executor_metrics),
metrics.clone(),
restored_consensus_output,
);

(store, tx_reconfigure, subscriber_handle)
(store, tx_reconfigure, subscriber_handle, metrics)
}

#[tokio::test]
Expand All @@ -50,7 +52,7 @@ async fn handle_certificate_with_downloaded_batch() {
let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1);

// Spawn a subscriber.
let (store, _tx_reconfigure, _) = spawn_subscriber(
let (store, _tx_reconfigure, _, _) = spawn_subscriber(
&committee,
rx_sequence,
tx_executor,
Expand Down Expand Up @@ -120,7 +122,7 @@ async fn should_retry_when_failed_to_get_payload() {
let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1);

// Spawn a subscriber.
let (store, _tx_reconfigure, _) = spawn_subscriber(
let (store, _tx_reconfigure, _, _) = spawn_subscriber(
&committee,
rx_sequence,
tx_executor,
Expand All @@ -146,6 +148,7 @@ async fn should_retry_when_failed_to_get_payload() {
};

// send the certificate to download payload
tx_sequence.send(message.clone()).await.unwrap();
tx_sequence.send(message).await.unwrap();

// Now assume that the block_wait is responding with error for the
Expand Down Expand Up @@ -201,7 +204,7 @@ async fn subscriber_should_crash_when_irrecoverable_error() {
let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1);

// Spawn a subscriber.
let (_store, _tx_reconfigure, handle) = spawn_subscriber(
let (_store, _tx_reconfigure, handle, _) = spawn_subscriber(
&committee,
rx_sequence,
tx_executor,
Expand Down Expand Up @@ -266,7 +269,7 @@ async fn test_subscriber_with_restored_consensus_output() {
.collect();

// Spawn a subscriber.
let (_store, _tx_reconfigure, _handle) = spawn_subscriber(
let (_store, _tx_reconfigure, _handle, _) = spawn_subscriber(
&committee,
rx_sequence,
tx_executor,
Expand Down

0 comments on commit cbd8ab6

Please sign in to comment.