Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Indexer processor metrics improvements. #469

Merged
merged 3 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions rust/processor/src/utils/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use once_cell::sync::Lazy;
use prometheus::{
register_gauge_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec,
GaugeVec, IntCounter, IntCounterVec, IntGaugeVec,
register_gauge_vec, register_histogram_vec, register_int_counter, register_int_counter_vec,
register_int_gauge_vec, GaugeVec, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec,
};

pub enum ProcessorStep {
Expand Down Expand Up @@ -188,8 +188,8 @@ pub static FETCHER_THREAD_CHANNEL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
});

/// Overall processing time for a single batch of transactions (per task)
pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_processor_single_batch_processing_time_in_secs",
"Time taken to process a single batch of transactions",
&["processor_name", "task_index"]
Expand All @@ -198,8 +198,8 @@ pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
});

/// Parsing time for a single batch of transactions
pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_processor_single_batch_parsing_time_in_secs",
"Time taken to parse a single batch of transactions",
&["processor_name", "task_index"]
Expand All @@ -208,8 +208,8 @@ pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
});

/// DB insertion time for a single batch of transactions
pub static SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
pub static SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_processor_single_batch_db_insertion_time_in_secs",
"Time taken to insert to DB for a single batch of transactions",
&["processor_name", "task_index"]
Expand Down Expand Up @@ -246,8 +246,8 @@ pub static PARQUET_PROCESSOR_DATA_GAP_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
});

/// GRPC latency.
pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_processor_grpc_latency_in_secs",
"GRPC latency observed by processor",
&["processor_name", "task_index"]
Expand Down
9 changes: 4 additions & 5 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ use std::collections::HashSet;
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use url::Url;

// this is how large the fetch queue should be. Each bucket should have a max of 80MB or so, so a batch
// of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision
// machines accordingly.
Expand Down Expand Up @@ -574,7 +573,7 @@ impl Worker {
// TODO: For these three, do an atomic thing, or ideally move to an async metrics collector!
GRPC_LATENCY_BY_PROCESSOR_IN_SECS
.with_label_values(&[processor_name, &task_index_str])
.set(time_diff_since_pb_timestamp_in_secs(
.observe(time_diff_since_pb_timestamp_in_secs(
end_txn_timestamp.as_ref().unwrap(),
));
LATEST_PROCESSED_VERSION
Expand Down Expand Up @@ -614,13 +613,13 @@ impl Worker {

SINGLE_BATCH_PROCESSING_TIME_IN_SECS
.with_label_values(&[processor_name, &task_index_str])
.set(processing_time);
.observe(processing_time);
SINGLE_BATCH_PARSING_TIME_IN_SECS
.with_label_values(&[processor_name, &task_index_str])
.set(processing_result.processing_duration_in_secs);
.observe(processing_result.processing_duration_in_secs);
SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS
.with_label_values(&[processor_name, &task_index_str])
.set(processing_result.db_insertion_duration_in_secs);
.observe(processing_result.db_insertion_duration_in_secs);

gap_detector_sender
.send(ProcessingResult::DefaultProcessingResult(
Expand Down
Loading