From 8fc52be12e6ca812be65b46d30f2d65a5eaf7c03 Mon Sep 17 00:00:00 2001 From: Joel Nordell <94570446+joel-u410@users.noreply.github.com> Date: Mon, 11 Mar 2024 07:27:43 -0500 Subject: [PATCH] Add metrics for indexer_last_get_block_height and indexer_last_save_block_height (#154) This adds `gauge` metrics for last height retrieved from RPC and saved to the database for easier monitoring. Similar to the `tendermint_consensus_latest_block_height{chain_id}` metric in the node, these also provide `chain_name` as a label. Example of what this adds to prometheus: ``` # TYPE indexer_last_save_block_height gauge indexer_last_save_block_height{chain_name="shielded_expedition"} 80144 # TYPE indexer_last_get_block_height gauge indexer_last_get_block_height{chain_name="shielded-expedition"} 80246 ``` --------- Co-authored-by: sleepy ramen --- src/bin/indexer.rs | 10 +++++++++- src/database.rs | 11 ++++++++--- src/indexer/mod.rs | 29 ++++++++++++++++++++++------- src/lib.rs | 2 ++ 4 files changed, 41 insertions(+), 11 deletions(-) diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index 4618e35..2eabeb6 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -59,6 +59,14 @@ async fn main() -> Result<(), Error> { #[cfg(feature = "prometheus")] start_metrics_server(cfg.prometheus_config()).await?; + let network = db.network.clone(); + info!("Starting indexer"); - start_indexing(db, cfg.indexer_config(), cfg.database.create_index).await + start_indexing( + db, + cfg.indexer_config(), + network.as_str(), + cfg.database.create_index, + ) + .await } diff --git a/src/database.rs b/src/database.rs index f69dac6..8d96bfd 100644 --- a/src/database.rs +++ b/src/database.rs @@ -40,7 +40,7 @@ use tracing::{debug, info, instrument}; use crate::{ DB_SAVE_BLOCK_COUNTER, DB_SAVE_BLOCK_DURATION, DB_SAVE_COMMIT_SIG_DURATION, - DB_SAVE_EVDS_DURATION, DB_SAVE_TXS_DURATION, MASP_ADDR, + DB_SAVE_EVDS_DURATION, DB_SAVE_TXS_DURATION, INDEXER_LAST_SAVE_BLOCK_HEIGHT, MASP_ADDR, }; use crate::tables::{ @@ -48,7 +48,7 @@ use crate::tables::{ get_create_evidences_table_query, get_create_transactions_table_query, }; -use metrics::{histogram, increment_counter}; +use metrics::{gauge, histogram, increment_counter}; const BLOCKS_TABLE_NAME: &str = "blocks"; const TX_TABLE_NAME: &str = "transactions"; @@ -60,7 +60,7 @@ const DATABASE_TIMEOUT: u64 = 60; pub struct Database { pool: Arc, // we use the network as the name of the schema to allow diffrent net on the same database - network: String, + pub network: String, } impl Database { @@ -303,6 +303,11 @@ impl Database { if res.is_ok() { // update our counter for processed blocks since service started. increment_counter!(DB_SAVE_BLOCK_COUNTER, &labels); + + // update the gauge indicating last block height saved. + gauge!(INDEXER_LAST_SAVE_BLOCK_HEIGHT, + block.header.height.value() as f64, + "chain_name" => self.network.clone()); } res diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index a4add30..6bb3fc3 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -35,7 +35,11 @@ const MAX_BLOCKS_IN_CHANNEL: usize = 100; type BlockInfo = (Block, block_results::Response); #[instrument(skip(client))] -async fn get_block(block_height: u32, client: &HttpClient) -> (Block, block_results::Response) { +async fn get_block( + block_height: u32, + chain_name: &str, + client: &HttpClient, +) -> (Block, block_results::Response) { loop { let height = Height::from(block_height); tracing::trace!(message = "Requesting block: ", block_height); @@ -60,6 +64,13 @@ async fn get_block(block_height: u32, client: &HttpClient) -> (Block, block_resu &labels ); + // update the gauge indicating last block height retrieved. + metrics::gauge!( + crate::INDEXER_LAST_GET_BLOCK_HEIGHT, + resp.block.header.height.value() as f64, + "chain_name" => chain_name.to_string(), + ); + // If we successfully retrieved a block we want to get the block result. // It is used to know if a transaction has been successfully or not. let block_results = get_block_results(height, client).await; @@ -152,12 +163,13 @@ async fn get_block_results( #[allow(clippy::let_with_type_underscore)] #[instrument(name = "Indexer::blocks_stream", skip(client, block))] -fn blocks_stream( +fn blocks_stream<'a>( block: u64, - client: &HttpClient, -) -> impl Stream + '_ { + chain_name: &'a str, + client: &'a HttpClient, +) -> impl Stream + 'a { futures::stream::iter(block..).then(move |i| async move { - timeout(Duration::from_secs(30), get_block(i as u32, client)) + timeout(Duration::from_secs(30), get_block(i as u32, chain_name, client)) .await .unwrap() }) @@ -173,6 +185,7 @@ fn blocks_stream( pub async fn start_indexing( db: Database, config: &IndexerConfig, + chain_name: &str, create_index: bool, ) -> Result<(), Error> { info!("***** Starting indexer *****"); @@ -222,7 +235,7 @@ pub async fn start_indexing( // Spaw block producer task, this could speed up saving blocks // because it does not need to wait for database to finish saving a block. let (mut rx, producer_handler) = - spawn_block_producer(current_height as _, client, producer_shutdown); + spawn_block_producer(current_height as _, chain_name, client, producer_shutdown); // Block consumer that stores block into the database while let Some(block) = rx.recv().await { @@ -265,6 +278,7 @@ pub async fn start_indexing( fn spawn_block_producer( current_height: u64, + chain_name: &str, client: HttpClient, producer_shutdown: Arc, ) -> (Receiver, JoinHandle>) { @@ -273,8 +287,9 @@ fn spawn_block_producer( tokio::sync::mpsc::channel(MAX_BLOCKS_IN_CHANNEL); // Spawn the task + let chain_name = chain_name.to_string(); let handler = tokio::spawn(async move { - let stream = blocks_stream(current_height as _, &client); + let stream = blocks_stream(current_height as _, chain_name.as_str(), &client); pin_mut!(stream); while let Some(block) = stream.next().await { diff --git a/src/lib.rs b/src/lib.rs index 4464335..5f234ea 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,5 +23,7 @@ const DB_SAVE_BLOCK_DURATION: &str = "db_save_block_duration"; const DB_SAVE_TXS_DURATION: &str = "db_save_transactions_duration"; const DB_SAVE_EVDS_DURATION: &str = "db_save_evidences_duration"; const DB_SAVE_COMMIT_SIG_DURATION: &str = "db_save_commit_sig_duration"; +const INDEXER_LAST_SAVE_BLOCK_HEIGHT: &str = "indexer_last_save_block_height"; +const INDEXER_LAST_GET_BLOCK_HEIGHT: &str = "indexer_last_get_block_height"; pub const MASP_ADDR: &str = "tnam1pcqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqzmefah";