From 4b385bbad9e0e0818902424a89f3c164b891a582 Mon Sep 17 00:00:00 2001 From: Don Brady Date: Fri, 7 Jan 2022 15:11:12 -0700 Subject: [PATCH] DOSE-522 add zcache iostat subcommand (#44) --- cmd/zfs_object_agent/Cargo.lock | 111 ++++- cmd/zfs_object_agent/util/Cargo.toml | 3 + cmd/zfs_object_agent/util/src/lib.rs | 3 +- cmd/zfs_object_agent/util/src/nicenum.rs | 71 ++- .../util/src/zettacache_stats.rs | 341 +++++++++++++ cmd/zfs_object_agent/zcache/Cargo.toml | 1 + cmd/zfs_object_agent/zcache/src/iostat.rs | 469 ++++++++++++++++++ cmd/zfs_object_agent/zcache/src/main.rs | 4 +- .../zcache/src/report_hits.rs | 6 +- cmd/zfs_object_agent/zettacache/Cargo.toml | 2 +- .../zettacache/src/block_access.rs | 99 +++- .../zettacache/src/block_based_log.rs | 33 +- .../zettacache/src/superblock.rs | 12 +- .../zettacache/src/zettacache.rs | 41 +- .../zettaobject/src/public_connection.rs | 29 ++ 15 files changed, 1167 insertions(+), 58 deletions(-) create mode 100644 cmd/zfs_object_agent/util/src/zettacache_stats.rs create mode 100644 cmd/zfs_object_agent/zcache/src/iostat.rs diff --git a/cmd/zfs_object_agent/Cargo.lock b/cmd/zfs_object_agent/Cargo.lock index 4db61db5b804..b0292ddfb2d4 100644 --- a/cmd/zfs_object_agent/Cargo.lock +++ b/cmd/zfs_object_agent/Cargo.lock @@ -43,7 +43,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -164,7 +164,7 @@ dependencies = [ "slab", "socket2", "waker-fn", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -284,7 +284,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -464,7 +464,7 @@ dependencies = [ "num-traits 0.2.14", "serde 1.0.133", "time", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -679,7 +679,7 @@ checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" dependencies = [ "libc", "redox_users", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -695,6 +695,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e893a7ba6116821058dec84a6fb14fb2a97cd8ce5fd0f85d5a4e760ecd7329d9" dependencies = [ "enum-map-derive", + "serde 1.0.133", ] [[package]] @@ -806,7 +807,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1059,7 +1060,7 @@ checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" dependencies = [ "libc", "match_cfg", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1179,6 +1180,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + [[package]] name = "kv-log-macro" version = "1.0.7" @@ -1269,7 +1280,7 @@ dependencies = [ "thiserror", "thread-id", "typemap", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1394,7 +1405,7 @@ dependencies = [ "log", "miow", "ntapi", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1403,7 +1414,7 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1471,7 +1482,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1569,6 +1580,12 @@ dependencies = [ "libc", ] +[[package]] +name = "numtoa" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8f8bdf33df195859076e54ab11ee78a1b208382d3a26ec40d142ffc1ecc49ef" + [[package]] name = "nvpair" version = "0.5.1-delphix0" @@ -1676,7 +1693,7 @@ dependencies = [ "libc", "redox_syscall 0.2.10", "smallvec", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1713,7 +1730,7 @@ dependencies = [ "libc", "log", "wepoll-ffi", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1826,6 +1843,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_termios" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8440d8acb4fd3d277125b4bd01a6f38aee8d814b3b5fc09b3f2b825d37d3fe8f" +dependencies = [ + "redox_syscall 0.2.10", +] + [[package]] name = "redox_users" version = "0.4.0" @@ -1859,7 +1885,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -2010,7 +2036,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" dependencies = [ "lazy_static", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -2193,7 +2219,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -2258,7 +2284,7 @@ dependencies = [ "ntapi", "once_cell", "rayon", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -2272,7 +2298,32 @@ dependencies = [ "rand", "redox_syscall 0.2.10", "remove_dir_all", - "winapi", + "winapi 0.3.9", +] + +[[package]] +name = "termion" +version = "1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "077185e2eac69c3f8379a4298e1e07cd36beb962290d4a51199acf0fdc10607e" +dependencies = [ + "libc", + "numtoa", + "redox_syscall 0.2.10", + "redox_termios", +] + +[[package]] +name = "termsize" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e86d824a8e90f342ad3ef4bd51ef7119a9b681b0cc9f8ee7b2852f02ccd2517" +dependencies = [ + "atty", + "kernel32-sys", + "libc", + "termion", + "winapi 0.2.8", ] [[package]] @@ -2312,7 +2363,7 @@ checksum = "c7fbf4c9d56b320106cd64fd024dadfa0be7cb4706725fc44a7d7ce952d820c1" dependencies = [ "libc", "redox_syscall 0.1.57", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -2323,7 +2374,7 @@ checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", "wasi", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -2342,7 +2393,7 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "tokio-macros", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -2485,15 +2536,18 @@ name = "util" version = "0.1.0" dependencies = [ "anyhow", + "arr_macro", "async-trait", "backtrace", "bytes", "chrono", "config", + "enum-map", 
"lazy_static", "log", "log4rs", "more-asserts", + "num-traits 0.2.14", "rand", "roaring", "serde 1.0.133", @@ -2645,6 +2699,12 @@ dependencies = [ "cc", ] +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + [[package]] name = "winapi" version = "0.3.9" @@ -2655,6 +2715,12 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -2696,8 +2762,9 @@ dependencies = [ "more-asserts", "num-traits 0.2.14", "nvpair", - "serde 1.0.130", + "serde 1.0.133", "serde_json", + "termsize", "tokio", "util", ] diff --git a/cmd/zfs_object_agent/util/Cargo.toml b/cmd/zfs_object_agent/util/Cargo.toml index 149107b012c2..9e9e96997d41 100644 --- a/cmd/zfs_object_agent/util/Cargo.toml +++ b/cmd/zfs_object_agent/util/Cargo.toml @@ -7,15 +7,18 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = "1.0" +arr_macro = "0.1.3" async-trait = "0.1.51" backtrace = "0.3" bytes = "1.0" chrono = "0.4.19" config = "0.11" +enum-map = { version = "1.1.1", features = ["serde"] } lazy_static = "1.4.0" log = "0.4" log4rs = "1.0.0" more-asserts = "0.2.1" +num-traits = "0.2.14" rand = "0.8.3" roaring = "0.7.0" serde = { version = "1.0.125", features = ["derive"] } diff --git a/cmd/zfs_object_agent/util/src/lib.rs b/cmd/zfs_object_agent/util/src/lib.rs index 7710dea677fc..2e284e3c8fc6 100644 --- a/cmd/zfs_object_agent/util/src/lib.rs +++ b/cmd/zfs_object_agent/util/src/lib.rs @@ -15,6 +15,7 @@ mod range_tree; mod tunable; mod vec_ext; mod zcache_devices; +pub mod zettacache_stats; pub use bitmap_range_iterator::BitmapRangeIterator; pub use btreemap_ext::iter_wrapping; @@ -24,7 +25,7 @@ pub use lock_set::LockSet; pub use lock_set::LockedItem; pub use logging::setup_logging; pub use mutex_ext::MutexExt; -pub use nicenum::nice_p2size; +pub use nicenum::{nice_number_count, nice_number_time, nice_p2size}; pub use range_tree::RangeTree; pub use tunable::get_tunable; pub use tunable::read_tunable_config; diff --git a/cmd/zfs_object_agent/util/src/nicenum.rs b/cmd/zfs_object_agent/util/src/nicenum.rs index a6bfb52372db..cf59585ced81 100644 --- a/cmd/zfs_object_agent/util/src/nicenum.rs +++ b/cmd/zfs_object_agent/util/src/nicenum.rs @@ -1,4 +1,6 @@ -// Functions which provide "nice" looking output for numbers +//! Functions which provide "nice" looking (human-readable) output for numbers. + +use std::time::Duration; /// Convert an arbitrary integer into a scaled byte length specification /// with up to 4 significant digits (returned string will be <= 6 characters). @@ -22,3 +24,70 @@ pub fn nice_p2size(number: u64) -> String { number, scaled ); } + +/// Format a counter value with up to 4 significant digits and the appropriate +/// unit scale. The returned string will fit in 5 characters. 
+pub fn nice_number_count(number: f64) -> String {
+    let mut scaled = number;
+
+    for unit in ["", "K", "M", "G", "T", "P", "E", "Z", "Y"] {
+        let places = match scaled {
+            x if x < 10.0 => 2,
+            x if x < 100.0 => 1,
+            x if x < 1000.0 => 0,
+            _ => {
+                scaled /= 1000.0;
+                continue;
+            }
+        };
+        if places > 0 {
+            // Because format!() does some rounding of floating point numbers
+            // (e.g., 9.999 with places == 2 will format to 10.00),
+            // we need to take steps to enforce our character limit on the output.
+            let scaled_string = &format!("{:.*}", places, scaled)[0..4];
+            return format!("{}{}", scaled_string, unit);
+        } else {
+            return format!("{:.0}{}", scaled, unit);
+        }
+    }
+    // It isn't possible to get here.
+    panic!("Coding error encountered; original number: {}", number);
+}
+
+/// Format a time **duration** with up to 4 significant digits and the appropriate
+/// time unit, from 'ns' up to 's' (seconds). The returned string will fit in
+/// 5 characters unless the represented time is > 9,999 seconds.
+pub fn nice_number_time(time: Duration) -> String {
+    let nanoseconds: u64 = time.as_nanos().try_into().unwrap();
+    if nanoseconds == 0 {
+        // Don't print zero latencies since they're invalid
+        return String::from("-");
+    }
+    let mut scaled: f64 = nanoseconds as f64;
+    let mut base = 1;
+    for unit in ["ns", "us", "ms", "s"] {
+        let mut places = match scaled {
+            x if x < 10.0 => 2,
+            x if x < 100.0 => 1,
+            x if x < 1000.0 => 0,
+            _ => {
+                if unit != "s" {
+                    scaled /= 1000.0;
+                    base *= 1000;
+                    continue;
+                } else {
+                    0
+                }
+            }
+        };
+        // If the time is an even multiple of the base, then display it without any decimal precision.
+        if base > 1 && (nanoseconds % base) == 0 {
+            places = 0;
+        }
+        return format!("{:.*}{}", places, scaled, unit);
+    }
+    panic!(
+        "Coding error encountered; original number: {}, scaled value: {}",
+        nanoseconds, scaled
+    );
+}
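These formatting rules are easiest to see with concrete values. A sketch of unit tests that could accompany nicenum.rs (a hypothetical test module; the expected strings follow directly from the code above):

#[cfg(test)]
mod tests {
    use super::{nice_number_count, nice_number_time};
    use std::time::Duration;

    #[test]
    fn nice_number_examples() {
        // Counts scale by 1000 and keep at most 4 significant characters.
        assert_eq!(nice_number_count(5.0), "5.00");
        assert_eq!(nice_number_count(1234.0), "1.23K");
        // format!() rounds 9999.0 / 1000 = 9.999 up to "10.00";
        // the [0..4] slice truncates it back to four characters.
        assert_eq!(nice_number_count(9999.0), "10.0K");

        // Durations use 'ns' through 's'; zero is rendered as "-".
        assert_eq!(nice_number_time(Duration::ZERO), "-");
        assert_eq!(nice_number_time(Duration::from_nanos(1_500_000)), "1.50ms");
        // Even multiples of the unit drop the decimals entirely.
        assert_eq!(nice_number_time(Duration::from_millis(2)), "2ms");
    }
}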
diff --git a/cmd/zfs_object_agent/util/src/zettacache_stats.rs b/cmd/zfs_object_agent/util/src/zettacache_stats.rs
new file mode 100644
index 000000000000..d1764c469936
--- /dev/null
+++ b/cmd/zfs_object_agent/util/src/zettacache_stats.rs
@@ -0,0 +1,341 @@
+//! This module provides common zettacache stat structures that are collected by
+//! the **zettacache** runtime and consumed by **zcache** subcommands.
+//! These structures are serialized on the zettacache side and then deserialized
+//! by the zcache subcommands.
+
+//
+// Note: On the zettacache side (stats collection) all stat values need to be
+// AtomicU64 values. However, atomicity is not required in the consumer. The
+// AtomicU64s could be avoided in the consumer by defining matching structs built
+// on u64 values and informing `serde_json::from_str()` of the expected types for
+// deserialization.
+//
+// The simplicity of having only one set of structs seems to outweigh the cost of
+// having to maintain two sets of structs and the minor inconvenience of having to
+// dereference the Atomic values to access them.
+//
+
+use crate::{nice_number_count, nice_number_time, nice_p2size};
+use arr_macro::arr;
+use enum_map::{Enum, EnumMap};
+use num_traits::cast::ToPrimitive;
+use serde::{Deserialize, Serialize};
+use std::fmt::{Display, Formatter};
+use std::ops::{AddAssign, Sub};
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::Duration;
+
+/// The zettacache disk I/O types that are collected and displayed for each disk.
+#[derive(Debug, Enum, Copy, Clone, Serialize, Deserialize)]
+pub enum DiskIoType {
+    ReadDataForLookup,
+    ReadIndexForLookup,
+    WriteDataForInsert,
+    MaintenanceRead,
+    MaintenanceWrite,
+    // Add any new I/O types here
+}
+
+impl Display for DiskIoType {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+//
+// A collected stat value can be one of StatCount, StatBytes, or StatLatency.
+// Each stat is backed by an AtomicU64, and in the zcache command each stat value
+// type has its own way of displaying itself (its display_pretty() method). Since
+// stat values are structs, they need to implement Clone, AddAssign, and Sub.
+//
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct StatCount(pub AtomicU64);
+
+impl StatCount {
+    pub fn display_pretty(&self, scale: Option<f64>) {
+        let mut value = self.0.load(Relaxed) as f64;
+        if let Some(s) = scale {
+            value *= s;
+        }
+        let nice_value = if value == 0.0 || (scale.is_none() && value < 999.0) {
+            self.0.load(Relaxed).to_string()
+        } else {
+            nice_number_count(value)
+        };
+
+        // right aligned for a width of 6, padded with 2 spaces
+        print!("{:>6}  ", nice_value);
+    }
+}
+
+impl Clone for StatCount {
+    fn clone(&self) -> Self {
+        StatCount(AtomicU64::new(self.0.load(Ordering::Relaxed)))
+    }
+}
+
+impl AddAssign<&Self> for StatCount {
+    fn add_assign(&mut self, other: &Self) {
+        self.0.fetch_add(other.0.load(Relaxed), Relaxed);
+    }
+}
+
+impl AddAssign<u64> for StatCount {
+    fn add_assign(&mut self, other: u64) {
+        self.0.fetch_add(other, Relaxed);
+    }
+}
+
+impl Sub for &StatCount {
+    type Output = StatCount;
+
+    fn sub(self, other: Self) -> Self::Output {
+        StatCount(AtomicU64::new(self.0.load(Relaxed) - other.0.load(Relaxed)))
+    }
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct StatBytes(pub AtomicU64);
+
+impl StatBytes {
+    pub fn display_pretty(&self, scale: Option<f64>) {
+        let mut value = self.0.load(Relaxed) as f64;
+        if let Some(s) = scale {
+            value *= s;
+        }
+        let nice_value = if value < 1.0 {
+            // Intentionally avoid displaying "0B" when 0
+            String::from("0")
+        } else {
+            nice_p2size(value.round().to_u64().unwrap())
+        };
+        // right aligned for a width of 6, padded with 2 spaces
+        print!("{:>6}  ", nice_value);
+    }
+}
+
+impl Clone for StatBytes {
+    fn clone(&self) -> Self {
+        StatBytes(AtomicU64::new(self.0.load(Ordering::Relaxed)))
+    }
+}
+
+impl AddAssign<&Self> for StatBytes {
+    fn add_assign(&mut self, other: &Self) {
+        self.0.fetch_add(other.0.load(Relaxed), Relaxed);
+    }
+}
+
+impl Sub for &StatBytes {
+    type Output = StatBytes;
+
+    fn sub(self, other: Self) -> Self::Output {
+        StatBytes(AtomicU64::new(self.0.load(Relaxed) - other.0.load(Relaxed)))
+    }
+}
+
+#[derive(Debug)]
+pub struct StatLatency(pub Duration);
+
+impl StatLatency {
+    pub fn display_pretty(&self) {
+        // right aligned for a width of 6, padded with 2 spaces
+        print!("{:>6}  ", nice_number_time(self.0));
+    }
+}
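A minimal sketch of how these value types compose (the helpers below are illustrative, not part of the patch): samples are subtracted to get per-interval deltas, and an average latency is derived the same way the iostat subcommand does further down.

use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;

// Average latency for an interval: total nanoseconds / operations.
fn average_latency(total_nanoseconds: &StatCount, operations: &StatCount) -> StatLatency {
    let nanos = total_nanoseconds.0.load(Relaxed);
    let ops = operations.0.load(Relaxed);
    StatLatency(Duration::from_nanos(nanos.checked_div(ops).unwrap_or_default()))
}

fn interval_delta_example() {
    let previous = StatCount(AtomicU64::new(100));
    let latest = StatCount(AtomicU64::new(250));
    let delta = &latest - &previous; // 150 operations in this interval
    delta.display_pretty(None); // prints "   150  "
}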
+
+//
+// There are two types of histogram stats: LatencyHistogram and RequestHistogram
+//
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct LatencyHistogram(pub [StatCount; LatencyHistogram::BUCKETS]);
+
+impl LatencyHistogram {
+    pub const BUCKETS: usize = 28;
+}
+
+impl Default for LatencyHistogram {
+    fn default() -> Self {
+        LatencyHistogram(arr![StatCount::default(); 28])
+    }
+}
+
+impl Sub for &LatencyHistogram {
+    type Output = LatencyHistogram;
+
+    fn sub(self, other: Self) -> Self::Output {
+        let mut difference = LatencyHistogram::default();
+        for i in 0..self.0.len() {
+            difference.0[i] = &self.0[i] - &other.0[i];
+        }
+        difference
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct RequestHistogram(pub [StatCount; RequestHistogram::BUCKETS]);
+
+impl RequestHistogram {
+    pub const BUCKETS: usize = 16;
+}
+
+impl Default for RequestHistogram {
+    fn default() -> Self {
+        RequestHistogram(arr![StatCount::default(); 16])
+    }
+}
+
+impl Sub for &RequestHistogram {
+    type Output = RequestHistogram;
+
+    fn sub(self, other: Self) -> Self::Output {
+        let mut difference = RequestHistogram::default();
+        for i in 0..self.0.len() {
+            difference.0[i] = &self.0[i] - &other.0[i];
+        }
+        difference
+    }
+}
+
+/// Collected I/O stat values, one for each DiskIoType.
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub struct IoStatValues {
+    pub operations: StatCount,
+    pub total_bytes: StatBytes,
+    pub active_count: StatCount,
+    pub total_nanoseconds: StatCount,
+    pub latency_histogram: LatencyHistogram,
+    pub request_histogram: RequestHistogram,
+}
+
+impl AddAssign<&Self> for IoStatValues {
+    fn add_assign(&mut self, other: &Self) {
+        self.operations += &other.operations;
+        self.total_bytes += &other.total_bytes;
+        self.active_count += &other.active_count;
+        self.total_nanoseconds += &other.total_nanoseconds;
+
+        for (s, o) in self
+            .latency_histogram
+            .0
+            .iter_mut()
+            .zip(other.latency_histogram.0.iter())
+        {
+            *s += o;
+        }
+        for (s, o) in self
+            .request_histogram
+            .0
+            .iter_mut()
+            .zip(other.request_histogram.0.iter())
+        {
+            *s += o;
+        }
+    }
+}
+
+impl Sub for &IoStatValues {
+    type Output = IoStatValues;
+
+    fn sub(self, other: Self) -> Self::Output {
+        IoStatValues {
+            operations: &self.operations - &other.operations,
+            total_bytes: &self.total_bytes - &other.total_bytes,
+            active_count: self.active_count.clone(),
+            total_nanoseconds: &self.total_nanoseconds - &other.total_nanoseconds,
+            latency_histogram: &self.latency_histogram - &other.latency_histogram,
+            request_histogram: &self.request_histogram - &other.request_histogram,
+        }
+    }
+}
+
+/// The collection of IoStatValues for each disk
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct DiskIoStats {
+    pub name: String,
+    pub stats: EnumMap<DiskIoType, IoStatValues>,
+}
+
+impl DiskIoStats {
+    pub fn new(name: String) -> DiskIoStats {
+        DiskIoStats {
+            name,
+            stats: Default::default(),
+        }
+    }
+}
+
+impl AddAssign<&Self> for DiskIoStats {
+    fn add_assign(&mut self, other: &Self) {
+        for (s, o) in self.stats.iter_mut().zip(other.stats.iter()) {
+            *s.1 += o.1;
+        }
+    }
+}
+
+impl Sub for &DiskIoStats {
+    type Output = DiskIoStats;
+
+    fn sub(self, other: Self) -> Self::Output {
+        let mut difference = DiskIoStats::new(self.name.clone());
+
+        for (((_, self_values), (_, other_values)), (_, diff_values)) in self
+            .stats
+            .iter()
+            .zip(other.stats.iter())
+            .zip(difference.stats.iter_mut())
+        {
+            *diff_values = self_values - other_values;
+        }
+
+        difference
+    }
+}
+
+/// A snapshot of the I/O stats collected from the zettacache.
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub struct IoStats {
+    pub timestamp: Duration,
+    pub disk_stats: Vec<DiskIoStats>,
+}
+
+#[derive(Debug, Default, Serialize)]
+pub struct IoStatsRef<'a> {
+    pub timestamp: Duration,
+    pub disk_stats: Vec<&'a DiskIoStats>,
+}
+
+impl Sub for &IoStats {
+    type Output = IoStats;
+
+    /// Subtract two IoStats. Used to compute the net values between two IoStats samples.
+    fn sub(self, other: Self) -> IoStats {
+        if other.disk_stats.is_empty() {
+            return self.clone();
+        }
+
+        let mut difference = IoStats {
+            timestamp: self.timestamp - other.timestamp,
+            ..Default::default()
+        };
+
+        assert_eq!(self.disk_stats.len(), other.disk_stats.len());
+        for (self_stat, other_stat) in self.disk_stats.iter().zip(other.disk_stats.iter()) {
+            assert_eq!(self_stat.name, other_stat.name);
+            difference.disk_stats.push(self_stat - other_stat);
+        }
+
+        difference
+    }
+}
+
+impl IoStats {
+    pub fn max_name_len(&self) -> usize {
+        self.disk_stats
+            .iter()
+            .max_by_key(|stats| stats.name.len())
+            .unwrap()
+            .name
+            .len()
+    }
+}
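The intended flow between the two sides is a JSON round trip. A minimal sketch (assuming serde_json is available where this runs): the agent serializes the borrowed IoStatsRef form, and zcache deserializes the same JSON into the owned IoStats.

fn round_trip(disk_stats: &[DiskIoStats], timestamp: Duration) -> IoStats {
    // Agent side: borrow the live per-disk stats and serialize them.
    let snapshot = IoStatsRef {
        timestamp,
        disk_stats: disk_stats.iter().collect(),
    };
    let json = serde_json::to_string(&snapshot).unwrap();
    // zcache side: deserialize into owned structures (the AtomicU64s
    // are reconstructed from the plain numbers in the JSON).
    serde_json::from_str::<IoStats>(&json).unwrap()
}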
diff --git a/cmd/zfs_object_agent/zcache/Cargo.toml b/cmd/zfs_object_agent/zcache/Cargo.toml
index 04ae9cbb2778..5398d1ffe2c0 100644
--- a/cmd/zfs_object_agent/zcache/Cargo.toml
+++ b/cmd/zfs_object_agent/zcache/Cargo.toml
@@ -19,5 +19,6 @@ num-traits = "0.2.14"
 nvpair = { git = "https://github.com/ahrens/rust-libzfs", branch = "work"}
 serde = { version = "1.0.125", features = ["derive"] }
 serde_json = "1.0.64"
+termsize = "0.1.6"
 tokio = { version = "1.4", features = ["full"] }
 util = { path = "../util" }
diff --git a/cmd/zfs_object_agent/zcache/src/iostat.rs b/cmd/zfs_object_agent/zcache/src/iostat.rs
new file mode 100644
index 000000000000..b2eac77acee4
--- /dev/null
+++ b/cmd/zfs_object_agent/zcache/src/iostat.rs
@@ -0,0 +1,469 @@
+//! The iostat subcommand for zcache.
+
+use crate::remote_channel::{RemoteChannel, RemoteError};
+use crate::subcommand::ZcacheSubCommand;
+use anyhow::Result;
+use async_trait::async_trait;
+use chrono::Local;
+use clap::{Arg, SubCommand};
+use log::*;
+use std::cmp::max;
+use std::io::{self, Write};
+use std::sync::atomic::Ordering::Relaxed;
+use std::thread::sleep;
+use std::time::Duration;
+use util::zettacache_stats::*;
+use util::{nice_number_time, nice_p2size};
+
+static NAME: &str = "iostat";
+static REQUEST: &str = "zcache_iostat";
+
+struct IoStatDisplay {
+    show_active: bool,
+    show_devices: bool,
+    show_time: bool,
+    interval: Option<Duration>,
+    count: Option<u64>,
+    max_name_length: usize, // used for the device name column
+    histogram_name: Option<String>,
+}
+
+impl IoStatDisplay {
+    const TIME_WIDTH: usize = 10;
+    const VALUE_WIDTH: usize = 6;
+    const HEADER_HEIGHT: usize = 3;
+
+    fn display_dashes(width: usize) {
+        print!("{0:-<1$}  ", "-", width);
+    }
+
+    /// Display centered column group titles, such as 'Lookup: Read-Data'
+    fn display_header(&self, titles: Vec<&str>, columns_per_group: usize) {
+        if self.show_time {
+            print!("{:^1$}  ", "timestamp", IoStatDisplay::TIME_WIDTH);
+        }
+        if self.show_devices {
+            print!("{:^1$}  ", "zcache", self.max_name_length);
+        }
+        let column_group_width = (columns_per_group * (IoStatDisplay::VALUE_WIDTH + 2)) - 2;
+        for title in titles {
+            print!("{:^1$}  ", title, column_group_width);
+        }
+        println!();
+    }
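The group-title width above is derived from the per-value column width; a worked example of the arithmetic (illustrative helper, not part of the patch):

fn group_title_width(columns_per_group: usize) -> usize {
    // Mirrors display_header(): each column is VALUE_WIDTH (6) characters
    // plus 2 spaces of padding, minus the trailing 2 spaces.
    (columns_per_group * (6 + 2)) - 2
}
// group_title_width(3) == 22 -- the default "iops amount latency" group
// group_title_width(4) == 30 -- when --active adds a fourth column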
+
+    /// Print a line made up of a set of headers repeated `copies` times.
+    /// Also print another line with a dashed line below each column.
+    fn display_column(&self, headers: Vec<&str>, copies: usize) {
+        let last_group = copies - 1;
+
+        // First print the column headers
+        if self.show_time {
+            // Note: we use the date as the header over the timestamp column
+            print!("{}  ", Local::now().format("%Y-%m-%d"));
+        }
+        if self.show_devices {
+            print!("{:^1$}  ", "device", self.max_name_length);
+        }
+        for cg in 0..copies {
+            for h in &headers {
+                let spacing: usize = if h.len() > IoStatDisplay::VALUE_WIDTH {
+                    // Adjust the spacing to accommodate column headers that are
+                    // wider than 6 characters, like 'latency'
+                    h.len() - IoStatDisplay::VALUE_WIDTH
+                } else {
+                    2 // the default is two spaces
+                };
+                print!("{0:^1$}{2:3$}", h, IoStatDisplay::VALUE_WIDTH, "", spacing);
+            }
+            if cg != last_group {
+                print!("  "); // Separate column groups by two additional spaces
+            }
+        }
+        println!();
+
+        // Now print dashes underneath each column
+        if self.show_time {
+            IoStatDisplay::display_dashes(IoStatDisplay::TIME_WIDTH);
+        }
+        if self.show_devices {
+            IoStatDisplay::display_dashes(self.max_name_length);
+        }
+        for cg in 0..copies {
+            for _h in 0..headers.len() {
+                IoStatDisplay::display_dashes(IoStatDisplay::VALUE_WIDTH);
+            }
+            if cg != last_group {
+                print!("  "); // Separate column groups by 2 additional spaces
+            }
+        }
+        println!();
+    }
+
+    /// Display 3 rows' worth of headers, which includes a row of dashes below each column.
+    /// A typical column group looks like:
+    /// ```
+    ///  Lookup: Read-Data
+    ///  iops  amount  latency
+    /// ------  ------  ------
+    /// ```
+    fn display_all_headers(&self) {
+        let titles = vec![
+            "Lookup: Read-Data",
+            "Lookup: Read-Index",
+            "Insert: Write-Data",
+            "Maintenance: Read",
+            "Maintenance: Write",
+        ];
+        let column_groups = titles.len();
+
+        let mut columns = vec!["iops", "amount", "latency"];
+        if self.show_active {
+            columns.push("active")
+        }
+
+        self.display_header(titles, columns.len());
+        self.display_column(columns, column_groups);
+    }
+
+    fn iterations_per_header(&self, device_count: usize) -> u64 {
+        let mut terminal_height = max(
+            IoStatDisplay::HEADER_HEIGHT + device_count + 1,
+            match termsize::get() {
+                None => 24,
+                Some(size) => size.rows as usize,
+            },
+        );
+
+        terminal_height -= IoStatDisplay::HEADER_HEIGHT;
+        if self.show_devices {
+            // With devices there will be device_count + 1 rows per iteration instead of one
+            terminal_height /= device_count + 1;
+        }
+        terminal_height as u64
+    }
+
+    fn display_one_row(&self, disk: &DiskIoStats, elapsed: Duration) {
+        // Some values are scaled to match the interval time
+        let scale = if elapsed.as_nanos() == 0 {
+            1.0
+        } else {
+            1_000_000_000.0 / elapsed.as_nanos() as f64
+        };
+
+        for stat_values in disk.stats.values() {
+            stat_values.operations.display_pretty(Some(scale));
+            stat_values.total_bytes.display_pretty(Some(scale));
+
+            let nanoseconds = &stat_values.total_nanoseconds.0.load(Relaxed);
+            let operations = &stat_values.operations.0.load(Relaxed);
+            StatLatency(Duration::from_nanos(
+                nanoseconds.checked_div(*operations).unwrap_or_default(),
+            ))
+            .display_pretty();
+
+            // The active count is only displayed if requested
+            if self.show_active {
+                stat_values.active_count.display_pretty(None);
+            }
+            print!("  "); // Note: we pad with two additional spaces between groups
+        }
+        println!();
+    }
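A worked example of the scaling in display_one_row() (illustrative numbers): per-interval totals become per-second rates by multiplying with 1e9 / elapsed_ns.

fn per_second(total_in_interval: u64, elapsed: std::time::Duration) -> f64 {
    let scale = 1_000_000_000.0 / elapsed.as_nanos() as f64;
    total_in_interval as f64 * scale
}
// Over a 5 second interval (scale == 0.2):
//   per_second(1_000, elapsed)      == 200.0         -> 200 iops
//   per_second(52_428_800, elapsed) == 10_485_760.0  -> 10 MiB/s of "amount"
// The latency column is unscaled: total_nanoseconds / operations.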
+
+    /// Display the default iostat output.
+    fn display_iostat_default(&self, iteration: u64, stat_delta: &IoStats) {
+        // Periodically display the column headers
+        if (iteration % self.iterations_per_header(stat_delta.disk_stats.len())) == 0 {
+            debug!(
+                "collected stats interval {}",
+                nice_number_time(stat_delta.timestamp)
+            );
+            self.display_all_headers();
+        }
+
+        for (i, disk_stats) in stat_delta.disk_stats.iter().enumerate() {
+            if self.show_time {
+                let time = if i == 0 {
+                    // Note: only display the time here since the date is in the column header
+                    format!("{}", Local::now().format("%H:%M:%S"))
+                } else {
+                    // Show the time once (above) in the 'summary' row, but not with each device row
+                    String::from("")
+                };
+                print!("{:>1$}  ", time, IoStatDisplay::TIME_WIDTH);
+            }
+            if self.show_devices {
+                let (width, indent) = if i == 0 {
+                    (self.max_name_length, "")
+                } else {
+                    (self.max_name_length - 2, "  ")
+                };
+                print!("{}{:<2$}  ", indent, disk_stats.name, width);
+            }
+
+            self.display_one_row(disk_stats, stat_delta.timestamp);
+            if !self.show_devices {
+                break;
+            }
+        }
+        if self.show_devices {
+            println!()
+        }
+    }
+
+    fn display_histogram_headers(&self, name: &str, headers: Vec<&str>) {
+        print!("{:<1$}  ", name, self.max_name_length);
+
+        for column in headers {
+            print!("{:^1$}", column, IoStatDisplay::VALUE_WIDTH + 2);
+        }
+        println!();
+    }
+
+    /// Print the histogram iostat output.
+    fn display_iostat_histogram(&self, histogram_name: &str, stat_delta: &IoStats) {
+        for disk_stat in stat_delta.disk_stats.iter() {
+            if self.show_time {
+                println!("{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
+            }
+            println!();
+            let device_name = if self.show_devices {
+                &disk_stat.name
+            } else {
+                ""
+            };
+            self.display_histogram_headers(
+                device_name,
+                vec!["lookup", "lookup", "insert", "maint", "maint"],
+            );
+            self.display_histogram_headers(
+                histogram_name,
+                vec!["data", "index", "data", "read", "write"],
+            );
+            self.display_histogram_headers(
+                &format!("{:-<1$}", "-", self.max_name_length),
+                vec!["------"; 5],
+            );
+
+            if histogram_name.eq("latency") {
+                for j in 0..LatencyHistogram::BUCKETS {
+                    // First display each bucket name.
+                    // Use the ending range of the bucket for latency (the first bucket is 1us).
+                    let nice_value = nice_number_time(Duration::from_nanos((1024 << j) - 1));
+                    print!("{:>1$}  ", nice_value, self.max_name_length);
+
+                    // Then display the bucket values for each disk I/O type (a total of 5)
+                    for v in disk_stat.stats.values() {
+                        let count = &v.latency_histogram.0[j];
+                        count.display_pretty(None);
+                    }
+                    println!();
+                }
+            } else {
+                for j in 0..RequestHistogram::BUCKETS {
+                    // First display each bucket name.
+                    // Use the starting range of the bucket for request sizes (the first bucket is 512B).
+                    let nice_value = nice_p2size(512 << j);
+                    print!("{:>1$}  ", nice_value, self.max_name_length);
+
+                    // Then display the bucket values for each disk I/O type (a total of 5)
+                    for v in disk_stat.stats.values() {
+                        let count = &v.request_histogram.0[j];
+                        count.display_pretty(None);
+                    }
+                    println!();
+                }
+            }
+
+            // Print a line of dashes after each histogram
+            println!(
+                "{:-<1$}",
+                "-",
+                self.max_name_length + ((IoStatDisplay::VALUE_WIDTH + 2) * disk_stat.stats.len())
+            );
+            if !self.show_devices {
+                break;
+            }
+        }
+    }
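display_io_stats() below drives the sampling loop. A sketch of the single round trip it makes to the agent (the field names match the zcache_iostat handler added to public_connection.rs at the end of this patch):

async fn fetch_iostats(remote: &mut RemoteChannel) -> Result<IoStats> {
    // Request nvlist:  { "Type": "zcache_iostat" }
    // Response nvlist: { "Type": "zcache_iostat", "result": "ok",
    //                    "iostats_json": <IoStats serialized by the agent> }
    match remote.call(REQUEST, None).await {
        Ok(response) => {
            let json = response.lookup_string("iostats_json")?;
            Ok(serde_json::from_str(json.to_str()?)?)
        }
        Err(RemoteError::ResultError(_)) => anyhow::bail!("no zettacache configured"),
        Err(RemoteError::Other(e)) => Err(e),
    }
}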
+
+    async fn display_io_stats(&mut self) -> Result<()> {
+        let mut iteration = 0;
+        let mut previous = IoStats::default(); // placeholder empty stats
+
+        let mut remote = RemoteChannel::new(false).await?;
+        // TODO need to handle an agent restart (currently stops with signal SIGPIPE)
+
+        loop {
+            let latest = match remote.call(REQUEST, None).await {
+                Ok(response) => {
+                    let io_stats_json = response.lookup_string("iostats_json")?;
+                    let mut latest: IoStats = serde_json::from_str(io_stats_json.to_str()?)?;
+
+                    if self.show_devices {
+                        // +2 on device names to account for indenting devices under 'summary'
+                        self.max_name_length = max(self.max_name_length, latest.max_name_len() + 2);
+                        self.max_name_length = max(self.max_name_length, "summary".len());
+                    }
+
+                    // Create a summary disk stat of all the devices
+                    IoStatDisplay::insert_summary_disk(&mut latest);
+
+                    debug!("iostats_json: {:?}", latest);
+                    latest
+                }
+                Err(RemoteError::ResultError(_)) => {
+                    println!("No cache found?");
+                    continue;
+                }
+                Err(RemoteError::Other(e)) => {
+                    println!("remote call error: {}", e);
+                    // typically something like "Connection reset by peer (os error 104)"
+                    return Err(e);
+                }
+            };
+
+            let delta = &latest - &previous;
+
+            match &self.histogram_name {
+                None => self.display_iostat_default(iteration, &delta),
+                Some(name) => self.display_iostat_histogram(name, &delta),
+            }
+
+            // Flush stdout in case the output is redirected to a file
+            io::stdout().flush()?;
+
+            iteration += 1;
+            let interval: Duration = match self.interval {
+                None => return Ok(()),
+                Some(interval) => interval,
+            };
+
+            if let Some(count) = self.count {
+                if iteration >= count {
+                    return Ok(());
+                }
+            }
+
+            previous = latest;
+            sleep(interval);
+        }
+    }
+
+    fn insert_summary_disk(disk_stats: &mut IoStats) {
+        let mut summary = DiskIoStats::new("summary".to_string());
+
+        for disk in &disk_stats.disk_stats {
+            summary += disk;
+        }
+        disk_stats.disk_stats.insert(0, summary);
+    }
+}
+
+pub struct IoStat;
+
+#[async_trait]
+impl ZcacheSubCommand for IoStat {
+    fn subcommand(&self) -> clap::App<'static, 'static> {
+        fn valid_interval(value: String) -> Result<(), String> {
+            match value.parse::<f64>() {
+                Ok(_) => Ok(()),
+                Err(e) => Err(e.to_string()),
+            }
+        }
+
+        fn valid_count(value: String) -> Result<(), String> {
+            match value.parse::<u64>() {
+                Ok(_) => Ok(()),
+                Err(e) => Err(e.to_string()),
+            }
+        }
+
+        SubCommand::with_name(NAME)
+            .about("Display I/O statistics.")
+            .arg(
+                Arg::with_name("active")
+                    .long("active")
+                    .short("a")
+                    .help("Include active queue statistics")
+                    .conflicts_with("latency-histogram")
+                    .conflicts_with("request-size-histogram"),
+            )
+            .arg(
+                Arg::with_name("devices")
+                    .long("devices")
+                    .short("d")
+                    .help("Report the statistics for individual devices in the zettacache"),
+            )
+            .arg(
+                Arg::with_name("latency-histogram")
+                    .long("latency-histogram")
+                    .short("l")
+                    .help("Display latency histograms")
+                    .conflicts_with("request-size-histogram"),
+            )
+            .arg(
+                Arg::with_name("request-size-histogram")
+                    .long("request-size-histogram")
+                    .short("r")
+                    .help("Display request size histograms for each I/O type"),
+            )
+            .arg(
+                Arg::with_name("timestamp")
+                    .long("timestamp")
+                    .short("t")
+                    .help("Display a timestamp on each line of iostats"),
+            )
+            .arg(
+                Arg::with_name("interval")
+                    .validator(valid_interval)
+                    .help("Print the statistics every <interval> seconds"),
+            )
+            .arg(
+                Arg::with_name("count")
+                    .validator(valid_count)
+                    .help("Stop after <count> reports have been displayed"),
+            )
+    }
+
+    fn name(&self) -> String {
+        NAME.to_string()
+    }
+
+    async fn invoke(&mut self, args: &clap::ArgMatches) -> Result<()> {
+        let interval = args
+            .value_of("interval")
+            .map(|interval| Duration::from_secs_f64(interval.parse().unwrap_or(0.0)));
+        let count = args
+            .value_of("count")
+            .map(|count| count.parse().unwrap_or(0));
+
+ let histogram_name = if args.is_present("latency-histogram") { + Some("latency".to_string()) + } else if args.is_present("request-size-histogram") { + Some("req-size".to_string()) + } else { + None + }; + + let max_name_length = histogram_name + .as_ref() + .map(|name| name.len()) + .unwrap_or_default(); + + IoStatDisplay { + show_time: args.is_present("timestamp"), + show_active: args.is_present("active"), + show_devices: args.is_present("devices"), + max_name_length, + histogram_name, + interval, + count, + } + .display_io_stats() + .await + } +} diff --git a/cmd/zfs_object_agent/zcache/src/main.rs b/cmd/zfs_object_agent/zcache/src/main.rs index 0f43b9870a69..3493d9be4010 100644 --- a/cmd/zfs_object_agent/zcache/src/main.rs +++ b/cmd/zfs_object_agent/zcache/src/main.rs @@ -3,6 +3,7 @@ #![warn(clippy::cast_possible_wrap)] #![warn(clippy::cast_sign_loss)] mod clear_hit_data; +mod iostat; mod list_devices; mod remote_channel; mod report_hits; @@ -12,6 +13,7 @@ use anyhow::Result; use clap::AppSettings; use clap::Arg; use clear_hit_data::ClearHitData; +use iostat::IoStat; use list_devices::ListDevices; use log::*; use report_hits::ReportHits; @@ -40,6 +42,7 @@ async fn async_main() -> Result<()> { Box::new(ClearHitData), Box::new(ListDevices), Box::new(ReportHits), + Box::new(IoStat), ]; // Define global command arguments @@ -59,7 +62,6 @@ async fn async_main() -> Result<()> { Arg::with_name("log-file") .requires("verbose") .global(true) - .short("l") .long("log-file") .value_name("FILE") .help("File to log debugging output to") diff --git a/cmd/zfs_object_agent/zcache/src/report_hits.rs b/cmd/zfs_object_agent/zcache/src/report_hits.rs index 7e185e8b0ec2..0f98dff6841c 100644 --- a/cmd/zfs_object_agent/zcache/src/report_hits.rs +++ b/cmd/zfs_object_agent/zcache/src/report_hits.rs @@ -130,8 +130,8 @@ impl ZcacheSubCommand for ReportHits { .help("Divide report data into this many buckets"), ) .arg( - Arg::with_name("non_cumulative") - .long("non_cumulative") + Arg::with_name("non-cumulative") + .long("non-cumulative") .short("n") .help("Don't accumulate hits from previous quantiles"), ) @@ -143,7 +143,7 @@ impl ZcacheSubCommand for ReportHits { async fn invoke(&mut self, args: &clap::ArgMatches) -> Result<()> { let quantiles = args.value_of("quantiles").unwrap().parse()?; - let cumulative = !args.is_present("non_cumulative"); + let cumulative = !args.is_present("non-cumulative"); let mut remote = RemoteChannel::new(false).await?; match remote.call(NAME, None).await { diff --git a/cmd/zfs_object_agent/zettacache/Cargo.toml b/cmd/zfs_object_agent/zettacache/Cargo.toml index 8af5c036d2c1..c37fc269ae50 100644 --- a/cmd/zfs_object_agent/zettacache/Cargo.toml +++ b/cmd/zfs_object_agent/zettacache/Cargo.toml @@ -29,7 +29,7 @@ num = "0.4.0" rand = "0.8.3" roaring = "0.7.0" seahash = "4.1.0" -serde = { version = "1.0.125", features = ["derive"] } +serde = { version = "1.0.125", features = ["derive", "rc"] } serde_json = "1.0.64" sysinfo = "0.21.1" tokio = { version = "1.4", features = ["full"] } diff --git a/cmd/zfs_object_agent/zettacache/src/block_access.rs b/cmd/zfs_object_agent/zettacache/src/block_access.rs index 8ae786f50073..234db209d063 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_access.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_access.rs @@ -13,21 +13,22 @@ use nix::sys::stat::SFlag; use num::Num; use num::NumCast; use serde::de::DeserializeOwned; -use serde::Deserialize; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::fmt::Debug; use 
std::fmt::Display;
 use std::io::Read;
 use std::io::Write;
 use std::os::unix::prelude::AsRawFd;
 use std::os::unix::prelude::OpenOptionsExt;
+use std::path::Path;
+use std::sync::atomic::Ordering;
 use std::time::Instant;
 use tokio::fs::File;
 use tokio::sync::Semaphore;
 use util::get_tunable;
-use util::AlignedBytes;
-use util::AlignedVec;
+use util::zettacache_stats::*;
 use util::From64;
+use util::{AlignedBytes, AlignedVec};
 use util::{DeviceEntry, DeviceList};
 
 lazy_static! {
@@ -45,11 +46,62 @@ struct BlockHeader {
     checksum: u64,
 }
 
+#[must_use]
+struct OpInProgress<'a> {
+    begin: Instant,
+    counters: &'a IoStatValues,
+}
+
+impl<'a> OpInProgress<'a> {
+    fn new(counters: &'a IoStatValues) -> Self {
+        counters.active_count.0.fetch_add(1, Ordering::Relaxed);
+        OpInProgress {
+            begin: Instant::now(),
+            counters,
+        }
+    }
+
+    fn end(self, bytes: u64) {
+        let counters = self.counters;
+        counters.operations.0.fetch_add(1, Ordering::Relaxed);
+        counters.total_bytes.0.fetch_add(bytes, Ordering::Relaxed);
+        counters.total_nanoseconds.0.fetch_add(
+            self.begin.elapsed().as_nanos().try_into().unwrap(),
+            Ordering::Relaxed,
+        );
+
+        // The first latency bucket is 1 microsecond
+        let latency = self.begin.elapsed().as_micros();
+        let histo = &counters.latency_histogram.0;
+        histo
+            .get(latency.next_power_of_two().trailing_zeros() as usize)
+            .unwrap_or_else(|| histo.last().unwrap())
+            .0
+            .fetch_add(1, Ordering::Relaxed);
+
+        // The first request size bucket is 512 bytes
+        let size = bytes >> 9;
+        let histo = &counters.request_histogram.0;
+        histo
+            .get(size.next_power_of_two().trailing_zeros() as usize)
+            .unwrap_or_else(|| histo.last().unwrap())
+            .0
+            .fetch_add(1, Ordering::Relaxed);
+    }
+}
+
+impl<'a> Drop for OpInProgress<'a> {
+    fn drop(&mut self) {
+        self.counters.active_count.0.fetch_sub(1, Ordering::Relaxed);
+    }
+}
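A worked example of the bucket selection in OpInProgress::end() above (illustrative): the index is the log2 of the next power of two, and values past the end are clamped to the last bucket by the unwrap_or_else(|| histo.last()...) fallback.

fn bucket_index(value: u64) -> usize {
    value.next_power_of_two().trailing_zeros() as usize
}
// Latency buckets are in microseconds (the first bucket is 1us):
//   bucket_index(300) == 9, since 300us rounds up to 512 == 2^9
// Request-size buckets are in 512-byte sectors (the first bucket is 512B):
//   bucket_index(131_072 >> 9) == 8, since 128 KiB is 256 sectors == 2^8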
+
 #[derive(Debug)]
 pub struct BlockAccess {
     sector_size: usize,
     disks: Vec<Disk>,
     readonly: bool,
+    timebase: Instant,
 }
 
 #[derive(Debug)]
@@ -58,6 +110,7 @@ pub struct Disk {
     device_path: String,
     size: u64,
     sector_size: usize,
+    io_stats: DiskIoStats,
     outstanding_reads: Semaphore,
     outstanding_writes: Semaphore,
 }
@@ -121,11 +174,20 @@ impl Disk {
         } else {
            panic!("{}: invalid file type {:?}", disk_path, mode);
        }
+
+        let device = Path::new(disk_path)
+            .file_name()
+            .unwrap()
+            .to_str()
+            .unwrap()
+            .to_owned();
+
        let this = Disk {
            file,
            device_path: disk_path.to_string(),
            size,
            sector_size,
+            io_stats: DiskIoStats::new(device),
            outstanding_reads: Semaphore::new(*DISK_READ_MAX_QUEUE_DEPTH),
            outstanding_writes: Semaphore::new(*DISK_WRITE_MAX_QUEUE_DEPTH),
        };
@@ -148,10 +210,12 @@ impl BlockAccess {
        })
        .unwrap()
        .sector_size;
+
        BlockAccess {
            sector_size,
            disks,
            readonly,
+            timebase: Instant::now(),
        }
    }
@@ -188,7 +252,7 @@ impl BlockAccess {
    }

    // offset and length must be sector-aligned
-    pub async fn read_raw(&self, extent: Extent) -> AlignedBytes {
+    pub async fn read_raw(&self, extent: Extent, io_type: DiskIoType) -> AlignedBytes {
        self.verify_aligned(extent.location.offset);
        self.verify_aligned(extent.size);
        let disk = self.disk(extent.location.disk);
@@ -196,7 +260,8 @@ impl BlockAccess {
        let sector_size = self.sector_size;
        let begin = Instant::now();
        let _permit = disk.outstanding_reads.acquire().await.unwrap();
-        let bytes = tokio::task::spawn_blocking(move || {
+        let op = OpInProgress::new(&disk.io_stats.stats[io_type]);
+        let bytes: AlignedBytes = tokio::task::spawn_blocking(move || {
            let mut v = AlignedVec::with_capacity(usize::from64(extent.size), sector_size);
            // By using the unsafe libc::pread() instead of
            // nix::sys::uio::pread(), we avoid the cost of zeroing out the
@@ -216,6 +281,7 @@ impl BlockAccess {
        })
        .await
        .unwrap();
+        op.end(bytes.len() as u64);
        trace!(
            "read({:?}) returned in {}us",
            extent,
@@ -226,7 +292,12 @@ impl BlockAccess {
            begin.elapsed().as_micros()
        );
        bytes
    }

    // location.offset and bytes.len() must be sector-aligned. However,
    // bytes.alignment() need not be the sector size (it will be copied if not).
-    pub async fn write_raw(&self, location: DiskLocation, mut bytes: AlignedBytes) {
+    pub async fn write_raw(
+        &self,
+        location: DiskLocation,
+        mut bytes: AlignedBytes,
+        io_type: DiskIoType,
+    ) {
        assert!(
            !self.readonly,
            "attempting zettacache write in readonly mode"
@@ -247,6 +318,7 @@ impl BlockAccess {
        assert_eq!(bytes.as_ptr() as usize % self.sector_size, 0);
        let begin = Instant::now();
        let _permit = disk.outstanding_writes.acquire().await.unwrap();
+        let op = OpInProgress::new(&disk.io_stats.stats[io_type]);
        tokio::task::spawn_blocking(move || {
            nix::sys::uio::pwrite(fd, &bytes, i64::try_from(offset).unwrap()).unwrap();
            trace!(
@@ -258,6 +330,7 @@ impl BlockAccess {
        })
        .await
        .unwrap();
+        op.end(length as u64);
    }

    pub fn round_up_to_sector<N: Num + NumCast + Copy>(&self, n: N) -> N {
@@ -377,4 +450,16 @@ impl BlockAccess {
            self.round_up_to_sector(header_size + data.len()),
        ))
    }
+
+    /// Return the I/O stats collected, as a serialized JSON string.
+    pub fn io_stats_as_json(&self) -> String {
+        let timestamp = self.timebase.elapsed();
+        // TODO -- should pass the timebase to differentiate previous stats across agent restart
+
+        serde_json::to_string(&IoStatsRef {
+            timestamp,
+            disk_stats: self.disks.iter().map(|disk| &disk.io_stats).collect(),
+        })
+        .unwrap()
+    }
 }
diff --git a/cmd/zfs_object_agent/zettacache/src/block_based_log.rs b/cmd/zfs_object_agent/zettacache/src/block_based_log.rs
index 478a3190163f..3187fb4a0bb1 100644
--- a/cmd/zfs_object_agent/zettacache/src/block_based_log.rs
+++ b/cmd/zfs_object_agent/zettacache/src/block_based_log.rs
@@ -25,6 +25,7 @@ use std::ops::Sub;
 use std::sync::Arc;
 use std::time::Instant;
 use util::get_tunable;
+use util::zettacache_stats::DiskIoType;
 use util::AlignedVec;
 
 lazy_static!
{ @@ -96,7 +97,7 @@ impl BlockBasedLogPhys { let truncated_extent = extent.range(0, min(extent.size, (next_chunk_offset - *offset))); - let extent_bytes = block_access.read_raw(truncated_extent).await; + let extent_bytes = block_access.read_raw(truncated_extent, DiskIoType::MaintenanceRead).await; let mut total_consumed = 0; while total_consumed < extent_bytes.len() { let chunk_location = extent.location.offset + total_consumed as u64; @@ -317,10 +318,11 @@ impl BlockBasedLog { if extent.location != pending_location + pending_vec.len() || pending_vec.unused_capacity() < raw_chunk.len() => { - writes_stream.push( - self.block_access - .write_raw(pending_location, pending_vec.into()), - ); + writes_stream.push(self.block_access.write_raw( + pending_location, + pending_vec.into(), + DiskIoType::MaintenanceWrite, + )); pending_write = None; } _ => (), @@ -339,7 +341,11 @@ impl BlockBasedLog { assert_eq!(*pending_location + pending_vec.len(), extent.location); pending_vec.extend_from_slice(&raw_chunk); } - None => writes_stream.push(self.block_access.write_raw(extent.location, raw_chunk)), + None => writes_stream.push(self.block_access.write_raw( + extent.location, + raw_chunk, + DiskIoType::MaintenanceWrite, + )), } new_chunk_fn(chunk.id, chunk.offset, first_entry); @@ -349,10 +355,11 @@ impl BlockBasedLog { self.phys.next_chunk_offset.0 += raw_size; } if let Some((pending_location, pending_vec)) = pending_write { - writes_stream.push( - self.block_access - .write_raw(pending_location, pending_vec.into()), - ); + writes_stream.push(self.block_access.write_raw( + pending_location, + pending_vec.into(), + DiskIoType::MaintenanceWrite, + )); } writes_stream.for_each(|_| async move {}).await; self.pending_entries.truncate(0); @@ -554,7 +561,11 @@ impl BlockBasedLogWithSummary { chunk_extent, key ); - let chunk_bytes = self.this.block_access.read_raw(chunk_extent).await; + let chunk_bytes = self + .this + .block_access + .read_raw(chunk_extent, DiskIoType::ReadIndexForLookup) + .await; let (chunk, _consumed): (BlockBasedLogChunk, usize) = self.this.block_access.chunk_from_raw(&chunk_bytes).unwrap(); assert_eq!(chunk.id, ChunkId(chunk_id as u64)); diff --git a/cmd/zfs_object_agent/zettacache/src/superblock.rs b/cmd/zfs_object_agent/zettacache/src/superblock.rs index fd354a1d6e0c..656fbccf7ffc 100644 --- a/cmd/zfs_object_agent/zettacache/src/superblock.rs +++ b/cmd/zfs_object_agent/zettacache/src/superblock.rs @@ -7,6 +7,7 @@ use futures::stream::*; use log::*; use serde::{Deserialize, Serialize}; use util::maybe_die_with; +use util::zettacache_stats::DiskIoType; pub const SUPERBLOCK_SIZE: u64 = 4 * 1024; @@ -109,7 +110,10 @@ impl PrimaryPhys { impl SuperblockPhys { async fn read(block_access: &BlockAccess, disk: DiskId) -> Result { let raw = block_access - .read_raw(Extent::new(disk, 0, SUPERBLOCK_SIZE)) + .read_raw( + Extent::new(disk, 0, SUPERBLOCK_SIZE), + DiskIoType::MaintenanceRead, + ) .await; let (this, _): (Self, usize) = block_access.chunk_from_raw(&raw)?; debug!("got {:#?}", this); @@ -132,7 +136,11 @@ impl SuperblockPhys { let raw = block_access.chunk_to_raw(EncodeType::Json, self); // XXX pad it out to SUPERBLOCK_SIZE? 
block_access - .write_raw(DiskLocation { offset: 0, disk }, raw) + .write_raw( + DiskLocation { offset: 0, disk }, + raw, + DiskIoType::MaintenanceWrite, + ) .await; } } diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache.rs b/cmd/zfs_object_agent/zettacache/src/zettacache.rs index 5d49dc095587..a87f6f996772 100644 --- a/cmd/zfs_object_agent/zettacache/src/zettacache.rs +++ b/cmd/zfs_object_agent/zettacache/src/zettacache.rs @@ -49,6 +49,7 @@ use tokio::time::{sleep_until, timeout_at}; use util::get_tunable; use util::maybe_die_with; use util::nice_p2size; +use util::zettacache_stats::DiskIoType; use util::AlignedBytes; use util::From64; use util::LockSet; @@ -99,7 +100,9 @@ struct ZettaCheckpointPhys { impl ZettaCheckpointPhys { async fn read(block_access: &BlockAccess, extent: Extent) -> ZettaCheckpointPhys { - let raw = block_access.read_raw(extent).await; + let raw = block_access + .read_raw(extent, DiskIoType::MaintenanceRead) + .await; let (this, _): (Self, usize) = block_access.chunk_from_raw(&raw).unwrap(); debug!("got {:#?}", this); this @@ -484,8 +487,12 @@ impl MergeState { .for_each_concurrent( *CACHE_REBALANCE_CONCURRENCY_LIMIT, |(old, new)| async move { - let bytes = block_access.read_raw(*old).await; - block_access.write_raw(*new, bytes).await; + let bytes = block_access + .read_raw(*old, DiskIoType::MaintenanceRead) + .await; + block_access + .write_raw(*new, bytes, DiskIoType::MaintenanceWrite) + .await; }, ) .await; @@ -655,7 +662,11 @@ impl ZettaCache { let checkpoint_extent = checkpoint_capacity.range(0, raw.len() as u64); block_access - .write_raw(checkpoint_extent.location, raw) + .write_raw( + checkpoint_extent.location, + raw, + DiskIoType::MaintenanceWrite, + ) .await; let num_disks = block_access.disks().count(); PrimaryPhys { @@ -1140,7 +1151,7 @@ impl ZettaCache { { // Hold the index lock over the whole operation // so that the index can't change after we get the value from it. - // Lock ordering requres that we lock the index before locking the state. + // Lock ordering requires that we lock the index before locking the state. let index = self.index.read().await; let read_data_fut_opt = { // We don't want to hold the state lock while reading from disk so we @@ -1310,7 +1321,7 @@ impl ZettaCache { if let LookupResponse::Present((cache_bytes, locked_key)) = self.lookup(guid, block, LookupSource::Write).await { - // For (hopefully) obvious rasonse, we only need to do the eviction when the bytes contained in the cache differ + // For (hopefully) obvious reasons, we only need to do the eviction when the bytes contained in the cache differ // from the bytes contained in the object store. The bytes contained in the object store are always preferred // over the bytes contained in the cache; we assume the bytes passed were retrieved from the object store. 
if *cache_bytes != *object_bytes { @@ -1347,6 +1358,10 @@ impl ZettaCache { pub fn devices_as_json(&self) -> String { serde_json::to_string(&self.block_access.list_devices()).unwrap() } + + pub fn io_stats_as_json(&self) -> String { + self.block_access.io_stats_as_json() + } } pub struct ZCacheDBHandle { @@ -1664,7 +1679,9 @@ impl ZettaCacheState { let _permit = write_sem.acquire().await.unwrap(); } - let bytes = block_access.read_raw(value.extent()).await; + let bytes = block_access + .read_raw(value.extent(), DiskIoType::ReadDataForLookup) + .await; sem.add_permits(1); // XXX we can easily handle an io error here by returning None Some(bytes) @@ -1788,7 +1805,9 @@ impl ZettaCacheState { // Note: locked_key can be dropped before the i/o completes, since the // changes to the State have already been made. future::Either::Right(async move { - block_access.write_raw(location, bytes).await; + block_access + .write_raw(location, bytes, DiskIoType::WriteDataForInsert) + .await; sem.add_permits(1); }) } @@ -1956,7 +1975,11 @@ impl ZettaCacheState { debug!("writing to {:?}: {:#?}", checkpoint_extent, checkpoint); self.block_access - .write_raw(checkpoint_extent.location, raw) + .write_raw( + checkpoint_extent.location, + raw, + DiskIoType::MaintenanceWrite, + ) .await; self.primary.checkpoint = checkpoint_extent; diff --git a/cmd/zfs_object_agent/zettaobject/src/public_connection.rs b/cmd/zfs_object_agent/zettaobject/src/public_connection.rs index da6ff5d1beb0..12fb4ab2c2c9 100644 --- a/cmd/zfs_object_agent/zettaobject/src/public_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/public_connection.rs @@ -61,6 +61,7 @@ impl PublicConnectionState { ); server.register_handler("report_hits", Box::new(Self::report_hits)); server.register_handler("list_devices", Box::new(Self::list_devices)); + server.register_handler("zcache_iostat", Box::new(Self::zcache_iostat)); } fn get_pools(&mut self, nvl: NvList) -> HandlerReturn { @@ -250,4 +251,32 @@ impl PublicConnectionState { })) } } + + fn zcache_iostat(&mut self, nvl: NvList) -> HandlerReturn { + debug!("got request: {:?}", nvl); + let mut response = NvList::new_unique_names(); + let cache = self.cache.as_ref().cloned(); + + if let Some(zettacache) = cache { + Ok(Box::pin(async move { + let json_stats = zettacache.io_stats_as_json(); + + response + .insert("iostats_json", &json_stats.as_str()) + .unwrap(); + response.insert("Type", "zcache_iostat").unwrap(); + response.insert("result", "ok").unwrap(); + + debug!("sending response: {:?}", response); + Ok(Some(response)) + })) + } else { + Ok(Box::pin(async move { + response.insert("Type", "zcache_iostat").unwrap(); + response.insert("result", "err").unwrap(); + debug!("sending response: {:?}", response); + Ok(Some(response)) + })) + } + } }
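With this in place, typical invocations look like the following (illustrative; the flags are defined in iostat.rs above):

    zcache iostat 5            # summary totals every 5 seconds
    zcache iostat -d -t 5 10   # per-device rows plus timestamps, 10 reports
    zcache iostat -l           # latency histograms, one column per I/O type
    zcache iostat -r 1         # request-size histograms every second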