Skip to content
This repository has been archived by the owner on Jun 19, 2024. It is now read-only.

Commit

Permalink
Add metrics for indexer_last_get_block_height and indexer_last_save_b…
Browse files Browse the repository at this point in the history
…lock_height (#154)

This adds `gauge` metrics for last height retrieved from RPC and saved
to the database for easier monitoring.

Similar to the `tendermint_consensus_latest_block_height{chain_id}`
metric in the node, these also provide `chain_name` as a label.

Example of what this adds to prometheus:
```
# TYPE indexer_last_save_block_height gauge
indexer_last_save_block_height{chain_name="shielded_expedition"} 80144

# TYPE indexer_last_get_block_height gauge
indexer_last_get_block_height{chain_name="shielded-expedition"} 80246
```

---------

Co-authored-by: sleepy ramen <[email protected]>
  • Loading branch information
joel-u410 and rllola authored Mar 11, 2024
1 parent b69a478 commit 8fc52be
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 11 deletions.
10 changes: 9 additions & 1 deletion src/bin/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ async fn main() -> Result<(), Error> {
#[cfg(feature = "prometheus")]
start_metrics_server(cfg.prometheus_config()).await?;

let network = db.network.clone();

info!("Starting indexer");
start_indexing(db, cfg.indexer_config(), cfg.database.create_index).await
start_indexing(
db,
cfg.indexer_config(),
network.as_str(),
cfg.database.create_index,
)
.await
}
11 changes: 8 additions & 3 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ use tracing::{debug, info, instrument};

use crate::{
DB_SAVE_BLOCK_COUNTER, DB_SAVE_BLOCK_DURATION, DB_SAVE_COMMIT_SIG_DURATION,
DB_SAVE_EVDS_DURATION, DB_SAVE_TXS_DURATION, MASP_ADDR,
DB_SAVE_EVDS_DURATION, DB_SAVE_TXS_DURATION, INDEXER_LAST_SAVE_BLOCK_HEIGHT, MASP_ADDR,
};

use crate::tables::{
get_create_block_table_query, get_create_commit_signatures_table_query,
get_create_evidences_table_query, get_create_transactions_table_query,
};

use metrics::{histogram, increment_counter};
use metrics::{gauge, histogram, increment_counter};

const BLOCKS_TABLE_NAME: &str = "blocks";
const TX_TABLE_NAME: &str = "transactions";
Expand All @@ -60,7 +60,7 @@ const DATABASE_TIMEOUT: u64 = 60;
pub struct Database {
pool: Arc<PgPool>,
// we use the network as the name of the schema to allow diffrent net on the same database
network: String,
pub network: String,
}

impl Database {
Expand Down Expand Up @@ -303,6 +303,11 @@ impl Database {
if res.is_ok() {
// update our counter for processed blocks since service started.
increment_counter!(DB_SAVE_BLOCK_COUNTER, &labels);

// update the gauge indicating last block height saved.
gauge!(INDEXER_LAST_SAVE_BLOCK_HEIGHT,
block.header.height.value() as f64,
"chain_name" => self.network.clone());
}

res
Expand Down
29 changes: 22 additions & 7 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ const MAX_BLOCKS_IN_CHANNEL: usize = 100;
type BlockInfo = (Block, block_results::Response);

#[instrument(skip(client))]
async fn get_block(block_height: u32, client: &HttpClient) -> (Block, block_results::Response) {
async fn get_block(
block_height: u32,
chain_name: &str,
client: &HttpClient,
) -> (Block, block_results::Response) {
loop {
let height = Height::from(block_height);
tracing::trace!(message = "Requesting block: ", block_height);
Expand All @@ -60,6 +64,13 @@ async fn get_block(block_height: u32, client: &HttpClient) -> (Block, block_resu
&labels
);

// update the gauge indicating last block height retrieved.
metrics::gauge!(
crate::INDEXER_LAST_GET_BLOCK_HEIGHT,
resp.block.header.height.value() as f64,
"chain_name" => chain_name.to_string(),
);

// If we successfully retrieved a block we want to get the block result.
// It is used to know if a transaction has been successfully or not.
let block_results = get_block_results(height, client).await;
Expand Down Expand Up @@ -152,12 +163,13 @@ async fn get_block_results(

#[allow(clippy::let_with_type_underscore)]
#[instrument(name = "Indexer::blocks_stream", skip(client, block))]
fn blocks_stream(
fn blocks_stream<'a>(
block: u64,
client: &HttpClient,
) -> impl Stream<Item = (Block, block_results::Response)> + '_ {
chain_name: &'a str,
client: &'a HttpClient,
) -> impl Stream<Item = (Block, block_results::Response)> + 'a {
futures::stream::iter(block..).then(move |i| async move {
timeout(Duration::from_secs(30), get_block(i as u32, client))
timeout(Duration::from_secs(30), get_block(i as u32, chain_name, client))
.await
.unwrap()
})
Expand All @@ -173,6 +185,7 @@ fn blocks_stream(
pub async fn start_indexing(
db: Database,
config: &IndexerConfig,
chain_name: &str,
create_index: bool,
) -> Result<(), Error> {
info!("***** Starting indexer *****");
Expand Down Expand Up @@ -222,7 +235,7 @@ pub async fn start_indexing(
// Spaw block producer task, this could speed up saving blocks
// because it does not need to wait for database to finish saving a block.
let (mut rx, producer_handler) =
spawn_block_producer(current_height as _, client, producer_shutdown);
spawn_block_producer(current_height as _, chain_name, client, producer_shutdown);

// Block consumer that stores block into the database
while let Some(block) = rx.recv().await {
Expand Down Expand Up @@ -265,6 +278,7 @@ pub async fn start_indexing(

fn spawn_block_producer(
current_height: u64,
chain_name: &str,
client: HttpClient,
producer_shutdown: Arc<AtomicBool>,
) -> (Receiver<BlockInfo>, JoinHandle<Result<(), Error>>) {
Expand All @@ -273,8 +287,9 @@ fn spawn_block_producer(
tokio::sync::mpsc::channel(MAX_BLOCKS_IN_CHANNEL);

// Spawn the task
let chain_name = chain_name.to_string();
let handler = tokio::spawn(async move {
let stream = blocks_stream(current_height as _, &client);
let stream = blocks_stream(current_height as _, chain_name.as_str(), &client);
pin_mut!(stream);

while let Some(block) = stream.next().await {
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ const DB_SAVE_BLOCK_DURATION: &str = "db_save_block_duration";
const DB_SAVE_TXS_DURATION: &str = "db_save_transactions_duration";
const DB_SAVE_EVDS_DURATION: &str = "db_save_evidences_duration";
const DB_SAVE_COMMIT_SIG_DURATION: &str = "db_save_commit_sig_duration";
const INDEXER_LAST_SAVE_BLOCK_HEIGHT: &str = "indexer_last_save_block_height";
const INDEXER_LAST_GET_BLOCK_HEIGHT: &str = "indexer_last_get_block_height";

pub const MASP_ADDR: &str = "tnam1pcqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqzmefah";

0 comments on commit 8fc52be

Please sign in to comment.