Skip to content

Commit

Permalink
Indexer processor metrics improvements. (#469)
Browse files Browse the repository at this point in the history
  • Loading branch information
sitalkedia authored Jul 25, 2024
1 parent 312280b commit b11e05a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 15 deletions.
20 changes: 10 additions & 10 deletions rust/processor/src/utils/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use once_cell::sync::Lazy;
use prometheus::{
register_gauge_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec,
GaugeVec, IntCounter, IntCounterVec, IntGaugeVec,
register_gauge_vec, register_histogram_vec, register_int_counter, register_int_counter_vec,
register_int_gauge_vec, GaugeVec, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec,
};

pub enum ProcessorStep {
Expand Down Expand Up @@ -188,8 +188,8 @@ pub static FETCHER_THREAD_CHANNEL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
});

/// Overall processing time for a single batch of transactions (per task)
pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_processor_single_batch_processing_time_in_secs",
"Time taken to process a single batch of transactions",
&["processor_name", "task_index"]
Expand All @@ -198,8 +198,8 @@ pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
});

/// Parsing time for a single batch of transactions
pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_processor_single_batch_parsing_time_in_secs",
"Time taken to parse a single batch of transactions",
&["processor_name", "task_index"]
Expand All @@ -208,8 +208,8 @@ pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
});

/// DB insertion time for a single batch of transactions
pub static SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
pub static SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_processor_single_batch_db_insertion_time_in_secs",
"Time taken to insert to DB for a single batch of transactions",
&["processor_name", "task_index"]
Expand Down Expand Up @@ -246,8 +246,8 @@ pub static PARQUET_PROCESSOR_DATA_GAP_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
});

/// GRPC latency.
pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_processor_grpc_latency_in_secs",
"GRPC latency observed by processor",
&["processor_name", "task_index"]
Expand Down
9 changes: 4 additions & 5 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ use std::{
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use url::Url;

// this is how large the fetch queue should be. Each bucket should have a max of 80MB or so, so a batch
// of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision
// machines accordingly.
Expand Down Expand Up @@ -588,7 +587,7 @@ impl Worker {
// TODO: For these three, do an atomic thing, or ideally move to an async metrics collector!
GRPC_LATENCY_BY_PROCESSOR_IN_SECS
.with_label_values(&[processor_name, &task_index_str])
.set(time_diff_since_pb_timestamp_in_secs(
.observe(time_diff_since_pb_timestamp_in_secs(
end_txn_timestamp.as_ref().unwrap(),
));
LATEST_PROCESSED_VERSION
Expand Down Expand Up @@ -628,13 +627,13 @@ impl Worker {

SINGLE_BATCH_PROCESSING_TIME_IN_SECS
.with_label_values(&[processor_name, &task_index_str])
.set(processing_time);
.observe(processing_time);
SINGLE_BATCH_PARSING_TIME_IN_SECS
.with_label_values(&[processor_name, &task_index_str])
.set(processing_result.processing_duration_in_secs);
.observe(processing_result.processing_duration_in_secs);
SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS
.with_label_values(&[processor_name, &task_index_str])
.set(processing_result.db_insertion_duration_in_secs);
.observe(processing_result.db_insertion_duration_in_secs);

gap_detector_sender
.send(ProcessingResult::DefaultProcessingResult(
Expand Down

0 comments on commit b11e05a

Please sign in to comment.