diff --git a/kvdb-rocksdb/CHANGELOG.md b/kvdb-rocksdb/CHANGELOG.md
index dbebb5087..da022f21a 100644
--- a/kvdb-rocksdb/CHANGELOG.md
+++ b/kvdb-rocksdb/CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog].
 
 ## [Unreleased]
 - License changed from GPL3 to dual MIT/Apache2. [#342](https://github.com/paritytech/parity-common/pull/342)
+- Added `get_statistics` method and `enable_statistics` config parameter. [#347](https://github.com/paritytech/parity-common/pull/347)
 
 ## [0.5.0] - 2019-02-05
 - Bump parking_lot to 0.10. [#332](https://github.com/paritytech/parity-common/pull/332)
diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs
index 42ee1388c..ac8958919 100644
--- a/kvdb-rocksdb/src/lib.rs
+++ b/kvdb-rocksdb/src/lib.rs
@@ -167,6 +167,12 @@ pub struct DatabaseConfig {
 	pub columns: u32,
 	/// Specify the maximum number of info/debug log files to be kept.
 	pub keep_log_file_num: i32,
+	/// Enable native RocksDB statistics.
+	/// Disabled by default.
+	///
+	/// It can have a negative performance impact of up to 10% according to
+	/// https://github.com/facebook/rocksdb/wiki/Statistics.
+	pub enable_statistics: bool,
 }
 
 impl DatabaseConfig {
@@ -215,6 +221,7 @@ impl Default for DatabaseConfig {
 			compaction: CompactionProfile::default(),
 			columns: 1,
 			keep_log_file_num: 1,
+			enable_statistics: false,
 		}
 	}
 }
@@ -267,6 +274,8 @@ pub struct Database {
 	config: DatabaseConfig,
 	path: String,
 	#[ignore_malloc_size_of = "insignificant"]
+	opts: Options,
+	#[ignore_malloc_size_of = "insignificant"]
 	write_opts: WriteOptions,
 	#[ignore_malloc_size_of = "insignificant"]
 	read_opts: ReadOptions,
@@ -304,6 +313,10 @@ fn is_corrupted(err: &Error) -> bool {
 
 fn generate_options(config: &DatabaseConfig) -> Options {
 	let mut opts = Options::default();
+	opts.set_report_bg_io_stats(true);
+	if config.enable_statistics {
+		opts.enable_statistics();
+	}
 	opts.set_use_fsync(false);
 	opts.create_if_missing(true);
 	opts.set_max_open_files(config.max_open_files);
@@ -409,6 +422,7 @@ impl Database {
 			flushing: RwLock::new((0..config.columns).map(|_| HashMap::new()).collect()),
 			flushing_lock: Mutex::new(false),
 			path: path.to_owned(),
+			opts,
 			read_opts,
 			write_opts,
 			block_opts,
@@ -715,6 +729,15 @@ impl Database {
 			None => Ok(()),
 		}
 	}
+
+	/// Get RocksDB statistics.
+	pub fn get_statistics(&self) -> HashMap<String, stats::RocksDbStatsValue> {
+		if let Some(stats) = self.opts.get_statistics() {
+			stats::parse_rocksdb_stats(&stats)
+		} else {
+			HashMap::new()
+		}
+	}
 }
 
 // duplicate declaration of methods here to avoid trait import in certain existing cases
@@ -755,6 +778,13 @@ impl KeyValueDB for Database {
 	}
 
 	fn io_stats(&self, kind: kvdb::IoStatsKind) -> kvdb::IoStats {
+		let rocksdb_stats = self.get_statistics();
+		let cache_hit_count = rocksdb_stats.get("block.cache.hit").map(|s| s.count).unwrap_or(0u64);
+		let overall_stats = self.stats.overall();
+		let old_cache_hit_count = overall_stats.raw.cache_hit_count;
+
+		self.stats.tally_cache_hit_count(cache_hit_count - old_cache_hit_count);
+
 		let taken_stats = match kind {
 			kvdb::IoStatsKind::Overall => self.stats.overall(),
 			kvdb::IoStatsKind::SincePrevious => self.stats.since_previous(),
@@ -767,7 +797,7 @@ impl KeyValueDB for Database {
 		stats.transactions = taken_stats.raw.transactions;
 		stats.bytes_written = taken_stats.raw.bytes_written;
 		stats.bytes_read = taken_stats.raw.bytes_read;
-
+		stats.cache_reads = taken_stats.raw.cache_hit_count;
 		stats.started = taken_stats.started;
 		stats.span = taken_stats.started.elapsed();
 
@@ -847,6 +877,7 @@ mod tests {
 			compaction: CompactionProfile::default(),
 			columns: 11,
 			keep_log_file_num: 1,
+			enable_statistics: false,
 		};
 		let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();
 
@@ -984,20 +1015,40 @@ mod tests {
 		assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget");
 	}
 
+	#[test]
+	fn test_stats_parser() {
+		let raw = r#"rocksdb.row.cache.hit COUNT : 1
+rocksdb.db.get.micros P50 : 2.000000 P95 : 3.000000 P99 : 4.000000 P100 : 5.000000 COUNT : 0 SUM : 15
+"#;
+		let stats = stats::parse_rocksdb_stats(raw);
+		assert_eq!(stats["row.cache.hit"].count, 1);
+		assert!(stats["row.cache.hit"].times.is_none());
+		assert_eq!(stats["db.get.micros"].count, 0);
+		let get_times = stats["db.get.micros"].times.unwrap();
+		assert_eq!(get_times.sum, 15);
+		assert_eq!(get_times.p50, 2.0);
+		assert_eq!(get_times.p95, 3.0);
+		assert_eq!(get_times.p99, 4.0);
+		assert_eq!(get_times.p100, 5.0);
+	}
+
 	#[test]
 	fn rocksdb_settings() {
 		const NUM_COLS: usize = 2;
-		let mut cfg = DatabaseConfig::with_columns(NUM_COLS as u32);
+		let mut cfg = DatabaseConfig { enable_statistics: true, ..DatabaseConfig::with_columns(NUM_COLS as u32) };
 		cfg.max_open_files = 123; // is capped by the OS fd limit (typically 1024)
 		cfg.compaction.block_size = 323232;
 		cfg.compaction.initial_file_size = 102030;
 		cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().collect();
 		let db_path = TempDir::new("config_test").expect("the OS can create tmp dirs");
-		let _db = Database::open(&cfg, db_path.path().to_str().unwrap()).expect("can open a db");
+		let db = Database::open(&cfg, db_path.path().to_str().unwrap()).expect("can open a db");
 		let mut rocksdb_log = std::fs::File::open(format!("{}/LOG", db_path.path().to_str().unwrap()))
 			.expect("rocksdb creates a LOG file");
 		let mut settings = String::new();
+		let statistics = db.get_statistics();
+		assert!(statistics.contains_key("block.cache.hit"));
+
 		rocksdb_log.read_to_string(&mut settings).unwrap();
 		// Check column count
 		assert!(settings.contains("Options for column family [default]"), "no default col");
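For context (not part of the patch): a minimal usage sketch of the API added above, assuming the `kvdb` and `kvdb-rocksdb` crates from this repository; the database path and the `main` wrapper are purely illustrative.

```rust
use kvdb::{IoStatsKind, KeyValueDB};
use kvdb_rocksdb::{Database, DatabaseConfig};

fn main() -> std::io::Result<()> {
	// Opt in to native RocksDB statistics; the new config flag is off by default.
	let mut config = DatabaseConfig::with_columns(1);
	config.enable_statistics = true;
	let db = Database::open(&config, "/tmp/kvdb-rocksdb-stats-example")?;

	// Raw RocksDB counters, keyed without the "rocksdb." prefix (e.g. "block.cache.hit").
	let stats = db.get_statistics();
	if let Some(hits) = stats.get("block.cache.hit") {
		println!("block cache hits so far: {}", hits.count);
	}

	// The same counter is also folded into kvdb's IoStats as `cache_reads`.
	let io = db.io_stats(IoStatsKind::Overall);
	println!("cache reads reported via io_stats: {}", io.cache_reads);
	Ok(())
}
```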
diff --git a/kvdb-rocksdb/src/stats.rs b/kvdb-rocksdb/src/stats.rs
index 80fa85ce2..c028b1948 100644
--- a/kvdb-rocksdb/src/stats.rs
+++ b/kvdb-rocksdb/src/stats.rs
@@ -7,15 +7,68 @@
 // except according to those terms.
 
 use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::str::FromStr;
 use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
 use std::time::Instant;
 
+#[derive(Default, Clone, Copy)]
 pub struct RawDbStats {
 	pub reads: u64,
 	pub writes: u64,
 	pub bytes_written: u64,
 	pub bytes_read: u64,
 	pub transactions: u64,
+	pub cache_hit_count: u64,
+}
+
+#[derive(Default, Debug, Clone, Copy)]
+pub struct RocksDbStatsTimeValue {
+	/// 50th percentile
+	pub p50: f64,
+	/// 95th percentile
+	pub p95: f64,
+	/// 99th percentile
+	pub p99: f64,
+	/// 100th percentile
+	pub p100: f64,
+	pub sum: u64,
+}
+
+#[derive(Default, Debug, Clone, Copy)]
+pub struct RocksDbStatsValue {
+	pub count: u64,
+	pub times: Option<RocksDbStatsTimeValue>,
+}
+
+pub fn parse_rocksdb_stats(stats: &str) -> HashMap<String, RocksDbStatsValue> {
+	stats.lines().map(|line| parse_rocksdb_stats_row(line.splitn(2, ' '))).collect()
+}
+
+fn parse_rocksdb_stats_row<'a>(mut iter: impl Iterator<Item = &'a str>) -> (String, RocksDbStatsValue) {
+	const PROOF: &str = "rocksdb statistics format is valid and hasn't changed";
+	const SEPARATOR: &str = " : ";
+	let key = iter.next().expect(PROOF).trim_start_matches("rocksdb.").to_owned();
+	let values = iter.next().expect(PROOF);
+	let value = if values.starts_with("COUNT") {
+		// rocksdb.row.cache.hit COUNT : 0
+		RocksDbStatsValue {
+			count: u64::from_str(values.rsplit(SEPARATOR).next().expect(PROOF)).expect(PROOF),
+			times: None,
+		}
+	} else {
+		// rocksdb.db.get.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
+		let values: Vec<&str> = values.split_whitespace().filter(|s| *s != ":").collect();
+		let times = RocksDbStatsTimeValue {
+			p50: f64::from_str(values.get(1).expect(PROOF)).expect(PROOF),
+			p95: f64::from_str(values.get(3).expect(PROOF)).expect(PROOF),
+			p99: f64::from_str(values.get(5).expect(PROOF)).expect(PROOF),
+			p100: f64::from_str(values.get(7).expect(PROOF)).expect(PROOF),
+			sum: u64::from_str(values.get(11).expect(PROOF)).expect(PROOF),
+		};
+		RocksDbStatsValue { count: u64::from_str(values.get(9).expect(PROOF)).expect(PROOF), times: Some(times) }
+	};
+	(key, value)
 }
 
 impl RawDbStats {
@@ -26,6 +79,7 @@ impl RawDbStats {
 			bytes_written: self.bytes_written + other.bytes_written,
 			bytes_read: self.bytes_read + other.bytes_written,
 			transactions: self.transactions + other.transactions,
+			cache_hit_count: self.cache_hit_count + other.cache_hit_count,
 		}
 	}
 }
@@ -38,11 +92,7 @@ struct OverallDbStats {
 
 impl OverallDbStats {
 	fn new() -> Self {
-		OverallDbStats {
-			stats: RawDbStats { reads: 0, writes: 0, bytes_written: 0, bytes_read: 0, transactions: 0 },
-			last_taken: Instant::now(),
-			started: Instant::now(),
-		}
+		OverallDbStats { stats: RawDbStats::default(), last_taken: Instant::now(), started: Instant::now() }
 	}
 }
 
@@ -52,6 +102,7 @@ pub struct RunningDbStats {
 	bytes_written: AtomicU64,
 	bytes_read: AtomicU64,
 	transactions: AtomicU64,
+	cache_hit_count: AtomicU64,
 	overall: RwLock<OverallDbStats>,
 }
 
@@ -68,6 +119,7 @@ impl RunningDbStats {
 			writes: 0.into(),
 			bytes_written: 0.into(),
 			transactions: 0.into(),
+			cache_hit_count: 0.into(),
 			overall: OverallDbStats::new().into(),
 		}
 	}
@@ -92,6 +144,10 @@ impl RunningDbStats {
 		self.transactions.fetch_add(val, AtomicOrdering::Relaxed);
 	}
 
+	pub fn tally_cache_hit_count(&self, val: u64) {
+		self.cache_hit_count.fetch_add(val, AtomicOrdering::Relaxed);
+	}
+
 	fn take_current(&self) -> RawDbStats {
 		RawDbStats {
 			reads: self.reads.swap(0, AtomicOrdering::Relaxed),
@@ -99,6 +155,7 @@ impl RunningDbStats {
 			bytes_written: self.bytes_written.swap(0, AtomicOrdering::Relaxed),
 			bytes_read: self.bytes_read.swap(0, AtomicOrdering::Relaxed),
 			transactions: self.transactions.swap(0, AtomicOrdering::Relaxed),
+			cache_hit_count: self.cache_hit_count.swap(0, AtomicOrdering::Relaxed),
 		}
 	}
 
@@ -109,6 +166,7 @@ impl RunningDbStats {
 			bytes_written: self.bytes_written.load(AtomicOrdering::Relaxed),
 			bytes_read: self.bytes_read.load(AtomicOrdering::Relaxed),
 			transactions: self.transactions.load(AtomicOrdering::Relaxed),
+			cache_hit_count: self.cache_hit_count.load(AtomicOrdering::Relaxed),
 		}
 	}
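Also for context (not part of the patch): RocksDB reports `block.cache.hit` as a cumulative counter, while `RunningDbStats` keeps resettable tallies, so `io_stats` feeds only the increment since the last overall snapshot into `tally_cache_hit_count`. A standalone sketch of that bookkeeping, using hypothetical names (`CacheHitTally`, `pending_increment`):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Tracks how much of a cumulative external counter has already been tallied.
struct CacheHitTally {
	tallied: AtomicU64,
}

impl CacheHitTally {
	/// `cumulative` is the monotonically increasing value parsed from the RocksDB
	/// statistics dump; the return value is the part that has not been tallied yet.
	fn pending_increment(&self, cumulative: u64) -> u64 {
		let seen = self.tallied.load(Ordering::Relaxed);
		let delta = cumulative.saturating_sub(seen);
		self.tallied.fetch_add(delta, Ordering::Relaxed);
		delta
	}
}

fn main() {
	let tally = CacheHitTally { tallied: AtomicU64::new(0) };
	assert_eq!(tally.pending_increment(10), 10); // first poll: all 10 hits are new
	assert_eq!(tally.pending_increment(25), 15); // later poll: only the 15 new hits are added
}
```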