From 287ee6c05302eb2db510729241ec2157eb377335 Mon Sep 17 00:00:00 2001 From: Don Brady Date: Thu, 20 Jan 2022 17:57:31 -0700 Subject: [PATCH] zcache iostat should survive an agent restart (#96) --- cmd/zfs_object_agent/Cargo.lock | 3 +- cmd/zfs_object_agent/util/Cargo.toml | 1 + cmd/zfs_object_agent/util/src/lib.rs | 1 + cmd/zfs_object_agent/util/src/write_stdout.rs | 54 +++++++ .../util/src/zettacache_stats.rs | 10 +- .../zcache/src/clear_hit_data.rs | 5 +- cmd/zfs_object_agent/zcache/src/iostat.rs | 83 +++++------ .../zcache/src/list_devices.rs | 13 +- cmd/zfs_object_agent/zcache/src/main.rs | 16 +-- .../zcache/src/remote_channel.rs | 135 +++++++++++++----- .../zcache/src/report_hits.rs | 26 ++-- cmd/zfs_object_agent/zcache/src/stats.rs | 60 ++++---- cmd/zfs_object_agent/zettacache/Cargo.toml | 2 +- .../zettacache/src/block_access.rs | 9 +- .../zettacache/src/zettacache.rs | 6 +- 15 files changed, 280 insertions(+), 144 deletions(-) create mode 100644 cmd/zfs_object_agent/util/src/write_stdout.rs diff --git a/cmd/zfs_object_agent/Cargo.lock b/cmd/zfs_object_agent/Cargo.lock index fc6d3cfec6b8..f9fafacf8adb 100644 --- a/cmd/zfs_object_agent/Cargo.lock +++ b/cmd/zfs_object_agent/Cargo.lock @@ -2553,6 +2553,7 @@ dependencies = [ "rusoto_credential", "serde 1.0.133", "tokio", + "uuid", ] [[package]] @@ -2807,7 +2808,6 @@ dependencies = [ "log", "lru", "lz4", - "metered", "more-asserts", "nix", "num", @@ -2819,6 +2819,7 @@ dependencies = [ "sysinfo", "tokio", "util", + "uuid", ] [[package]] diff --git a/cmd/zfs_object_agent/util/Cargo.toml b/cmd/zfs_object_agent/util/Cargo.toml index 25886cfa4a55..b9e6337fcf24 100644 --- a/cmd/zfs_object_agent/util/Cargo.toml +++ b/cmd/zfs_object_agent/util/Cargo.toml @@ -25,3 +25,4 @@ roaring = "0.7.0" rusoto_credential = "0.47.0" serde = { version = "1.0.125", features = ["derive"] } tokio = { version = "1.4", features = ["full"] } +uuid = {version = "0.8.2", features = ["v4", "serde"]} diff --git a/cmd/zfs_object_agent/util/src/lib.rs b/cmd/zfs_object_agent/util/src/lib.rs index 40f77f5d28f3..8fce5bf9120e 100644 --- a/cmd/zfs_object_agent/util/src/lib.rs +++ b/cmd/zfs_object_agent/util/src/lib.rs @@ -15,6 +15,7 @@ mod nicenum; mod range_tree; mod tunable; mod vec_ext; +pub mod write_stdout; mod zcache_devices; pub mod zettacache_stats; diff --git a/cmd/zfs_object_agent/util/src/write_stdout.rs b/cmd/zfs_object_agent/util/src/write_stdout.rs new file mode 100644 index 000000000000..64736c2e3cde --- /dev/null +++ b/cmd/zfs_object_agent/util/src/write_stdout.rs @@ -0,0 +1,54 @@ +/*! +Alternative to `println!` macros suitable for use in commands. + +The `println!` flavors of macros are good for debugging output but not suitable for +production code since they panic when their output is not fully consumed (i.e. write +to a stream that has closed). + +The println replacement macros in this module will terminate the process if writing +to stdout/stderr returns an error (like `EPIPE`) rather than cause a panic. +!*/ + +/// Similar to `print!` macro, except it terminates the process on write errors (does not panic). +#[macro_export] +macro_rules! write_stdout { + ($($arg:tt)*) => ({ + use std::io::Write; + if let Err(_) = write!(std::io::stdout(), $($arg)*) { + std::process::exit(0) + } + }) +} + +/// Similar to `println!` macro, except it terminates the process on write errors (does not panic). +#[macro_export] +macro_rules! writeln_stdout { + ($($arg:tt)*) => ({ + use std::io::Write; + if let Err(_) = writeln!(std::io::stdout(), $($arg)*) { + std::process::exit(0) + } + }) +} + +/// Similar to `eprint!` macro, except it terminates the process on write errors (does not panic). +#[macro_export] +macro_rules! write_stderr { + ($($arg:tt)*) => ({ + use std::io::Write; + if let Err(_) = write!(std::io::stderr(), $($arg)*) { + std::process::exit(0) + } + }) +} + +/// Similar to `eprintln!` macro, except it terminates the process on write errors (does not panic). +#[macro_export] +macro_rules! writeln_stderr { + ($($arg:tt)*) => ({ + use std::io::Write; + if let Err(_) = writeln!(std::io::stderr(), $($arg)*) { + std::process::exit(0) + } + }) +} diff --git a/cmd/zfs_object_agent/util/src/zettacache_stats.rs b/cmd/zfs_object_agent/util/src/zettacache_stats.rs index 77136430e6cd..b05cfcde2356 100644 --- a/cmd/zfs_object_agent/util/src/zettacache_stats.rs +++ b/cmd/zfs_object_agent/util/src/zettacache_stats.rs @@ -24,6 +24,7 @@ use std::ops::{AddAssign, Sub}; use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; +use uuid::Uuid; /// The zettacache disk I/O types that are collected and displayed for each disk. #[derive(Debug, Enum, Copy, Clone, Serialize, Deserialize)] @@ -295,12 +296,14 @@ impl Sub for &DiskIoStats { /// A snapshot of the I/O stats collected from the zettacache. #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct IoStats { + pub cache_runtime_id: Uuid, // must match before timestamps & disk_stats can be compared pub timestamp: Duration, pub disk_stats: Vec, } -#[derive(Debug, Default, Serialize)] +#[derive(Debug, Serialize)] pub struct IoStatsRef<'a> { + pub cache_runtime_id: Uuid, pub timestamp: Duration, pub disk_stats: Vec<&'a DiskIoStats>, } @@ -313,6 +316,7 @@ impl Sub for &IoStats { if other.disk_stats.is_empty() { return self.clone(); } + assert_eq!(self.cache_runtime_id, other.cache_runtime_id); let mut difference = IoStats { timestamp: self.timestamp - other.timestamp, @@ -383,6 +387,7 @@ impl Display for CacheStatCounter { /// A snapshot of the cache stats collected from the zettacache. #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct CacheStats { + pub cache_runtime_id: Uuid, // must match before timestamp & stats can be compared pub timestamp: Duration, pub stats: EnumMap, } @@ -390,6 +395,7 @@ pub struct CacheStats { impl CacheStats { pub fn new() -> CacheStats { CacheStats { + cache_runtime_id: Default::default(), timestamp: Duration::default(), stats: Default::default(), } @@ -416,6 +422,8 @@ impl Sub<&Self> for &CacheStats { /// Subtract two CacheStats. Used to create the net values between two samples. fn sub(self, other: &Self) -> CacheStats { + assert_eq!(self.cache_runtime_id, other.cache_runtime_id); + let mut difference = CacheStats { timestamp: self.timestamp - other.timestamp, ..Default::default() diff --git a/cmd/zfs_object_agent/zcache/src/clear_hit_data.rs b/cmd/zfs_object_agent/zcache/src/clear_hit_data.rs index 3d4229202356..081bf9797f6c 100644 --- a/cmd/zfs_object_agent/zcache/src/clear_hit_data.rs +++ b/cmd/zfs_object_agent/zcache/src/clear_hit_data.rs @@ -3,6 +3,7 @@ use crate::subcommand::ZcacheSubCommand; use anyhow::Result; use async_trait::async_trait; use clap::SubCommand; +use util::writeln_stdout; static NAME: &str = "clear_hit_data"; pub struct ClearHitData; @@ -22,10 +23,10 @@ impl ZcacheSubCommand for ClearHitData { let response = remote.call(NAME, None).await; match response { Ok(_) => { - println!("Hits-by-size data cleared"); + writeln_stdout!("Hits-by-size data cleared"); } Err(RemoteError::ResultError(_)) => { - println!("No cache found, so no hits-by-size data present"); + writeln_stdout!("No cache found, so no hits-by-size data present"); } Err(RemoteError::Other(e)) => return Err(e), } diff --git a/cmd/zfs_object_agent/zcache/src/iostat.rs b/cmd/zfs_object_agent/zcache/src/iostat.rs index adfc9e7cba55..8f6b5faef15b 100644 --- a/cmd/zfs_object_agent/zcache/src/iostat.rs +++ b/cmd/zfs_object_agent/zcache/src/iostat.rs @@ -14,6 +14,7 @@ use std::thread::sleep; use std::time::Duration; use util::zettacache_stats::*; use util::{nice_number_time, nice_p2size}; +use util::{write_stdout, writeln_stdout}; static NAME: &str = "iostat"; static REQUEST: &str = "zcache_iostat"; @@ -45,22 +46,22 @@ impl IoStatDisplay { } fn display_dashes(width: usize) { - print!("{0:-<1$} ", "-", width); + write_stdout!("{0:-<1$} ", "-", width); } /// Display centered column group titles, such as 'Lookup: Read-Data' fn display_header(&self, titles: Vec<&str>, columns_per_group: usize) { if self.show_time { - print!("{:^1$} ", "timestamp", self.time_width()); + write_stdout!("{:^1$} ", "timestamp", self.time_width()); } if self.show_devices { - print!("{:^1$} ", "zcache", self.max_name_length); + write_stdout!("{:^1$} ", "zcache", self.max_name_length); } let column_group_width = (columns_per_group * (IoStatDisplay::VALUE_WIDTH + 2)) - 2; for title in titles { - print!("{:^1$} ", title, column_group_width); + write_stdout!("{:^1$} ", title, column_group_width); } - println!(); + writeln_stdout!(); } /// Print a line comprised of a set of headers repeated `copies` times. @@ -71,14 +72,14 @@ impl IoStatDisplay { // First print column headers if self.show_time { // Note: we use the date as the header over the timestamp column - print!( + write_stdout!( "{0:>1$} ", format!("{}", Local::now().format("%Y-%m-%d")), self.time_width() ); } if self.show_devices { - print!("{:^1$} ", "device", self.max_name_length); + write_stdout!("{:^1$} ", "device", self.max_name_length); } for cg in 0..copies { for h in &headers { @@ -88,13 +89,13 @@ impl IoStatDisplay { } else { 2 // default is two spaces }; - print!("{0:^1$}{2:3$}", h, IoStatDisplay::VALUE_WIDTH, "", spacing); + write_stdout!("{0:^1$}{2:3$}", h, IoStatDisplay::VALUE_WIDTH, "", spacing); } if cg != last_group { - print!(" "); // Separate column groups by two additional spaces + write_stdout!(" "); // Separate column groups by two additional spaces } } - println!(); + writeln_stdout!(); // Now print dashes underneath each column if self.show_time { @@ -108,10 +109,10 @@ impl IoStatDisplay { IoStatDisplay::display_dashes(IoStatDisplay::VALUE_WIDTH); } if cg != last_group { - print!(" "); // Separate column groups by 2 additional spaces + write_stdout!(" "); // Separate column groups by 2 additional spaces } } - println!(); + writeln_stdout!(); } /// Display 3 rows worth of headers which includes a row of dashes below each column @@ -180,9 +181,9 @@ impl IoStatDisplay { if self.show_active { stat_values.active_count.display_pretty(None); } - print!(" "); // Note we pad with two additional spaces between groups + write_stdout!(" "); // Note we pad with two additional spaces between groups } - println!(); + writeln_stdout!(); } /// Display the default iostat output. @@ -210,7 +211,7 @@ impl IoStatDisplay { // Show the time once (above) in 'summary' row, but not with each device row String::from("") }; - print!("{:>1$} ", time, self.time_width()); + write_stdout!("{:>1$} ", time, self.time_width()); } if self.show_devices { let (width, indent) = if i == 0 { @@ -218,7 +219,7 @@ impl IoStatDisplay { } else { (self.max_name_length - 2, " ") }; - print!("{}{:<2$} ", indent, disk_stats.name, width); + write_stdout!("{}{:<2$} ", indent, disk_stats.name, width); } self.display_one_row(disk_stats, stat_delta.timestamp); @@ -227,17 +228,17 @@ impl IoStatDisplay { } } if self.show_devices { - println!() + writeln_stdout!() } } fn display_histogram_headers(&self, name: &str, headers: Vec<&str>) { - print!("{:<1$} ", name, self.max_name_length); + write_stdout!("{:<1$} ", name, self.max_name_length); for column in headers { - print!("{:^1$}", column, IoStatDisplay::VALUE_WIDTH + 2); + write_stdout!("{:^1$}", column, IoStatDisplay::VALUE_WIDTH + 2); } - println!(); + writeln_stdout!(); } /// Print the histogram iostat output. @@ -245,12 +246,12 @@ impl IoStatDisplay { for disk_stat in stat_delta.disk_stats.iter() { if self.show_time { if self.interval_is_subsecond { - println!("{}", Local::now().format("%Y-%m-%d %H:%M:%S%.3f UTC")); + writeln_stdout!("{}", Local::now().format("%Y-%m-%d %H:%M:%S%.3f UTC")); } else { - println!("{}", Local::now().format("%Y-%m-%d %H:%M:%S UTC")); + writeln_stdout!("{}", Local::now().format("%Y-%m-%d %H:%M:%S UTC")); } } - println!(); + writeln_stdout!(); let device_name = if self.show_devices { &disk_stat.name } else { @@ -274,33 +275,33 @@ impl IoStatDisplay { // First display each bucket name // Use ending range of bucket for latency (first bucket is 1us) let nice_value = nice_number_time(Duration::from_nanos((1024 << j) - 1)); - print!("{:>1$} ", nice_value, self.max_name_length); + write_stdout!("{:>1$} ", nice_value, self.max_name_length); // Then display bucket values for each disk io type (total of 5) for v in disk_stat.stats.values() { let count = &v.latency_histogram.0[j]; count.display_pretty(None); } - println!(); + writeln_stdout!(); } } else { for j in 0..RequestHistogram::BUCKETS { // First display each bucket name // Use starting range of bucket for request sizes (first bucket is 512B) let nice_value = nice_p2size(512 << j); - print!("{:>1$} ", nice_value, self.max_name_length); + write_stdout!("{:>1$} ", nice_value, self.max_name_length); // Then display bucket values for each disk io type (total of 5) for v in disk_stat.stats.values() { let count = &v.request_histogram.0[j]; count.display_pretty(None); } - println!(); + writeln_stdout!(); } } // Print a line of dashes after each histogram - println!( + writeln_stdout!( "{:-<1$}", "-", self.max_name_length + ((IoStatDisplay::VALUE_WIDTH + 2) * disk_stat.stats.len()) @@ -314,9 +315,7 @@ impl IoStatDisplay { async fn display_io_stats(&mut self) -> Result<()> { let mut iteration = 0; let mut previous = IoStats::default(); // place holder empty stats - let mut remote = RemoteChannel::new(false).await?; - // TODO need to handle an agent restart (currently stops with signal SIGPIPE) loop { let latest = match remote.call(REQUEST, None).await { @@ -337,26 +336,30 @@ impl IoStatDisplay { latest } Err(RemoteError::ResultError(_)) => { - println!("No cache found?"); + info!("No cache found?"); continue; } Err(RemoteError::Other(e)) => { - println!("remote call error: {}", e); - // typically something like "Connection reset by peer (os error 104)" + info!("object agent restarted: {}", e); return Err(e); } }; - let delta = &latest - &previous; + if previous.disk_stats.is_empty() + || latest.cache_runtime_id == previous.cache_runtime_id + { + let delta = &latest - &previous; - match &self.histogram_name { - None => self.display_iostat_default(iteration, &delta), - Some(name) => self.display_iostat_histogram(name, &delta), + match &self.histogram_name { + None => self.display_iostat_default(iteration, &delta), + Some(name) => self.display_iostat_histogram(name, &delta), + } + // Flush stdout in case output is redirected to a file + io::stdout().flush()?; + } else { + info!("object agent restarted"); } - // Flush stdout in case output is redirected to a file - io::stdout().flush()?; - iteration += 1; let interval: Duration = match self.interval { None => return Ok(()), diff --git a/cmd/zfs_object_agent/zcache/src/list_devices.rs b/cmd/zfs_object_agent/zcache/src/list_devices.rs index 6101126aeed0..1e35c0694748 100644 --- a/cmd/zfs_object_agent/zcache/src/list_devices.rs +++ b/cmd/zfs_object_agent/zcache/src/list_devices.rs @@ -8,6 +8,7 @@ use clap::{Arg, SubCommand}; use std::fs; use std::path::{Path, PathBuf}; use util::{nice_p2size, DeviceEntry, DeviceList}; +use util::{write_stdout, writeln_stdout}; static NAME: &str = "list_devices"; @@ -52,11 +53,11 @@ impl DeviceDisplay { let name_width = self.max_name_length(&devices.devices); for device in &devices.devices { - print!("{:<1$} ", self.derive_name(&device.name), name_width); + write_stdout!("{:<1$} ", self.derive_name(&device.name), name_width); if self.show_size { - print!("{:>6}", nice_p2size(device.size)); + write_stdout!("{:>6}", nice_p2size(device.size)); } - println!(); + writeln_stdout!(); } } @@ -69,17 +70,17 @@ impl DeviceDisplay { let devices: DeviceList = serde_json::from_str(devices_json.to_str()?)?; if self.json_output { - println!("{}", serde_json::to_string_pretty(&devices)?) + writeln_stdout!("{}", serde_json::to_string_pretty(&devices)?) } else { self.display_devices(&devices); } } Err(RemoteError::ResultError(_)) => { - println!("No cache found?"); + writeln_stdout!("No cache found?"); return Ok(()); } Err(RemoteError::Other(e)) => { - println!("remote call error: {}", e); + writeln_stdout!("remote call error: {}", e); return Err(e); } } diff --git a/cmd/zfs_object_agent/zcache/src/main.rs b/cmd/zfs_object_agent/zcache/src/main.rs index b479b96f96a7..2d1b230ae8a7 100644 --- a/cmd/zfs_object_agent/zcache/src/main.rs +++ b/cmd/zfs_object_agent/zcache/src/main.rs @@ -2,6 +2,8 @@ #![warn(clippy::cast_possible_truncation)] #![warn(clippy::cast_possible_wrap)] #![warn(clippy::cast_sign_loss)] +#![deny(clippy::print_stdout)] +#![deny(clippy::print_stderr)] mod clear_hit_data; mod iostat; mod list_devices; @@ -20,17 +22,9 @@ use log::*; use report_hits::ReportHits; use stats::Stats; use subcommand::ZcacheSubCommand; +use util::writeln_stdout; fn main() -> Result<()> { - // When zcache is used in a UNIX shell pipeline and its output is not fully - // consumed a SIGPIPE (e.g. "broken pipe") signal is sent to us. By default, - // we would abort and generate a core dump which is annoying. The unsafe - // line below changes that behavior to just terminating as it is expected by - // other UNIX utilities. - // reference: https://github.com/rust-lang/rust/issues/46016 - unsafe { - libc::signal(libc::SIGPIPE, libc::SIG_DFL); - } async_main() } @@ -91,8 +85,8 @@ async fn async_main() -> Result<()> { match sub_commands.into_iter().find(|cmd| cmd.name() == cmd_name) { Some(mut subcmd) => subcmd.invoke(cmd_args.unwrap()).await?, None => { - println!("Unable to invoke {}", cmd_name); - println!("{}", matches.usage()); + writeln_stdout!("Unable to invoke {}", cmd_name); + writeln_stdout!("{}", matches.usage()); std::process::exit(exitcode::USAGE); } } diff --git a/cmd/zfs_object_agent/zcache/src/remote_channel.rs b/cmd/zfs_object_agent/zcache/src/remote_channel.rs index 2c5fcb61e9a1..d3e0fa5a5a72 100644 --- a/cmd/zfs_object_agent/zcache/src/remote_channel.rs +++ b/cmd/zfs_object_agent/zcache/src/remote_channel.rs @@ -1,9 +1,11 @@ -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, Result}; use log::*; use nvpair::{NvEncoding, NvList}; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncWriteExt; +use std::thread::sleep; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::UnixStream; +use util::writeln_stderr; use util::From64; #[derive(Debug)] @@ -12,6 +14,8 @@ pub enum RemoteError { Other(anyhow::Error), } +const ZOA_MAX_RETRIES: usize = 30; + impl std::fmt::Display for RemoteError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { @@ -43,20 +47,53 @@ impl From for RemoteError { pub struct RemoteChannel { stream: UnixStream, + socket_path: String, } impl RemoteChannel { + async fn open(socket_path: &str) -> Result { + // Retry until a connection is established + // TODO for the initial command launch can we shorten this? + let mut reconnect_retries = 0; + loop { + match UnixStream::connect(socket_path).await { + Ok(stream) => { + info!("opened socket {}", socket_path); + return Ok(stream); + } + Err(e) => { + if reconnect_retries > ZOA_MAX_RETRIES { + info!( + "cannot connect after {} attempts to zfs object agent {}", + reconnect_retries, + e.to_string() + ); + writeln_stderr!("cannot connect to zfs object agent"); + return Err(anyhow!(e)); + } + info!("open socket failed {}", e.to_string()); + sleep(Duration::from_millis(500)); + reconnect_retries += 1; + continue; + } + } + } + } + + /// Create a new RemoteChannel and establish a remote connection to the object agent. pub async fn new(need_priv: bool) -> Result { let socket_path = if need_priv { - "/etc/zfs/zfs_root_socket" + "/etc/zfs/zfs_root_socket".to_string() } else { - "/etc/zfs/zfs_public_socket" + "/etc/zfs/zfs_public_socket".to_string() }; - // XXX - do we need retrys here? - let stream = UnixStream::connect(socket_path) - .await - .with_context(|| format!("Could not connect to {}", socket_path))?; - Ok(Self { stream }) + + let stream = RemoteChannel::open(&socket_path).await?; + + Ok(Self { + stream, + socket_path, + }) } async fn send(&mut self, message: NvList) -> Result<()> { @@ -69,45 +106,69 @@ impl RemoteChannel { } async fn receive(&mut self) -> Result { - // recieve a packed nvlist and unpack it... + // receive a packed nvlist and unpack it... let len64 = self.stream.read_u64_le().await?; let mut v: Vec = vec![0; usize::from64(len64)]; self.stream.read_exact(v.as_mut()).await?; Ok(NvList::try_unpack(v.as_ref()).unwrap()) } + /// Send a request to the object agent and wait for a response. If the agent is restarted + /// before completing this request, it will reconnect and resend the request. Therefore, + /// the request must be idempotent (i.e. executing the request more than once has the same + /// effect as executing it only once). pub async fn call( &mut self, request: &str, args: Option, ) -> Result { - // send request - let mut nvlist = args.unwrap_or_else(NvList::new_unique_names); - nvlist.insert("Type", request).unwrap(); - self.send(nvlist).await?; - debug!("sent {} request, now waiting for response...", request); - // receive response - let response = self.receive().await?; - debug!("received response: {:?}", response); - let response_type = response.lookup_string("Type")?; - let response_type = response_type.to_str()?; - if response_type != request { - return Err(RemoteError::Other(anyhow!( - "expected response type \"{}\", got \"{}\"", - request, - response_type - ))); - } + loop { + // send request, retrying as needed + let mut nvlist = args.clone().unwrap_or_else(NvList::new_unique_names); + nvlist.insert("Type", request).unwrap(); + match self.send(nvlist).await { + Ok(_) => {} + Err(e) => { + // reopen the channel and resend the request + info!("send: object agent restarted: {}", e); + self.stream = RemoteChannel::open(&self.socket_path).await?; + continue; + } + } + debug!("sent {} request, now waiting for response...", request); + + // receive response, retrying as needed + let response: NvList = match self.receive().await { + Ok(response) => response, + Err(e) => { + // reopen the channel and resend the request + info!("receive: object agent restarted: {}", e); + self.stream = RemoteChannel::open(&self.socket_path).await?; + continue; + } + }; + debug!("received response: {:?}", response); + + let response_type = response.lookup_string("Type")?; + let response_type = response_type.to_str()?; + if response_type != request { + return Err(RemoteError::Other(anyhow!( + "expected response type \"{}\", got \"{}\"", + request, + response_type + ))); + } - let result = response.lookup_string("result")?; - let result = result.to_str()?; - match result { - "ok" => Ok(response), - "err" => Err(RemoteError::ResultError(response)), - _ => Err(RemoteError::Other(anyhow!( - "expected \"ok\" or \"err\" for result, got \"{}\"", - result - ))), + let result = response.lookup_string("result")?; + let result = result.to_str()?; + return match result { + "ok" => Ok(response), + "err" => Err(RemoteError::ResultError(response)), + _ => Err(RemoteError::Other(anyhow!( + "expected \"ok\" or \"err\" for result, got \"{}\"", + result + ))), + }; } } } diff --git a/cmd/zfs_object_agent/zcache/src/report_hits.rs b/cmd/zfs_object_agent/zcache/src/report_hits.rs index 0231507e2460..3bc1c2a94ce6 100644 --- a/cmd/zfs_object_agent/zcache/src/report_hits.rs +++ b/cmd/zfs_object_agent/zcache/src/report_hits.rs @@ -10,6 +10,7 @@ use num_traits::cast::ToPrimitive; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use util::nice_p2size; use util::From64; +use util::{write_stdout, writeln_stdout}; static NAME: &str = "report_hits"; @@ -75,19 +76,20 @@ impl SizeHistogram { /// print out a histogram of hits-by-cache-size fn print(&self, quantiles: usize, cumulative: bool, ghost: bool) { let start_as_utc: DateTime = self.start.into(); - println!("Data collection started: {}", start_as_utc.to_rfc2822()); - println!("Data collection ended: {}", Local::now().to_rfc2822()); + writeln_stdout!("Data collection started: {}", start_as_utc.to_rfc2822()); + writeln_stdout!("Data collection ended: {}", Local::now().to_rfc2822()); let total = self.sum_live_hits(); - print!( + write_stdout!( "Cache Hits by Size ({} lookups with {} hits ", - self.lookups, total + self.lookups, + total ); let hit_percent = if self.lookups == 0 { 100.0 } else { total as f64 * 100.0 / self.lookups as f64 }; - println!( + writeln_stdout!( "({:.1}%) in {} cache)", hit_percent, nice_p2size(self.cache_capacity) @@ -107,7 +109,7 @@ impl SizeHistogram { if !ghost { return; } - println!("-------------------ghost hits---------------------"); + writeln_stdout!("-------------------ghost hits---------------------"); } cache_size += bucket_size; // The last bucket may not be the "full" bucket size @@ -119,9 +121,9 @@ impl SizeHistogram { ); cache_size = histogram_capacity; } - print!("{: >8} : ", nice_p2size(cache_size)); + write_stdout!("{: >8} : ", nice_p2size(cache_size)); if total == 0 { - println!(); + writeln_stdout!(); continue; } if cumulative { @@ -131,16 +133,16 @@ impl SizeHistogram { }; if bucket_total == 0 { // this bucket is empty (if we are accumulating, no hits have been seen yet) - println!(" 0%"); + writeln_stdout!(" 0%"); continue; } let percent = (bucket_total as f64 * hit_percent) / total as f64; if percent < 1.0 { // there are a small number of hits - println!(" <1% *"); + writeln_stdout!(" <1% *"); } else { let stars = std::cmp::max(percent.to_usize().unwrap() * HISTOGRAM_WIDTH / 100, 1); - println!("{: >3.0}% {:*<2$}", percent, "", stars); + writeln_stdout!("{: >3.0}% {:*<2$}", percent, "", stars); } } } @@ -204,7 +206,7 @@ impl ZcacheSubCommand for ReportHits { hits_by_size.print(quantiles, cumulative, ghost); } Err(RemoteError::ResultError(_)) => { - println!("No cache found, so no hits-by-size data is available"); + writeln_stdout!("No cache found, so no hits-by-size data is available"); } Err(RemoteError::Other(e)) => return Err(e), } diff --git a/cmd/zfs_object_agent/zcache/src/stats.rs b/cmd/zfs_object_agent/zcache/src/stats.rs index 173e1fe91b6f..9660e0bfa005 100644 --- a/cmd/zfs_object_agent/zcache/src/stats.rs +++ b/cmd/zfs_object_agent/zcache/src/stats.rs @@ -15,6 +15,7 @@ use std::time::Duration; use util::zettacache_stats::CacheStatCounter::*; use util::zettacache_stats::*; use util::{nice_number_count, nice_p2size}; +use util::{write_stdout, writeln_stdout}; static NAME: &str = "stats"; static REQUEST: &str = "zcache_stats"; @@ -48,24 +49,24 @@ impl StatsDisplay { fn display_bytes(&self, bytes: f64) { let value: u64 = bytes.round().to_u64().unwrap(); if self.show_exact_values { - print!("{:0}\t", value); + write_stdout!("{:0}\t", value); } else if value == 0 { // Intentionally avoid displaying "0.00B" when 0 - print!("{:>6} ", "0"); + write_stdout!("{:>6} ", "0"); } else { - print!("{:>6} ", nice_p2size(value)); + write_stdout!("{:>6} ", nice_p2size(value)); } } fn display_count(&self, count: f64) { if self.show_exact_values { let value: u64 = count.round().to_u64().unwrap(); - print!("{:0}\t", value); + write_stdout!("{:0}\t", value); } else if count == 0.0 { // Intentionally avoid displaying "0.00" when 0 - print!("{:>6} ", "0"); + write_stdout!("{:>6} ", "0"); } else { - print!("{:>6} ", nice_number_count(count)); + write_stdout!("{:>6} ", nice_number_count(count)); } } @@ -77,18 +78,20 @@ impl StatsDisplay { }; if self.show_exact_values { - print!("{:0.0}\t", percent.round()); - } else if percent < 0.5 { - print!("{:>6} ", "-"); + write_stdout!("{:0.0}\t", percent.round()); + } else if percent <= 0.0 { + write_stdout!("{:>6} ", "-"); + } else if percent >= 100.0 { + write_stdout!("{:>5.0}% ", percent.round()); } else { - print!("{:>5.0}% ", percent.round()); + write_stdout!("{:>5.1}% ", percent); } } fn display_timestamp(&self) { if self.show_exact_values { // e.g. "1639893294" - print!("{}", Local::now().format("%s%t")); + write_stdout!("{}", Local::now().format("%s%t")); } else { let time = if self.interval_is_subsecond { // e.g. "05:43:54.254" @@ -97,26 +100,26 @@ impl StatsDisplay { // e.g. "05:43:54" Local::now().format("%H:%M:%S") }; - print!("{0:>1$} ", time, self.time_width()); + write_stdout!("{0:>1$} ", time, self.time_width()); } } fn display_dashes(width: usize) { - print!("{0:-<1$} ", "-", width); + write_stdout!("{0:-<1$} ", "-", width); } fn display_headers_impl(&self, top: Vec<(&str, usize)>, bottom: Vec<&str>) { if self.show_time { - print!("{0:^1$} ", "TIMESTAMP", self.time_width()); + write_stdout!("{0:^1$} ", "TIMESTAMP", self.time_width()); } for (header, n) in top.iter() { let width = (n * (StatsDisplay::VALUE_WIDTH + 2)) - 2; - print!("{0:^1$} ", header, width); + write_stdout!("{0:^1$} ", header, width); } - println!(); + writeln_stdout!(); if self.show_time { - print!( + write_stdout!( "{0:>1$} ", format!("{}", Local::now().format("%Y-%m-%d")), self.time_width() @@ -129,9 +132,9 @@ impl StatsDisplay { } else { 2 // default is two spaces }; - print!("{0:^1$}{2:3$}", h, StatsDisplay::VALUE_WIDTH, "", spacing); + write_stdout!("{0:^1$}{2:3$}", h, StatsDisplay::VALUE_WIDTH, "", spacing); } - println!(); + writeln_stdout!(); // Now print dashes underneath each column if self.show_time { @@ -141,7 +144,7 @@ impl StatsDisplay { for _h in 0..bottom.len() { StatsDisplay::display_dashes(StatsDisplay::VALUE_WIDTH); } - println!(); + writeln_stdout!(); } fn display_headers(&self) { @@ -292,7 +295,7 @@ impl StatsDisplay { ); } - println!(); + writeln_stdout!(); } async fn display_stats(&self) -> Result<()> { @@ -309,11 +312,11 @@ impl StatsDisplay { latest = serde_json::from_str(stats_json.to_str()?).unwrap(); } Err(RemoteError::ResultError(_)) => { - println!("No cache found?"); + writeln_stdout!("No cache found?"); continue; } Err(RemoteError::Other(e)) => { - println!("remote call error: {}", e); + writeln_stdout!("remote call error: {}", e); // typically something like "Connection reset by peer (os error 104)" return Err(e); } @@ -324,8 +327,12 @@ impl StatsDisplay { self.display_headers(); } - // Display the net values of collected stats - self.display_stat_values(&(&latest - &&previous)); + if latest.cache_runtime_id == previous.cache_runtime_id { + // Display the net values of collected stats + self.display_stat_values(&(&latest - &&previous)); + } else { + info!("object agent restarted"); + } // Flush stdout in case output is redirected to a file io::stdout().flush().unwrap_or(()); @@ -382,8 +389,7 @@ impl ZcacheSubCommand for Stats { Arg::with_name("timestamp") .long("timestamp") .short("t") - .help("Display a timestamp on each line of stats") - .conflicts_with("all"), + .help("Display a timestamp on each line of stats"), ) .arg( Arg::with_name("insert-detail") diff --git a/cmd/zfs_object_agent/zettacache/Cargo.toml b/cmd/zfs_object_agent/zettacache/Cargo.toml index 52b4cb8b4b58..bf324e68448d 100644 --- a/cmd/zfs_object_agent/zettacache/Cargo.toml +++ b/cmd/zfs_object_agent/zettacache/Cargo.toml @@ -23,7 +23,6 @@ libc = "0.2" log = "0.4" lru = "0.7.0" lz4 = "1.23.2" -metered = "0.8" more-asserts = "0.2.1" nix = "0.23.0" num = "0.4.0" @@ -35,3 +34,4 @@ serde_json = "1.0.64" sysinfo = "0.21.1" tokio = { version = "1.4", features = ["full"] } util = { path = "../util" } +uuid = {version = "0.8.2", features = ["v4", "serde"]} diff --git a/cmd/zfs_object_agent/zettacache/src/block_access.rs b/cmd/zfs_object_agent/zettacache/src/block_access.rs index 46fccde5e40f..5ffed66251e1 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_access.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_access.rs @@ -32,6 +32,7 @@ use util::zettacache_stats::*; use util::From64; use util::{AlignedBytes, AlignedVec}; use util::{DeviceEntry, DeviceList}; +use uuid::Uuid; lazy_static! { static ref MIN_SECTOR_SIZE: usize = get_tunable("min_sector_size", 512); @@ -454,12 +455,10 @@ impl BlockAccess { } /// Return the I/O stats collected as a serialized json string. - pub fn io_stats_as_json(&self) -> String { - let timestamp = self.timebase.elapsed(); - // TODO -- should pass the timebase to differentiate previous stats across agent restart - + pub fn io_stats_as_json(&self, agent_id: Uuid) -> String { serde_json::to_string(&IoStatsRef { - timestamp, + cache_runtime_id: agent_id, // used to detect agent restarts across stat snapshots + timestamp: self.timebase.elapsed(), disk_stats: self.disks.iter().map(|disk| &disk.io_stats).collect(), }) .unwrap() diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache.rs b/cmd/zfs_object_agent/zettacache/src/zettacache.rs index 12d9c89be9bf..21f8bb381430 100644 --- a/cmd/zfs_object_agent/zettacache/src/zettacache.rs +++ b/cmd/zfs_object_agent/zettacache/src/zettacache.rs @@ -58,6 +58,7 @@ use util::From64; use util::LockSet; use util::LockedItem; use util::MutexExt; +use uuid::Uuid; lazy_static! { static ref DEFAULT_CHECKPOINT_SIZE_PCT: f64 = get_tunable("default_checkpoint_size_pct", 0.1); @@ -158,6 +159,7 @@ pub struct ZettaCache { blocking_buffer_bytes_available: Arc, nonblocking_buffer_bytes_available: Arc, write_slots: Arc, + cache_runtime_id: Uuid, } #[derive(Debug, Serialize, Deserialize, Copy, Clone)] @@ -1001,6 +1003,7 @@ impl ZettaCache { block_access, stats, timebase: Instant::now(), + cache_runtime_id: Uuid::new_v4(), }; let (merge_rx, merge_index) = match checkpoint.merge_progress { @@ -1623,11 +1626,12 @@ impl ZettaCache { } pub fn io_stats_as_json(&self) -> String { - self.block_access.io_stats_as_json() + self.block_access.io_stats_as_json(self.cache_runtime_id) } pub async fn stats_as_json(&self) -> String { let mut stats = CacheStats::clone(&self.stats); + stats.cache_runtime_id = self.cache_runtime_id; stats.timestamp = self.timebase.elapsed(); serde_json::to_string(&stats).unwrap() }