Skip to content
This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Commit

Permalink
feat: add metric for waiting elements to subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
huitseeker committed Aug 18, 2022
1 parent 8c00d3b commit 4eb7523
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 3 deletions.
4 changes: 4 additions & 0 deletions executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,17 @@ impl Executor {
SubscriberError::OnlyOneConsensusClientPermitted
);

// We expect this will ultimately be needed in the `Core` as well as the `Subscriber`.
let arc_metrics = Arc::new(metrics);

// Spawn the subscriber.
let subscriber_handle = Subscriber::spawn(
store.clone(),
tx_get_block_commands,
tx_reconfigure.subscribe(),
rx_consensus,
tx_executor,
arc_metrics,
);

// Spawn the executor's core.
Expand Down
13 changes: 12 additions & 1 deletion executor/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use prometheus::{default_registry, register_int_gauge_with_registry, IntGauge, Registry};
use prometheus::{
default_registry, register_int_gauge_with_registry,
IntGauge, Registry,
};

#[derive(Clone, Debug)]
pub struct ExecutorMetrics {
/// occupancy of the channel from the `Subscriber` to `Core`
pub tx_executor: IntGauge,
/// Number of elements in the waiting (ready-to-deliver) list of subscriber
pub waiting_elements_subscriber: IntGauge,
}

impl ExecutorMetrics {
Expand All @@ -17,6 +22,12 @@ impl ExecutorMetrics {
registry
)
.unwrap(),
waiting_elements_subscriber: register_int_gauge_with_registry!(
"waiting_elements_subscriber",
"Number of waiting elements in the subscriber",
registry
)
.unwrap(),
}
}
}
Expand Down
12 changes: 10 additions & 2 deletions executor/src/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{
errors::SubscriberResult, try_fut_and_permit, SubscriberError,
errors::SubscriberResult, metrics::ExecutorMetrics, try_fut_and_permit, SubscriberError,
SubscriberError::PayloadRetrieveError,
};
use backoff::{Error, ExponentialBackoff};
use consensus::ConsensusOutput;
use crypto::Hash;
use primary::BlockCommand;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use store::Store;
use tokio::{
sync::{oneshot, watch},
Expand Down Expand Up @@ -43,6 +43,8 @@ pub struct Subscriber {
// When asking for a certificate's payload we want to retry until we succeed, unless
// some irrecoverable error occurs. For that reason a backoff policy is defined
get_block_retry_policy: ExponentialBackoff,
/// The metrics handler
metrics: Arc<ExecutorMetrics>,
}

impl Subscriber {
Expand All @@ -57,6 +59,7 @@ impl Subscriber {
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
rx_consensus: metered_channel::Receiver<ConsensusOutput>,
tx_executor: metered_channel::Sender<ConsensusOutput>,
metrics: Arc<ExecutorMetrics>,
) -> JoinHandle<()> {
let get_block_retry_policy = ExponentialBackoff {
initial_interval: Duration::from_millis(500),
Expand All @@ -75,6 +78,7 @@ impl Subscriber {
tx_executor,
tx_get_block_commands,
get_block_retry_policy,
metrics,
}
.run()
.await
Expand Down Expand Up @@ -124,6 +128,10 @@ impl Subscriber {
}
}
}

self.metrics
.waiting_elements_subscriber
.set(waiting.len() as i64);
}
}

Expand Down
3 changes: 3 additions & 0 deletions executor/src/tests/subscriber_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use super::*;
use crate::fixtures::{test_store, test_u64_certificates};
use primary::GetBlockResponse;
use prometheus::Registry;
use test_utils::{committee, test_channel};
use types::{
BatchMessage, BlockError, BlockErrorKind, BlockResult, CertificateDigest, SequenceNumber,
Expand All @@ -24,12 +25,14 @@ async fn spawn_subscriber(

// Spawn a test subscriber.
let store = test_store();
let executor_metrics = ExecutorMetrics::new(&Registry::new());
let subscriber_handle = Subscriber::spawn(
store.clone(),
tx_get_block_commands,
rx_reconfigure,
rx_sequence,
tx_executor,
Arc::new(executor_metrics),
);

(store, tx_reconfigure, subscriber_handle)
Expand Down

0 comments on commit 4eb7523

Please sign in to comment.