diff --git a/narwhal/executor/src/metrics.rs b/narwhal/executor/src/metrics.rs index 812c3c890e058..d5a2d157d55c8 100644 --- a/narwhal/executor/src/metrics.rs +++ b/narwhal/executor/src/metrics.rs @@ -1,13 +1,22 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use prometheus::{default_registry, register_int_gauge_with_registry, IntGauge, Registry}; +use prometheus::{ + default_registry, register_histogram_with_registry, register_int_counter_with_registry, + register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry, +}; #[derive(Clone, Debug)] pub struct ExecutorMetrics { /// occupancy of the channel from the `Subscriber` to `Core` pub tx_executor: IntGauge, - /// Number of elements in the waiting (ready-to-deliver) list of subscriber - pub waiting_elements_subscriber: IntGauge, + /// Time it takes to download a payload on the Subscriber + pub subscriber_download_payload_latency: Histogram, + /// The number of attempts to successfully download + /// a certificate's payload in Subscriber. + pub subscriber_download_payload_attempts: Histogram, + /// The number of certificates processed by Subscriber + /// during the recovery period to fetch their payloads. + pub subscriber_recovered_certificates_count: IntCounter, } impl ExecutorMetrics { @@ -19,12 +28,30 @@ impl ExecutorMetrics { registry ) .unwrap(), - waiting_elements_subscriber: register_int_gauge_with_registry!( - "waiting_elements_subscriber", - "Number of waiting elements in the subscriber", + subscriber_download_payload_latency: register_histogram_with_registry!( + "subscriber_download_payload_latency", + "Time it takes to download a payload on the Subscriber", + // the buckets defined in seconds + vec![ + 0.005, 0.01, 0.02, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0, 20.0, 40.0, + 60.0 + ], registry ) .unwrap(), + subscriber_recovered_certificates_count: register_int_counter_with_registry!( + "subscriber_recovered_certificates_count", + "The number of certificates processed by Subscriber during the recovery period to fetch their payloads", + registry + ).unwrap(), + subscriber_download_payload_attempts: register_histogram_with_registry!( + "subscriber_download_payload_attempts", + "The number of attempts to successfully download a certificate's payload in Subscriber", + vec![ + 1.0, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0, 15.0, 20.0 + ], + registry + ).unwrap() } } } diff --git a/narwhal/executor/src/subscriber.rs b/narwhal/executor/src/subscriber.rs index 5dfbfb24498d6..0759e1f6511b9 100644 --- a/narwhal/executor/src/subscriber.rs +++ b/narwhal/executor/src/subscriber.rs @@ -1,7 +1,7 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use crate::{ - errors::SubscriberResult, metrics::ExecutorMetrics, try_fut_and_permit, SubscriberError, + errors::SubscriberResult, metrics::ExecutorMetrics, SubscriberError, SubscriberError::PayloadRetrieveError, }; use backoff::{Error, ExponentialBackoff}; @@ -14,11 +14,8 @@ use tokio::{ sync::{oneshot, watch}, task::JoinHandle, }; -use tracing::error; -use types::{ - bounded_future_queue::BoundedFuturesOrdered, metered_channel, Batch, BatchDigest, - CertificateDigest, ReconfigureNotification, -}; +use tracing::{debug_span, error, instrument, Instrument}; +use types::{metered_channel, Batch, BatchDigest, CertificateDigest, ReconfigureNotification}; #[cfg(test)] #[path = "tests/subscriber_tests.rs"] @@ -48,9 +45,6 @@ pub struct Subscriber { } impl Subscriber { - /// Returns the max amount of pending consensus messages we should expect. - const MAX_PENDING_CONSENSUS_MESSAGES: usize = 2000; - /// Spawn a new subscriber in a new tokio task. #[must_use] pub fn spawn( @@ -92,47 +86,29 @@ impl Subscriber { &mut self, restored_consensus_output: Vec, ) -> SubscriberResult<()> { - // It's important to have the futures in ordered fashion as we want - // to guarantee that will deliver to the executor the certificates - // in the same order we received from rx_consensus. So it doesn't - // mater if we somehow managed to fetch the batches from a later - // certificate. Unless the earlier certificate's payload has been - // fetched, no later certificate will be delivered. - let mut waiting = - BoundedFuturesOrdered::with_capacity(Self::MAX_PENDING_CONSENSUS_MESSAGES); + // It's important to process the consensus output in strictly ordered + // fashion to guarantee that we will deliver to the executor the certificates + // in the same order we received from rx_consensus. // First handle any consensus output messages that were restored due to a restart. - // This needs to happen before we start listening on rx_consensus and receive messages sequenced after these. - for message in restored_consensus_output { - let future = Self::wait_on_payload( - self.get_block_retry_policy.clone(), - self.store.clone(), - self.tx_get_block_commands.clone(), - message, - ); - waiting.push(future).await; + // This needs to happen before we start listening on rx_consensus and receive messages + // sequenced after these. + if let Err(err) = self + .recover_from_consensus_output(restored_consensus_output) + .await + { + error!("Executor subscriber is shutting down: {err}"); + return Ok(()); } - // Listen to sequenced consensus message and process them. loop { tokio::select! { // Receive the ordered sequence of consensus messages from a consensus node. - Some(message) = self.rx_consensus.recv(), if waiting.available_permits() > 0 => { - // Fetch the certificate's payload from the workers. This is done via the - // block_waiter component. If the batches are not available in the workers then - // block_waiter will do its best to sync from the other peers. Once all batches - // are available, we forward the certificate to the Executor Core. - let future = Self::wait_on_payload( - self.get_block_retry_policy.clone(), - self.store.clone(), - self.tx_get_block_commands.clone(), - message); - waiting.push(future).await; - }, - - // Receive here consensus messages for which we have downloaded all transactions data. - (Some(message), permit) = try_fut_and_permit!(waiting.try_next(), self.tx_executor) => { - permit.send(message) + Some(message) = self.rx_consensus.recv() => { + if let Err(err) = self.download_payload_and_forward(message).await { + error!("Executor subscriber is shutting down: {err}"); + return Ok(()); + } }, // Check whether the committee changed. @@ -144,11 +120,65 @@ impl Subscriber { } } } + } + } + + /// Reads all the restored_consensus_output one by one, fetches their payload + /// in order, and delivers them to the tx_executor channel. This is a sequential + /// blocking operation. We should expect to block if executor is saturated, but + /// this is desired to avoid overloading our system making this easier to trace. + #[instrument(level="info", skip_all, fields(num_of_certificates = restored_consensus_output.len()), err)] + async fn recover_from_consensus_output( + &self, + restored_consensus_output: Vec, + ) -> SubscriberResult<()> { + for message in restored_consensus_output { + // we are making this a sequential/blocking operation as the number of payloads + // that needs to be fetched might exceed the size of the waiting list and then + // we'll never be able to empty it until as we'll never reach the following loop. + // Also throttling the recovery is another measure to ensure we don't flood our + // network with messages. + self.download_payload_and_forward(message).await?; - self.metrics - .waiting_elements_subscriber - .set(waiting.len() as i64); + self.metrics.subscriber_recovered_certificates_count.inc(); } + + Ok(()) + } + + /// Downloads the payload from the worker and forwards the output to the + /// executor channel if the operation is successful. An error is returned + /// if we can't forward the output to the executor. If an irrecoverable error + /// has occurred while downloading the payload then this method panics. + #[instrument(level="debug", skip_all, fields(certificate_id = ?message.certificate.digest()), err)] + async fn download_payload_and_forward(&self, message: ConsensusOutput) -> SubscriberResult<()> { + // Fetch the certificate's payload from the workers. This is done via the + // block_waiter component. If the batches are not available in the workers then + // block_waiter will do its best to sync from the other peers. Once all batches + // are available, we forward the certificate to the Executor Core. + let result = Self::wait_on_payload( + self.metrics.clone(), + self.get_block_retry_policy.clone(), + self.store.clone(), + self.tx_get_block_commands.clone(), + message, + ) + .await; + + match result { + Ok(output) => { + if self.tx_executor.send(output).await.is_err() { + return Err(SubscriberError::ClosedChannel( + stringify!(self.tx_executor).to_owned(), + )); + } + } + Err(err) => { + panic!("Irrecoverable error occurred while retrieving block payload: {err}"); + } + } + + Ok(()) } /// The wait_on_payload will try to retrieve the certificate's payload @@ -157,17 +187,29 @@ impl Subscriber { /// sequenced we will not quit this method until we have successfully /// fetched the payload. async fn wait_on_payload( + metrics: Arc, back_off_policy: ExponentialBackoff, store: Store<(CertificateDigest, BatchDigest), Batch>, tx_get_block_commands: metered_channel::Sender, deliver: ConsensusOutput, ) -> SubscriberResult { + // the latency will be measured automatically once the guard + // goes out of scope and dropped + let _start_guard = metrics.subscriber_download_payload_latency.start_timer(); + let mut attempts_count = 0; + let get_block = move || { let message = deliver.clone(); let certificate_id = message.certificate.digest(); let tx_get_block = tx_get_block_commands.clone(); let batch_store = store.clone(); + let executor_metrics = metrics.clone(); + let attempts = { + attempts_count += 1; + attempts_count + }; + let span = debug_span!("get_block", attempt = attempts); async move { let (sender, receiver) = oneshot::channel(); @@ -196,6 +238,10 @@ impl Subscriber { .await .map_err(|err| Error::permanent(SubscriberError::from(err)))?; + executor_metrics + .subscriber_download_payload_attempts + .observe(attempts as f64); + Ok(message) } Err(err) => { @@ -209,6 +255,7 @@ impl Subscriber { } } } + .instrument(span) }; backoff::future::retry(back_off_policy, get_block).await diff --git a/narwhal/executor/src/tests/subscriber_tests.rs b/narwhal/executor/src/tests/subscriber_tests.rs index 0a8668b85f574..64d9ccb5dc1c4 100644 --- a/narwhal/executor/src/tests/subscriber_tests.rs +++ b/narwhal/executor/src/tests/subscriber_tests.rs @@ -21,6 +21,7 @@ fn spawn_subscriber( Store<(CertificateDigest, BatchDigest), Batch>, watch::Sender, JoinHandle<()>, + Arc, ) { let message = ReconfigureNotification::NewEpoch(committee.to_owned()); let (tx_reconfigure, rx_reconfigure) = watch::channel(message); @@ -28,17 +29,18 @@ fn spawn_subscriber( // Spawn a test subscriber. let store = test_store(); let executor_metrics = ExecutorMetrics::new(&Registry::new()); + let metrics = Arc::new(executor_metrics); let subscriber_handle = Subscriber::spawn( store.clone(), tx_get_block_commands, rx_reconfigure, rx_sequence, tx_executor, - Arc::new(executor_metrics), + metrics.clone(), restored_consensus_output, ); - (store, tx_reconfigure, subscriber_handle) + (store, tx_reconfigure, subscriber_handle, metrics) } #[tokio::test] @@ -50,7 +52,7 @@ async fn handle_certificate_with_downloaded_batch() { let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1); // Spawn a subscriber. - let (store, _tx_reconfigure, _) = spawn_subscriber( + let (store, _tx_reconfigure, _, _) = spawn_subscriber( &committee, rx_sequence, tx_executor, @@ -120,7 +122,7 @@ async fn should_retry_when_failed_to_get_payload() { let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1); // Spawn a subscriber. - let (store, _tx_reconfigure, _) = spawn_subscriber( + let (store, _tx_reconfigure, _, _) = spawn_subscriber( &committee, rx_sequence, tx_executor, @@ -146,6 +148,7 @@ async fn should_retry_when_failed_to_get_payload() { }; // send the certificate to download payload + tx_sequence.send(message.clone()).await.unwrap(); tx_sequence.send(message).await.unwrap(); // Now assume that the block_wait is responding with error for the @@ -201,7 +204,7 @@ async fn subscriber_should_crash_when_irrecoverable_error() { let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1); // Spawn a subscriber. - let (_store, _tx_reconfigure, handle) = spawn_subscriber( + let (_store, _tx_reconfigure, handle, _) = spawn_subscriber( &committee, rx_sequence, tx_executor, @@ -266,7 +269,7 @@ async fn test_subscriber_with_restored_consensus_output() { .collect(); // Spawn a subscriber. - let (_store, _tx_reconfigure, _handle) = spawn_subscriber( + let (_store, _tx_reconfigure, _handle, _) = spawn_subscriber( &committee, rx_sequence, tx_executor,