diff --git a/rust/processor/src/utils/counters.rs b/rust/processor/src/utils/counters.rs index f4a6a57e1..fa77e85a2 100644 --- a/rust/processor/src/utils/counters.rs +++ b/rust/processor/src/utils/counters.rs @@ -3,8 +3,8 @@ use once_cell::sync::Lazy; use prometheus::{ - register_gauge_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec, - GaugeVec, IntCounter, IntCounterVec, IntGaugeVec, + register_gauge_vec, register_histogram_vec, register_int_counter, register_int_counter_vec, + register_int_gauge_vec, GaugeVec, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec, }; pub enum ProcessorStep { @@ -188,8 +188,8 @@ pub static FETCHER_THREAD_CHANNEL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| { }); /// Overall processing time for a single batch of transactions (per task) -pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| { - register_gauge_vec!( +pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| { + register_histogram_vec!( "indexer_processor_single_batch_processing_time_in_secs", "Time taken to process a single batch of transactions", &["processor_name", "task_index"] @@ -198,8 +198,8 @@ pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| { }); /// Parsing time for a single batch of transactions -pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| { - register_gauge_vec!( +pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| { + register_histogram_vec!( "indexer_processor_single_batch_parsing_time_in_secs", "Time taken to parse a single batch of transactions", &["processor_name", "task_index"] @@ -208,8 +208,8 @@ pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| { }); /// DB insertion time for a single batch of transactions -pub static SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| { - register_gauge_vec!( +pub static SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| { + register_histogram_vec!( 
"indexer_processor_single_batch_db_insertion_time_in_secs", "Time taken to insert to DB for a single batch of transactions", &["processor_name", "task_index"] @@ -246,8 +246,8 @@ pub static PARQUET_PROCESSOR_DATA_GAP_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| { }); /// GRPC latency. -pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| { - register_gauge_vec!( +pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| { + register_histogram_vec!( "indexer_processor_grpc_latency_in_secs", "GRPC latency observed by processor", &["processor_name", "task_index"] diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 7ccb9b61f..14b0662c8 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -55,7 +55,6 @@ use std::collections::HashSet; use tokio::task::JoinHandle; use tracing::{debug, error, info}; use url::Url; - // this is how large the fetch queue should be. Each bucket should have a max of 80MB or so, so a batch // of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision // machines accordingly. @@ -574,7 +573,7 @@ impl Worker { // TODO: For these three, do an atomic thing, or ideally move to an async metrics collector! 
GRPC_LATENCY_BY_PROCESSOR_IN_SECS .with_label_values(&[processor_name, &task_index_str]) - .set(time_diff_since_pb_timestamp_in_secs( + .observe(time_diff_since_pb_timestamp_in_secs( end_txn_timestamp.as_ref().unwrap(), )); LATEST_PROCESSED_VERSION @@ -614,13 +613,13 @@ impl Worker { SINGLE_BATCH_PROCESSING_TIME_IN_SECS .with_label_values(&[processor_name, &task_index_str]) - .set(processing_time); + .observe(processing_time); SINGLE_BATCH_PARSING_TIME_IN_SECS .with_label_values(&[processor_name, &task_index_str]) - .set(processing_result.processing_duration_in_secs); + .observe(processing_result.processing_duration_in_secs); SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS .with_label_values(&[processor_name, &task_index_str]) - .set(processing_result.db_insertion_duration_in_secs); + .observe(processing_result.db_insertion_duration_in_secs); gap_detector_sender .send(ProcessingResult::DefaultProcessingResult(