diff --git a/executor/src/lib.rs b/executor/src/lib.rs index b6aa7a354..0edba58ab 100644 --- a/executor/src/lib.rs +++ b/executor/src/lib.rs @@ -112,6 +112,9 @@ impl Executor { SubscriberError::OnlyOneConsensusClientPermitted ); + // We expect this will ultimately be needed in the `Core` as well as the `Subscriber`. + let arc_metrics = Arc::new(metrics); + // Spawn the subscriber. let subscriber_handle = Subscriber::spawn( store.clone(), @@ -119,6 +122,7 @@ impl Executor { tx_reconfigure.subscribe(), rx_consensus, tx_executor, + arc_metrics, ); // Spawn the executor's core. diff --git a/executor/src/metrics.rs b/executor/src/metrics.rs index 188b88959..d9adaad42 100644 --- a/executor/src/metrics.rs +++ b/executor/src/metrics.rs @@ -1,11 +1,16 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use prometheus::{default_registry, register_int_gauge_with_registry, IntGauge, Registry}; +use prometheus::{ + default_registry, register_int_gauge_with_registry, + IntGauge, Registry, +}; #[derive(Clone, Debug)] pub struct ExecutorMetrics { /// occupancy of the channel from the `Subscriber` to `Core` pub tx_executor: IntGauge, + /// Number of elements in the waiting (ready-to-deliver) list of subscriber + pub waiting_elements_subscriber: IntGauge, } impl ExecutorMetrics { @@ -17,6 +22,12 @@ impl ExecutorMetrics { registry ) .unwrap(), + waiting_elements_subscriber: register_int_gauge_with_registry!( + "waiting_elements_subscriber", + "Number of waiting elements in the subscriber", + registry + ) + .unwrap(), } } } diff --git a/executor/src/subscriber.rs b/executor/src/subscriber.rs index fcd67c32d..e4d5ce5b3 100644 --- a/executor/src/subscriber.rs +++ b/executor/src/subscriber.rs @@ -1,14 +1,14 @@ // Copyright (c) 2022, Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 use crate::{ - errors::SubscriberResult, try_fut_and_permit, SubscriberError, + errors::SubscriberResult, metrics::ExecutorMetrics, try_fut_and_permit, SubscriberError, SubscriberError::PayloadRetrieveError, }; use backoff::{Error, ExponentialBackoff}; use consensus::ConsensusOutput; use crypto::Hash; use primary::BlockCommand; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use store::Store; use tokio::{ sync::{oneshot, watch}, @@ -43,6 +43,8 @@ pub struct Subscriber { // When asking for a certificate's payload we want to retry until we succeed, unless // some irrecoverable error occurs. For that reason a backoff policy is defined get_block_retry_policy: ExponentialBackoff, + /// The metrics handler + metrics: Arc<ExecutorMetrics>, } impl Subscriber { @@ -57,6 +59,7 @@ impl Subscriber { rx_reconfigure: watch::Receiver, rx_consensus: metered_channel::Receiver, tx_executor: metered_channel::Sender, + metrics: Arc<ExecutorMetrics>, ) -> JoinHandle<()> { let get_block_retry_policy = ExponentialBackoff { initial_interval: Duration::from_millis(500), @@ -75,6 +78,7 @@ impl Subscriber { tx_executor, tx_get_block_commands, get_block_retry_policy, + metrics, } .run() .await @@ -124,6 +128,10 @@ impl Subscriber { } } } + + self.metrics + .waiting_elements_subscriber + .set(waiting.len() as i64); } } diff --git a/executor/src/tests/subscriber_tests.rs b/executor/src/tests/subscriber_tests.rs index f72bc7d5b..aea45e1e4 100644 --- a/executor/src/tests/subscriber_tests.rs +++ b/executor/src/tests/subscriber_tests.rs @@ -3,6 +3,7 @@ use super::*; use crate::fixtures::{test_store, test_u64_certificates}; use primary::GetBlockResponse; +use prometheus::Registry; use test_utils::{committee, test_channel}; use types::{ BatchMessage, BlockError, BlockErrorKind, BlockResult, CertificateDigest, SequenceNumber, @@ -24,12 +25,14 @@ async fn spawn_subscriber( // Spawn a test subscriber. 
let store = test_store(); + let executor_metrics = ExecutorMetrics::new(&Registry::new()); let subscriber_handle = Subscriber::spawn( store.clone(), tx_get_block_commands, rx_reconfigure, rx_sequence, tx_executor, + Arc::new(executor_metrics), ); (store, tx_reconfigure, subscriber_handle)