Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rocksdb] add metric for pending compaction bytes and memtable flushes #19112

Merged
merged 2 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion crates/typed-store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl SamplingInterval {
pub struct ColumnFamilyMetrics {
pub rocksdb_total_sst_files_size: IntGaugeVec,
pub rocksdb_total_blob_files_size: IntGaugeVec,
pub rocksdb_current_size_active_mem_tables: IntGaugeVec,
pub rocksdb_size_all_mem_tables: IntGaugeVec,
pub rocksdb_num_snapshots: IntGaugeVec,
pub rocksdb_oldest_snapshot_time: IntGaugeVec,
Expand All @@ -86,13 +87,16 @@ pub struct ColumnFamilyMetrics {
pub rocksdb_block_cache_usage: IntGaugeVec,
pub rocksdb_block_cache_pinned_usage: IntGaugeVec,
pub rocksdb_estimate_table_readers_mem: IntGaugeVec,
pub rocksdb_num_immutable_mem_tables: IntGaugeVec,
pub rocksdb_mem_table_flush_pending: IntGaugeVec,
pub rocksdb_compaction_pending: IntGaugeVec,
pub rocksdb_estimate_pending_compaction_bytes: IntGaugeVec,
pub rocksdb_num_running_compactions: IntGaugeVec,
pub rocksdb_num_running_flushes: IntGaugeVec,
pub rocksdb_estimate_oldest_key_time: IntGaugeVec,
pub rocksdb_background_errors: IntGaugeVec,
pub rocksdb_estimated_num_keys: IntGaugeVec,
pub rocksdb_base_level: IntGaugeVec,
}

impl ColumnFamilyMetrics {
Expand All @@ -112,6 +116,13 @@ impl ColumnFamilyMetrics {
registry,
)
.unwrap(),
rocksdb_current_size_active_mem_tables: register_int_gauge_vec_with_registry!(
"rocksdb_current_size_active_mem_tables",
"The current approximate size of active memtable (bytes).",
&["cf_name"],
registry,
)
.unwrap(),
rocksdb_size_all_mem_tables: register_int_gauge_vec_with_registry!(
"rocksdb_size_all_mem_tables",
"The memory size occupied by the column family's in-memory buffer",
Expand Down Expand Up @@ -177,6 +188,13 @@ impl ColumnFamilyMetrics {
registry,
)
.unwrap(),
rocksdb_num_immutable_mem_tables: register_int_gauge_vec_with_registry!(
"rocksdb_num_immutable_mem_tables",
"The number of immutable memtables that have not yet been flushed.",
&["cf_name"],
registry,
)
.unwrap(),
rocksdb_mem_table_flush_pending: register_int_gauge_vec_with_registry!(
"rocksdb_mem_table_flush_pending",
"A 1 or 0 flag indicating whether a memtable flush is pending.
Expand All @@ -198,6 +216,14 @@ impl ColumnFamilyMetrics {
registry,
)
.unwrap(),
rocksdb_estimate_pending_compaction_bytes: register_int_gauge_vec_with_registry!(
"rocksdb_estimate_pending_compaction_bytes",
"Estimated total number of bytes compaction needs to rewrite to get all levels down
to under target size. Not valid for other compactions than level-based.",
&["cf_name"],
registry,
)
.unwrap(),
rocksdb_num_running_compactions: register_int_gauge_vec_with_registry!(
"rocksdb_num_running_compactions",
"The number of compactions that are currently running for the column family.",
Expand Down Expand Up @@ -234,7 +260,13 @@ impl ColumnFamilyMetrics {
registry,
)
.unwrap(),

rocksdb_base_level: register_int_gauge_vec_with_registry!(
"rocksdb_base_level",
"The number of level to which L0 data will be compacted.",
&["cf_name"],
registry,
)
.unwrap(),
}
}
}
Expand Down
36 changes: 34 additions & 2 deletions crates/typed-store/src/rocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ impl MetricConf {
}
}
}
const CF_METRICS_REPORT_PERIOD_MILLIS: u64 = 1000;
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;

/// An interface to a rocksDB database, keyed by a columnfamily
Expand Down Expand Up @@ -740,7 +740,7 @@ impl<K, V> DBMap<K, V> {
if !is_deprecated {
tokio::task::spawn(async move {
let mut interval =
tokio::time::interval(Duration::from_millis(CF_METRICS_REPORT_PERIOD_MILLIS));
tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
loop {
tokio::select! {
_ = interval.tick() => {
Expand Down Expand Up @@ -983,6 +983,14 @@ impl<K, V> DBMap<K, V> {
Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_current_size_active_mem_tables
.with_label_values(&[cf_name])
.set(
Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_size_all_mem_tables
Expand Down Expand Up @@ -1063,6 +1071,14 @@ impl<K, V> DBMap<K, V> {
Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_num_immutable_mem_tables
.with_label_values(&[cf_name])
.set(
Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_mem_table_flush_pending
Expand All @@ -1079,6 +1095,14 @@ impl<K, V> DBMap<K, V> {
Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_estimate_pending_compaction_bytes
.with_label_values(&[cf_name])
.set(
Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_num_running_compactions
Expand Down Expand Up @@ -1111,6 +1135,14 @@ impl<K, V> DBMap<K, V> {
Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_base_level
.with_label_values(&[cf_name])
.set(
Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
.unwrap_or(METRICS_ERROR),
);
}

pub fn transaction(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
Expand Down
Loading