Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Indexer processor metrics improvements. #469

Merged
merged 3 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions rust/processor/src/utils/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use once_cell::sync::Lazy;
use prometheus::{
register_gauge_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec,
GaugeVec, IntCounter, IntCounterVec, IntGaugeVec,
register_gauge_vec, register_histogram_vec, register_int_counter, register_int_counter_vec,
register_int_gauge_vec, GaugeVec, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec,
};

pub enum ProcessorStep {
Expand Down Expand Up @@ -147,6 +147,15 @@ pub static PROCESSED_BYTES_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

pub static BATCH_SIZE: Lazy<HistogramVec> = Lazy::new(|| {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have a metric like this: it's NUM_TRANSACTIONS_PROCESSED_COUNT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a slight semantic difference because this records batch_end - batch_start (off by one relative to the actual number of transactions in the batch), but for all practical purposes this is good. Removed the counter for now.

register_histogram_vec!(
"indexer_processor_batch_size",
"Histogram of the received batch size",
&["processor_name", "step", "message", "task_index"]
)
.unwrap()
});

/// The amount of time that a task spent waiting for a protobuf bundle of transactions
pub static PB_CHANNEL_FETCH_WAIT_TIME_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
Expand Down Expand Up @@ -188,8 +197,8 @@ pub static FETCHER_THREAD_CHANNEL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
});

/// Overall processing time for a single batch of transactions (per task)
pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_processor_single_batch_processing_time_in_secs",
"Time taken to process a single batch of transactions",
&["processor_name", "task_index"]
Expand All @@ -198,8 +207,8 @@ pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
});

/// Parsing time for a single batch of transactions
pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_processor_single_batch_parsing_time_in_secs",
"Time taken to parse a single batch of transactions",
&["processor_name", "task_index"]
Expand All @@ -208,8 +217,8 @@ pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
});

/// DB insertion time for a single batch of transactions
pub static SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
pub static SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_processor_single_batch_db_insertion_time_in_secs",
"Time taken to insert to DB for a single batch of transactions",
&["processor_name", "task_index"]
Expand Down Expand Up @@ -246,8 +255,8 @@ pub static PARQUET_PROCESSOR_DATA_GAP_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
});

/// GRPC latency.
pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_processor_grpc_latency_in_secs",
"GRPC latency observed by processor",
&["processor_name", "task_index"]
Expand Down
17 changes: 11 additions & 6 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
transaction_filter::TransactionFilter,
utils::{
counters::{
ProcessorStep, GRPC_LATENCY_BY_PROCESSOR_IN_SECS, LATEST_PROCESSED_VERSION,
ProcessorStep, BATCH_SIZE, GRPC_LATENCY_BY_PROCESSOR_IN_SECS, LATEST_PROCESSED_VERSION,
NUM_TRANSACTIONS_PROCESSED_COUNT, PB_CHANNEL_FETCH_WAIT_TIME_SECS,
PROCESSED_BYTES_COUNT, PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS,
PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS, PROCESSOR_ERRORS_COUNT,
Expand All @@ -55,7 +55,6 @@ use std::collections::HashSet;
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use url::Url;

// this is how large the fetch queue should be. Each bucket should have a max of 80MB or so, so a batch
// of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision
// machines accordingly.
Expand Down Expand Up @@ -574,7 +573,7 @@ impl Worker {
// TODO: For these three, do an atomic thing, or ideally move to an async metrics collector!
GRPC_LATENCY_BY_PROCESSOR_IN_SECS
.with_label_values(&[processor_name, &task_index_str])
.set(time_diff_since_pb_timestamp_in_secs(
.observe(time_diff_since_pb_timestamp_in_secs(
end_txn_timestamp.as_ref().unwrap(),
));
LATEST_PROCESSED_VERSION
Expand Down Expand Up @@ -603,6 +602,12 @@ impl Worker {
&task_index_str,
])
.inc_by(size_in_bytes as u64);
BATCH_SIZE
.with_label_values(&[processor_name, &task_index_str])
.observe(
(batch_last_txn_version - batch_first_txn_version) as f64,
);

NUM_TRANSACTIONS_PROCESSED_COUNT
.with_label_values(&[
processor_name,
Expand All @@ -614,13 +619,13 @@ impl Worker {

SINGLE_BATCH_PROCESSING_TIME_IN_SECS
.with_label_values(&[processor_name, &task_index_str])
.set(processing_time);
.observe(processing_time);
SINGLE_BATCH_PARSING_TIME_IN_SECS
.with_label_values(&[processor_name, &task_index_str])
.set(processing_result.processing_duration_in_secs);
.observe(processing_result.processing_duration_in_secs);
SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS
.with_label_values(&[processor_name, &task_index_str])
.set(processing_result.db_insertion_duration_in_secs);
.observe(processing_result.db_insertion_duration_in_secs);

gap_detector_sender
.send(ProcessingResult::DefaultProcessingResult(
Expand Down
Loading